Swow.php 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. <?php
  2. declare(strict_types=1);
  3. namespace Workerman\Events;
  4. use RuntimeException;
  5. use Swow\Coroutine;
  6. use Swow\Signal;
  7. use Swow\SignalException;
  8. use Throwable;
  9. use function Swow\Sync\waitAll;
  10. class Swow implements EventInterface
  11. {
  12. /**
  13. * All listeners for read timer
  14. * @var array
  15. */
  16. protected array $eventTimer = [];
  17. /**
  18. * All listeners for read event.
  19. * @var array<Coroutine>
  20. */
  21. protected array $readEvents = [];
  22. /**
  23. * All listeners for write event.
  24. * @var array<Coroutine>
  25. */
  26. protected array $writeEvents = [];
  27. /**
  28. * All listeners for signal.
  29. * @var array<Coroutine>
  30. */
  31. protected array $signalListener = [];
  32. /**
  33. * @var ?callable
  34. */
  35. protected $errorHandler = null;
  36. /**
  37. * Get timer count.
  38. *
  39. * @return integer
  40. */
  41. public function getTimerCount(): int
  42. {
  43. return count($this->eventTimer);
  44. }
  45. /**
  46. * {@inheritdoc}
  47. * @throws Throwable
  48. */
  49. public function delay(float $delay, callable $func, array $args = []): int
  50. {
  51. $t = (int)($delay * 1000);
  52. $t = max($t, 1);
  53. $that = $this;
  54. $coroutine = Coroutine::run(function () use ($t, $func, $args, $that): void {
  55. msleep($t);
  56. unset($this->eventTimer[Coroutine::getCurrent()->getId()]);
  57. try {
  58. $func(...$args);
  59. } catch (Throwable $e) {
  60. $that->error($e);
  61. }
  62. });
  63. $timerId = $coroutine->getId();
  64. $this->eventTimer[$timerId] = $timerId;
  65. return $timerId;
  66. }
  67. /**
  68. * {@inheritdoc}
  69. * @throws Throwable
  70. */
  71. public function repeat(float $interval, callable $func, array $args = []): int
  72. {
  73. $t = (int)($interval * 1000);
  74. $t = max($t, 1);
  75. $that = $this;
  76. $coroutine = Coroutine::run(static function () use ($t, $func, $args, $that): void {
  77. while (true) {
  78. msleep($t);
  79. try {
  80. $func(...$args);
  81. } catch (Throwable $e) {
  82. $that->error($e);
  83. }
  84. }
  85. });
  86. $timerId = $coroutine->getId();
  87. $this->eventTimer[$timerId] = $timerId;
  88. return $timerId;
  89. }
  90. /**
  91. * {@inheritdoc}
  92. */
  93. public function offDelay(int $timerId): bool
  94. {
  95. if (isset($this->eventTimer[$timerId])) {
  96. try {
  97. (Coroutine::getAll()[$timerId])->kill();
  98. return true;
  99. } finally {
  100. unset($this->eventTimer[$timerId]);
  101. }
  102. }
  103. return false;
  104. }
  105. /**
  106. * {@inheritdoc}
  107. */
  108. public function offRepeat(int $timerId): bool
  109. {
  110. return $this->offDelay($timerId);
  111. }
  112. /**
  113. * {@inheritdoc}
  114. */
  115. public function deleteAllTimer(): void
  116. {
  117. foreach ($this->eventTimer as $timerId) {
  118. $this->offDelay($timerId);
  119. }
  120. }
  121. /**
  122. * {@inheritdoc}
  123. */
  124. public function onReadable($stream, callable $func): void
  125. {
  126. $fd = (int)$stream;
  127. if (isset($this->readEvents[$fd])) {
  128. $this->offReadable($stream);
  129. }
  130. Coroutine::run(function () use ($stream, $func, $fd): void {
  131. try {
  132. $this->readEvents[$fd] = Coroutine::getCurrent();
  133. while (true) {
  134. if (!is_resource($stream)) {
  135. $this->offReadable($stream);
  136. break;
  137. }
  138. $rEvent = stream_poll_one($stream, STREAM_POLLIN | STREAM_POLLHUP);
  139. if (!isset($this->readEvents[$fd]) || $this->readEvents[$fd] !== Coroutine::getCurrent()) {
  140. break;
  141. }
  142. if ($rEvent !== STREAM_POLLNONE) {
  143. $func($stream);
  144. }
  145. if ($rEvent !== STREAM_POLLIN) {
  146. $this->offReadable($stream);
  147. break;
  148. }
  149. }
  150. } catch (RuntimeException) {
  151. $this->offReadable($stream);
  152. }
  153. });
  154. }
  155. /**
  156. * {@inheritdoc}
  157. */
  158. public function offReadable($stream): bool
  159. {
  160. // 在当前协程执行 $coroutine->kill() 会导致不可预知问题,所以没有使用$coroutine->kill()
  161. $fd = (int)$stream;
  162. if (isset($this->readEvents[$fd])) {
  163. unset($this->readEvents[$fd]);
  164. return true;
  165. }
  166. return false;
  167. }
  168. /**
  169. * {@inheritdoc}
  170. */
  171. public function onWritable($stream, callable $func): void
  172. {
  173. $fd = (int)$stream;
  174. if (isset($this->writeEvents[$fd])) {
  175. $this->offWritable($stream);
  176. }
  177. Coroutine::run(function () use ($stream, $func, $fd): void {
  178. try {
  179. $this->writeEvents[$fd] = Coroutine::getCurrent();
  180. while (true) {
  181. $rEvent = stream_poll_one($stream, STREAM_POLLOUT | STREAM_POLLHUP);
  182. if (!isset($this->writeEvents[$fd]) || $this->writeEvents[$fd] !== Coroutine::getCurrent()) {
  183. break;
  184. }
  185. if ($rEvent !== STREAM_POLLNONE) {
  186. $func($stream);
  187. }
  188. if ($rEvent !== STREAM_POLLOUT) {
  189. $this->offWritable($stream);
  190. break;
  191. }
  192. }
  193. } catch (RuntimeException) {
  194. $this->offWritable($stream);
  195. }
  196. });
  197. }
  198. /**
  199. * {@inheritdoc}
  200. */
  201. public function offWritable($stream): bool
  202. {
  203. $fd = (int)$stream;
  204. if (isset($this->writeEvents[$fd])) {
  205. unset($this->writeEvents[$fd]);
  206. return true;
  207. }
  208. return false;
  209. }
  210. /**
  211. * {@inheritdoc}
  212. */
  213. public function onSignal(int $signal, callable $func): void
  214. {
  215. Coroutine::run(function () use ($signal, $func): void {
  216. $this->signalListener[$signal] = Coroutine::getCurrent();
  217. while (1) {
  218. try {
  219. Signal::wait($signal);
  220. if (!isset($this->signalListener[$signal]) ||
  221. $this->signalListener[$signal] !== Coroutine::getCurrent()) {
  222. break;
  223. }
  224. $func($signal);
  225. } catch (SignalException) {
  226. }
  227. }
  228. });
  229. }
  230. /**
  231. * {@inheritdoc}
  232. */
  233. public function offSignal(int $signal): bool
  234. {
  235. if (!isset($this->signalListener[$signal])) {
  236. return false;
  237. }
  238. unset($this->signalListener[$signal]);
  239. return true;
  240. }
  241. /**
  242. * {@inheritdoc}
  243. */
  244. public function run(): void
  245. {
  246. waitAll();
  247. }
  248. /**
  249. * Destroy loop.
  250. *
  251. * @return void
  252. */
  253. public function stop(): void
  254. {
  255. Coroutine::killAll();
  256. }
  257. /**
  258. * {@inheritdoc}
  259. */
  260. public function setErrorHandler(callable $errorHandler): void
  261. {
  262. $this->errorHandler = $errorHandler;
  263. }
  264. /**
  265. * {@inheritdoc}
  266. */
  267. public function getErrorHandler(): ?callable
  268. {
  269. return $this->errorHandler;
  270. }
  271. /**
  272. * @param Throwable $e
  273. * @return void
  274. * @throws Throwable
  275. */
  276. public function error(Throwable $e): void
  277. {
  278. if (!$this->errorHandler) {
  279. throw new $e;
  280. }
  281. ($this->errorHandler)($e);
  282. }
  283. }