* @copyright walkor * @link http://www.workerman.net/ * @license http://www.opensource.org/licenses/mit-license.php MIT License */ namespace Workerman\Events; use Throwable; use Workerman\Worker; /** * select eventloop */ class Select implements EventInterface { /** * All listeners for read/write event. * * @var array */ protected $readEvents = []; /** * All listeners for read/write event. * * @var array */ protected $writeEvents = []; /** * @var array */ protected $exceptEvents = []; /** * Event listeners of signal. * * @var array */ protected $signalEvents = []; /** * Fds waiting for read event. * * @var array */ protected $readFds = []; /** * Fds waiting for write event. * * @var array */ protected $writeFds = []; /** * Fds waiting for except event. * * @var array */ protected $exceptFds = []; /** * Timer scheduler. * {['data':timer_id, 'priority':run_timestamp], ..} * * @var \SplPriorityQueue */ protected $scheduler = null; /** * All timer event listeners. * [[func, args, flag, timer_interval], ..] * * @var array */ protected $eventTimer = []; /** * Timer id. * * @var int */ protected $timerId = 1; /** * Select timeout. * * @var int */ protected $selectTimeout = 100000000; /** * Construct. */ public function __construct() { // Init SplPriorityQueue. $this->scheduler = new \SplPriorityQueue(); $this->scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH); } /** * {@inheritdoc} */ public function delay(float $delay, $func, $args) { $timerId = $this->timerId++; $runTime = \microtime(true) + $delay; $this->scheduler->insert($timerId, -$runTime); $this->eventTimer[$timerId] = [$func, (array)$args]; $selectTimeout = ($runTime - \microtime(true)) * 1000000; $selectTimeout = $selectTimeout <= 0 ? 1 : (int)$selectTimeout; if ($this->selectTimeout > $selectTimeout) { $this->selectTimeout = $selectTimeout; } return $timerId; } /** * {@inheritdoc} */ public function repeat(float $delay, $func, $args) { $timerId = $this->timerId++; $runTime = \microtime(true) + $delay; $this->scheduler->insert($timerId, -$runTime); $this->eventTimer[$timerId] = [$func, (array)$args, $delay]; $selectTimeout = ($runTime - \microtime(true)) * 1000000; $selectTimeout = $selectTimeout <= 0 ? 1 : (int)$selectTimeout; if ($this->selectTimeout > $selectTimeout) { $this->selectTimeout = $selectTimeout; } return $timerId; } /** * {@inheritdoc} */ public function offDelay($timerId) { if (isset($this->eventTimer[$timerId])) { unset($this->eventTimer[$timerId]); return true; } return false; } /** * {@inheritdoc} */ public function offRepeat($timerId) { return $this->offDelay($timerId); } /** * {@inheritdoc} */ public function onReadable($stream, $func) { $count = \count($this->readFds); if ($count >= 1024) { echo "Warning: system call select exceeded the maximum number of connections 1024, please install event/libevent extension for more connections.\n"; } else if (\DIRECTORY_SEPARATOR !== '/' && $count >= 256) { echo "Warning: system call select exceeded the maximum number of connections 256.\n"; } $fdKey = (int)$stream; $this->readEvents[$fdKey] = $func; $this->readFds[$fdKey] = $stream; } /** * {@inheritdoc} */ public function offReadable($stream) { $fdKey = (int)$stream; unset($this->readEvents[$fdKey], $this->readFds[$fdKey]); } /** * {@inheritdoc} */ public function onWritable($stream, $func) { $count = \count($this->writeFds); if ($count >= 1024) { echo "Warning: system call select exceeded the maximum number of connections 1024, please install event/libevent extension for more connections.\n"; } else if (\DIRECTORY_SEPARATOR !== '/' && $count >= 256) { echo "Warning: system call select exceeded the maximum number of connections 256.\n"; } $fdKey = (int)$stream; $this->writeEvents[$fdKey] = $func; $this->writeFds[$fdKey] = $stream; } /** * {@inheritdoc} */ public function offWritable($stream) { $fdKey = (int)$stream; unset($this->writeEvents[$fdKey], $this->writeFds[$fdKey]); } /** * {@inheritdoc} */ public function onExcept($stream, $func) { $fdKey = (int)$stream; $this->exceptEvents[$fdKey] = $func; $this->exceptFds[$fdKey] = $stream; } /** * {@inheritdoc} */ public function offExcept($stream) { $fdKey = (int)$stream; unset($this->exceptEvents[$fdKey], $this->exceptFds[$fdKey]); } /** * {@inheritdoc} */ public function onSignal($signal, $func) { if (\DIRECTORY_SEPARATOR !== '/') { return null; } $this->signalEvents[$signal] = $func; \pcntl_signal($signal, [$this, 'signalHandler']); } /** * {@inheritdoc} */ public function offsignal($signal) { unset($this->signalEvents[$signal]); \pcntl_signal($signal, SIG_IGN); } /** * Signal handler. * * @param int $signal */ public function signalHandler($signal) { $this->signalEvents[$signal]($signal); } /** * Tick for timer. * * @return void */ protected function tick() { $tasksToInsert = []; while (!$this->scheduler->isEmpty()) { $schedulerData = $this->scheduler->top(); $timerId = $schedulerData['data']; $nextRunTime = -$schedulerData['priority']; $timeNow = \microtime(true); $this->selectTimeout = (int)(($nextRunTime - $timeNow) * 1000000); if ($this->selectTimeout <= 0) { $this->scheduler->extract(); if (!isset($this->eventTimer[$timerId])) { continue; } // [func, args, timer_interval] $taskData = $this->eventTimer[$timerId]; if (isset($taskData[2])) { $nextRunTime = $timeNow + $taskData[2]; $tasksToInsert[] = [$timerId, -$nextRunTime]; } else { unset($this->eventTimer[$timerId]); } try { $taskData[0](...$taskData[1]); } catch (Throwable $e) { Worker::stopAll(250, $e); } } else { break; } } foreach ($tasksToInsert as $item) { $this->scheduler->insert($item[0], $item[1]); } if (!$this->scheduler->isEmpty()) { $schedulerData = $this->scheduler->top(); $nextRunTime = -$schedulerData['priority']; $timeNow = \microtime(true); $this->selectTimeout = \max((int)(($nextRunTime - $timeNow) * 1000000), 0); return; } $this->selectTimeout = 100000000; } /** * {@inheritdoc} */ public function deleteAllTimer() { $this->scheduler = new \SplPriorityQueue(); $this->scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH); $this->eventTimer = []; } /** * {@inheritdoc} */ public function run() { while (1) { if (\DIRECTORY_SEPARATOR === '/') { // Calls signal handlers for pending signals \pcntl_signal_dispatch(); } $read = $this->readFds; $write = $this->writeFds; $except = $this->exceptFds; if ($read || $write || $except) { // Waiting read/write/signal/timeout events. try { @stream_select($read, $write, $except, 0, $this->selectTimeout); } catch (Throwable $e) { } } else { $this->selectTimeout >= 1 && usleep($this->selectTimeout); } if (!$this->scheduler->isEmpty()) { $this->tick(); } foreach ($read as $fd) { $fdKey = (int)$fd; if (isset($this->readEvents[$fdKey])) { $this->readEvents[$fdKey]($fd); } } foreach ($write as $fd) { $fdKey = (int)$fd; if (isset($this->writeEvents[$fdKey])) { $this->writeEvents[$fdKey]($fd); } } foreach ($except as $fd) { $fdKey = (int)$fd; if (isset($this->exceptEvents[$fdKey])) { $this->exceptEvents[$fdKey]($fd); } } } } /** * {@inheritdoc} */ public function stop() { $this->deleteAllTimer(); foreach ($this->signalEvents as $signal => $item) { $this->offsignal($signal); } $this->readFds = $this->writeFds = $this->exceptFds = $this->readEvents = $this->writeEvents = $this->exceptEvents = $this->signalEvents = []; } /** * {@inheritdoc} */ public function getTimerCount() { return \count($this->eventTimer); } }