*/
    protected array $readEvents = [];

    /**
     * All listeners for write event.
     *
     * Keyed by the integer cast of the stream; each value is the coroutine
     * currently polling that stream for writability.
     *
     * @var array<int, Coroutine>
     */
    protected array $writeEvents = [];

    /**
     * All listeners for signal.
     *
     * Keyed by signal number; each value is the coroutine waiting on it.
     *
     * @var array<int, Coroutine>
     */
    protected array $signalListener = [];

    /**
     * Handler invoked by error() for exceptions raised in timer callbacks.
     *
     * @var ?callable
     */
    protected $errorHandler = null;

    /**
     * Get timer count.
     *
     * @return int Number of entries currently tracked in $eventTimer.
     */
    public function getTimerCount(): int
    {
        return count($this->eventTimer);
    }

    /**
     * {@inheritdoc}
     *
     * Starts a coroutine that sleeps, removes its own timer entry and then
     * calls $func once. The coroutine id doubles as the timer id.
     *
     * @throws Throwable
     */
    public function delay(float $delay, callable $func, array $args = []): int
    {
        // $delay is scaled by 1000 and truncated for msleep(); floor at 1.
        $t = max((int)($delay * 1000), 1);
        $coroutine = Coroutine::run(function () use ($t, $func, $args): void {
            msleep($t);
            // One-shot timer: drop the bookkeeping entry before the callback runs.
            unset($this->eventTimer[Coroutine::getCurrent()->getId()]);
            try {
                $func(...$args);
            } catch (Throwable $e) {
                $this->error($e);
            }
        });
        $timerId = $coroutine->getId();
        $this->eventTimer[$timerId] = $timerId;
        return $timerId;
    }

    /**
     * {@inheritdoc}
     *
     * Starts a coroutine that loops forever, sleeping and then calling $func.
     * The loop only ends when the coroutine is killed (see offDelay()).
     *
     * @throws Throwable
     */
    public function repeat(float $interval, callable $func, array $args = []): int
    {
        // $interval is scaled by 1000 and truncated for msleep(); floor at 1.
        $t = max((int)($interval * 1000), 1);
        $coroutine = Coroutine::run(function () use ($t, $func, $args): void {
            while (true) {
                msleep($t);
                try {
                    $func(...$args);
                } catch (Throwable $e) {
                    // A failing callback must not terminate the repeating timer.
                    $this->error($e);
                }
            }
        });
        $timerId = $coroutine->getId();
        $this->eventTimer[$timerId] = $timerId;
        return $timerId;
    }

    /**
     * {@inheritdoc}
     *
     * Kills the coroutine backing the timer and forgets the timer id.
     *
     * @return bool True if the timer id was registered, false otherwise.
     */
    public function offDelay(int $timerId): bool
    {
        if (!isset($this->eventTimer[$timerId])) {
            return false;
        }
        try {
            // The coroutine may no longer be listed (already finished or
            // killed elsewhere); only kill it when it is still present
            // instead of calling kill() on null.
            (Coroutine::getAll()[$timerId] ?? null)?->kill();
            return true;
        } finally {
            // Always drop the entry, even if kill() throws.
            unset($this->eventTimer[$timerId]);
        }
    }

    /**
     * {@inheritdoc}
     *
     * Repeating timers are tracked exactly like one-shot timers.
     */
    public function offRepeat(int $timerId): bool
    {
        return $this->offDelay($timerId);
    }

    /**
     * {@inheritdoc}
     *
     * Cancels every timer currently tracked in $eventTimer.
     */
    public function deleteAllTimer(): void
    {
        foreach ($this->eventTimer as $timerId) {
            $this->offDelay($timerId);
        }
    }

    /**
     * {@inheritdoc}
     *
     * Starts a coroutine that polls $stream for readability and calls $func
     * on each event. An existing read listener for the same stream is
     * replaced.
     */
    public function onReadable($stream, callable $func): void
    {
        $fd = (int)$stream;
        if (isset($this->readEvents[$fd])) {
            $this->offReadable($stream);
        }
        Coroutine::run(function () use ($stream, $func, $fd): void {
            try {
                $this->readEvents[$fd] = Coroutine::getCurrent();
                while (true) {
                    if (!is_resource($stream)) {
                        $this->offReadable($stream);
                        break;
                    }
                    $rEvent = stream_poll_one($stream, STREAM_POLLIN | STREAM_POLLHUP);
                    // Stop if the listener was removed or replaced while polling.
                    if (!isset($this->readEvents[$fd]) || $this->readEvents[$fd] !== Coroutine::getCurrent()) {
                        break;
                    }
                    if ($rEvent !== STREAM_POLLNONE) {
                        $func($stream);
                    }
                    // Anything other than a plain "readable" event ends the listener.
                    if ($rEvent !== STREAM_POLLIN) {
                        $this->offReadable($stream);
                        break;
                    }
                }
            } catch (RuntimeException) {
                $this->offReadable($stream);
            }
        });
    }

    /**
     * {@inheritdoc}
     *
     * @return bool True if a read listener was registered for the stream.
     */
    public function offReadable($stream): bool
    {
        // Calling $coroutine->kill() from within the current coroutine leads to
        // unpredictable problems, so the entry is only unset here and the polling
        // coroutine exits on its own once it notices the entry is gone.
        $fd = (int)$stream;
        if (isset($this->readEvents[$fd])) {
            unset($this->readEvents[$fd]);
            return true;
        }
        return false;
    }

    /**
     * {@inheritdoc}
     *
     * Starts a coroutine that polls $stream for writability and calls $func
     * on each event. An existing write listener for the same stream is
     * replaced.
     */
    public function onWritable($stream, callable $func): void
    {
        $fd = (int)$stream;
        if (isset($this->writeEvents[$fd])) {
            $this->offWritable($stream);
        }
        Coroutine::run(function () use ($stream, $func, $fd): void {
            try {
                $this->writeEvents[$fd] = Coroutine::getCurrent();
                while (true) {
                    // Same guard as onReadable(): never poll a closed stream.
                    if (!is_resource($stream)) {
                        $this->offWritable($stream);
                        break;
                    }
                    $rEvent = stream_poll_one($stream, STREAM_POLLOUT | STREAM_POLLHUP);
                    // Stop if the listener was removed or replaced while polling.
                    if (!isset($this->writeEvents[$fd]) || $this->writeEvents[$fd] !== Coroutine::getCurrent()) {
                        break;
                    }
                    if ($rEvent !== STREAM_POLLNONE) {
                        $func($stream);
                    }
                    // Anything other than a plain "writable" event ends the listener.
                    if ($rEvent !== STREAM_POLLOUT) {
                        $this->offWritable($stream);
                        break;
                    }
                }
            } catch (RuntimeException) {
                $this->offWritable($stream);
            }
        });
    }

    /**
     * {@inheritdoc}
     *
     * @return bool True if a write listener was registered for the stream.
     */
    public function offWritable($stream): bool
    {
        $fd = (int)$stream;
        if (isset($this->writeEvents[$fd])) {
            unset($this->writeEvents[$fd]);
            return true;
        }
        return false;
    }

    /**
     * {@inheritdoc}
     *
     * Starts a coroutine that waits for $signal in a loop and calls $func
     * each time it arrives, for as long as this coroutine is still the
     * registered listener for that signal.
     */
    public function onSignal(int $signal, callable $func): void
    {
        Coroutine::run(function () use ($signal, $func): void {
            $this->signalListener[$signal] = Coroutine::getCurrent();
            while (true) {
                try {
                    Signal::wait($signal);
                    // Stop if the listener was removed or replaced while waiting.
                    if (!isset($this->signalListener[$signal]) ||
                        $this->signalListener[$signal] !== Coroutine::getCurrent()) {
                        break;
                    }
                    $func($signal);
                } catch (SignalException) {
                    // Deliberately ignored: keep waiting for the next signal.
                }
            }
        });
    }

    /**
     * {@inheritdoc}
     *
     * Only unsets the registration; the waiting coroutine notices this the
     * next time Signal::wait() returns.
     *
     * @return bool True if a listener was registered for the signal.
     */
    public function offSignal(int $signal): bool
    {
        if (!isset($this->signalListener[$signal])) {
            return false;
        }
        unset($this->signalListener[$signal]);
        return true;
    }

    /**
     * {@inheritdoc}
     *
     * Delegates to waitAll().
     */
    public function run(): void
    {
        waitAll();
    }

    /**
     * Destroy loop.
     *
     * Delegates to Coroutine::killAll().
     *
     * @return void
     */
    public function stop(): void
    {
        Coroutine::killAll();
    }

    /**
     * {@inheritdoc}
     */
    public function setErrorHandler(callable $errorHandler): void
    {
        $this->errorHandler = $errorHandler;
    }

    /**
     * {@inheritdoc}
     */
    public function getErrorHandler(): ?callable
    {
        return $this->errorHandler;
    }

    /**
     * Dispatch an exception to the configured error handler.
     *
     * @param Throwable $e
     * @return void
     * @throws Throwable Rethrows $e itself when no error handler is set.
     */
    public function error(Throwable $e): void
    {
        if (!$this->errorHandler) {
            // Rethrow the original object: `new $e` would build an empty
            // exception of the same class and lose message, code and trace.
            throw $e;
        }
        ($this->errorHandler)($e);
    }
}