Swow.php 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. <?php
  2. declare(strict_types=1);
  3. namespace Workerman\Events;
  4. use RuntimeException;
  5. use Swow\Coroutine;
  6. use Swow\Signal;
  7. use Swow\SignalException;
  8. use Throwable;
  9. use function Swow\Sync\waitAll;
  10. class Swow implements EventInterface
  11. {
  12. /**
  13. * All listeners for read timer
  14. * @var array
  15. */
  16. protected array $eventTimer = [];
  17. /**
  18. * All listeners for read event.
  19. * @var array<Coroutine>
  20. */
  21. protected array $readEvents = [];
  22. /**
  23. * All listeners for write event.
  24. * @var array<Coroutine>
  25. */
  26. protected array $writeEvents = [];
  27. /**
  28. * All listeners for signal.
  29. * @var array<Coroutine>
  30. */
  31. protected array $signalListener = [];
  32. /**
  33. * @var ?callable
  34. */
  35. protected $errorHandler = null;
  36. /**
  37. * Get timer count.
  38. *
  39. * @return integer
  40. */
  41. public function getTimerCount(): int
  42. {
  43. return count($this->eventTimer);
  44. }
  45. /**
  46. * {@inheritdoc}
  47. * @throws Throwable
  48. */
  49. public function delay(float $delay, callable $func, array $args = []): int
  50. {
  51. $t = (int)($delay * 1000);
  52. $t = max($t, 1);
  53. $that = $this;
  54. $coroutine = Coroutine::run(function () use ($t, $func, $args, $that): void {
  55. msleep($t);
  56. unset($this->eventTimer[Coroutine::getCurrent()->getId()]);
  57. try {
  58. $func(...$args);
  59. } catch (Throwable $e) {
  60. $that->error($e);
  61. }
  62. });
  63. $timerId = $coroutine->getId();
  64. $this->eventTimer[$timerId] = $timerId;
  65. return $timerId;
  66. }
  67. /**
  68. * {@inheritdoc}
  69. * @throws Throwable
  70. */
  71. public function repeat(float $interval, callable $func, array $args = []): int
  72. {
  73. $t = (int)($interval * 1000);
  74. $t = max($t, 1);
  75. $that = $this;
  76. $coroutine = Coroutine::run(static function () use ($t, $func, $args, $that): void {
  77. // @phpstan-ignore-next-line While loop condition is always true.
  78. while (true) {
  79. msleep($t);
  80. try {
  81. $func(...$args);
  82. } catch (Throwable $e) {
  83. $that->error($e);
  84. }
  85. }
  86. });
  87. $timerId = $coroutine->getId();
  88. $this->eventTimer[$timerId] = $timerId;
  89. return $timerId;
  90. }
  91. /**
  92. * {@inheritdoc}
  93. */
  94. public function offDelay(int $timerId): bool
  95. {
  96. if (isset($this->eventTimer[$timerId])) {
  97. try {
  98. (Coroutine::getAll()[$timerId])->kill();
  99. return true;
  100. } finally {
  101. unset($this->eventTimer[$timerId]);
  102. }
  103. }
  104. return false;
  105. }
  106. /**
  107. * {@inheritdoc}
  108. */
  109. public function offRepeat(int $timerId): bool
  110. {
  111. return $this->offDelay($timerId);
  112. }
  113. /**
  114. * {@inheritdoc}
  115. */
  116. public function deleteAllTimer(): void
  117. {
  118. foreach ($this->eventTimer as $timerId) {
  119. $this->offDelay($timerId);
  120. }
  121. }
  122. /**
  123. * {@inheritdoc}
  124. */
  125. public function onReadable($stream, callable $func): void
  126. {
  127. $fd = (int)$stream;
  128. if (isset($this->readEvents[$fd])) {
  129. $this->offReadable($stream);
  130. }
  131. Coroutine::run(function () use ($stream, $func, $fd): void {
  132. try {
  133. $this->readEvents[$fd] = Coroutine::getCurrent();
  134. while (true) {
  135. if (!is_resource($stream)) {
  136. $this->offReadable($stream);
  137. break;
  138. }
  139. $rEvent = stream_poll_one($stream, STREAM_POLLIN | STREAM_POLLHUP);
  140. if (!isset($this->readEvents[$fd]) || $this->readEvents[$fd] !== Coroutine::getCurrent()) {
  141. break;
  142. }
  143. if ($rEvent !== STREAM_POLLNONE) {
  144. $func($stream);
  145. }
  146. if ($rEvent !== STREAM_POLLIN) {
  147. $this->offReadable($stream);
  148. break;
  149. }
  150. }
  151. } catch (RuntimeException) {
  152. $this->offReadable($stream);
  153. }
  154. });
  155. }
  156. /**
  157. * {@inheritdoc}
  158. */
  159. public function offReadable($stream): bool
  160. {
  161. // 在当前协程执行 $coroutine->kill() 会导致不可预知问题,所以没有使用$coroutine->kill()
  162. $fd = (int)$stream;
  163. if (isset($this->readEvents[$fd])) {
  164. unset($this->readEvents[$fd]);
  165. return true;
  166. }
  167. return false;
  168. }
  169. /**
  170. * {@inheritdoc}
  171. */
  172. public function onWritable($stream, callable $func): void
  173. {
  174. $fd = (int)$stream;
  175. if (isset($this->writeEvents[$fd])) {
  176. $this->offWritable($stream);
  177. }
  178. Coroutine::run(function () use ($stream, $func, $fd): void {
  179. try {
  180. $this->writeEvents[$fd] = Coroutine::getCurrent();
  181. while (true) {
  182. $rEvent = stream_poll_one($stream, STREAM_POLLOUT | STREAM_POLLHUP);
  183. if (!isset($this->writeEvents[$fd]) || $this->writeEvents[$fd] !== Coroutine::getCurrent()) {
  184. break;
  185. }
  186. if ($rEvent !== STREAM_POLLNONE) {
  187. $func($stream);
  188. }
  189. if ($rEvent !== STREAM_POLLOUT) {
  190. $this->offWritable($stream);
  191. break;
  192. }
  193. }
  194. } catch (RuntimeException) {
  195. $this->offWritable($stream);
  196. }
  197. });
  198. }
  199. /**
  200. * {@inheritdoc}
  201. */
  202. public function offWritable($stream): bool
  203. {
  204. $fd = (int)$stream;
  205. if (isset($this->writeEvents[$fd])) {
  206. unset($this->writeEvents[$fd]);
  207. return true;
  208. }
  209. return false;
  210. }
  211. /**
  212. * {@inheritdoc}
  213. */
  214. public function onSignal(int $signal, callable $func): void
  215. {
  216. Coroutine::run(function () use ($signal, $func): void {
  217. $this->signalListener[$signal] = Coroutine::getCurrent();
  218. while (1) {
  219. try {
  220. Signal::wait($signal);
  221. if (!isset($this->signalListener[$signal]) ||
  222. $this->signalListener[$signal] !== Coroutine::getCurrent()) {
  223. break;
  224. }
  225. $func($signal);
  226. } catch (SignalException) {
  227. }
  228. }
  229. });
  230. }
  231. /**
  232. * {@inheritdoc}
  233. */
  234. public function offSignal(int $signal): bool
  235. {
  236. if (!isset($this->signalListener[$signal])) {
  237. return false;
  238. }
  239. unset($this->signalListener[$signal]);
  240. return true;
  241. }
  242. /**
  243. * {@inheritdoc}
  244. */
  245. public function run(): void
  246. {
  247. waitAll();
  248. }
  249. /**
  250. * Destroy loop.
  251. *
  252. * @return void
  253. */
  254. public function stop(): void
  255. {
  256. Coroutine::killAll();
  257. }
  258. /**
  259. * {@inheritdoc}
  260. */
  261. public function setErrorHandler(callable $errorHandler): void
  262. {
  263. $this->errorHandler = $errorHandler;
  264. }
  265. /**
  266. * {@inheritdoc}
  267. */
  268. public function getErrorHandler(): ?callable
  269. {
  270. return $this->errorHandler;
  271. }
  272. /**
  273. * @param Throwable $e
  274. * @return void
  275. * @throws Throwable
  276. */
  277. public function error(Throwable $e): void
  278. {
  279. if (!$this->errorHandler) {
  280. throw new $e;
  281. }
  282. ($this->errorHandler)($e);
  283. }
  284. }