Select.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. declare(strict_types=1);
  15. namespace Workerman\Events;
  16. use function count;
  17. use function max;
  18. use function microtime;
  19. use function pcntl_signal;
  20. use function pcntl_signal_dispatch;
  21. use const DIRECTORY_SEPARATOR;
  22. /**
  23. * select eventloop
  24. */
  25. final class Select implements EventInterface
  26. {
  27. /**
  28. * Running.
  29. *
  30. * @var bool
  31. */
  32. private bool $running = true;
  33. /**
  34. * All listeners for read/write event.
  35. *
  36. * @var array<int, callable>
  37. */
  38. private array $readEvents = [];
  39. /**
  40. * All listeners for read/write event.
  41. *
  42. * @var array<int, callable>
  43. */
  44. private array $writeEvents = [];
  45. /**
  46. * @var array<int, callable>
  47. */
  48. private array $exceptEvents = [];
  49. /**
  50. * Event listeners of signal.
  51. *
  52. * @var array<int, callable>
  53. */
  54. private array $signalEvents = [];
  55. /**
  56. * Fds waiting for read event.
  57. *
  58. * @var array<int, resource>
  59. */
  60. private array $readFds = [];
  61. /**
  62. * Fds waiting for write event.
  63. *
  64. * @var array<int, resource>
  65. */
  66. private array $writeFds = [];
  67. /**
  68. * Fds waiting for except event.
  69. *
  70. * @var array<int, resource>
  71. */
  72. private array $exceptFds = [];
  73. /**
  74. * Timer scheduler.
  75. * {['data':timer_id, 'priority':run_timestamp], ..}
  76. *
  77. * @var \SplPriorityQueue
  78. */
  79. private \SplPriorityQueue $scheduler;
  80. /**
  81. * All timer event listeners.
  82. * [[func, args, flag, timer_interval], ..]
  83. *
  84. * @var array
  85. */
  86. private array $eventTimer = [];
  87. /**
  88. * Timer id.
  89. *
  90. * @var int
  91. */
  92. private int $timerId = 1;
  93. /**
  94. * Select timeout.
  95. *
  96. * @var int
  97. */
  98. private int $selectTimeout = 100000000;
  99. /**
  100. * @var ?callable
  101. */
  102. private $errorHandler = null;
  103. /**
  104. * Construct.
  105. */
  106. public function __construct()
  107. {
  108. $this->scheduler = new \SplPriorityQueue();
  109. $this->scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
  110. }
  111. /**
  112. * {@inheritdoc}
  113. */
  114. public function delay(float $delay, callable $func, array $args = []): int
  115. {
  116. $timerId = $this->timerId++;
  117. $runTime = microtime(true) + $delay;
  118. $this->scheduler->insert($timerId, -$runTime);
  119. $this->eventTimer[$timerId] = [$func, $args];
  120. $selectTimeout = ($runTime - microtime(true)) * 1000000;
  121. $selectTimeout = $selectTimeout <= 0 ? 1 : (int)$selectTimeout;
  122. if ($this->selectTimeout > $selectTimeout) {
  123. $this->selectTimeout = $selectTimeout;
  124. }
  125. return $timerId;
  126. }
  127. /**
  128. * {@inheritdoc}
  129. */
  130. public function repeat(float $interval, callable $func, array $args = []): int
  131. {
  132. $timerId = $this->timerId++;
  133. $runTime = microtime(true) + $interval;
  134. $this->scheduler->insert($timerId, -$runTime);
  135. $this->eventTimer[$timerId] = [$func, $args, $interval];
  136. $selectTimeout = ($runTime - microtime(true)) * 1000000;
  137. $selectTimeout = $selectTimeout <= 0 ? 1 : (int)$selectTimeout;
  138. if ($this->selectTimeout > $selectTimeout) {
  139. $this->selectTimeout = $selectTimeout;
  140. }
  141. return $timerId;
  142. }
  143. /**
  144. * {@inheritdoc}
  145. */
  146. public function offDelay(int $timerId): bool
  147. {
  148. if (isset($this->eventTimer[$timerId])) {
  149. unset($this->eventTimer[$timerId]);
  150. return true;
  151. }
  152. return false;
  153. }
  154. /**
  155. * {@inheritdoc}
  156. */
  157. public function offRepeat(int $timerId): bool
  158. {
  159. return $this->offDelay($timerId);
  160. }
  161. /**
  162. * {@inheritdoc}
  163. */
  164. public function onReadable($stream, callable $func): void
  165. {
  166. $count = count($this->readFds);
  167. if ($count >= 1024) {
  168. trigger_error("System call select exceeded the maximum number of connections 1024, please install event/libevent extension for more connections.", E_USER_WARNING);
  169. } else if (DIRECTORY_SEPARATOR !== '/' && $count >= 256) {
  170. trigger_error("System call select exceeded the maximum number of connections 256.", E_USER_WARNING);
  171. }
  172. $fdKey = (int)$stream;
  173. $this->readEvents[$fdKey] = $func;
  174. $this->readFds[$fdKey] = $stream;
  175. }
  176. /**
  177. * {@inheritdoc}
  178. */
  179. public function offReadable($stream): bool
  180. {
  181. $fdKey = (int)$stream;
  182. if (isset($this->readEvents[$fdKey])) {
  183. unset($this->readEvents[$fdKey], $this->readFds[$fdKey]);
  184. return true;
  185. }
  186. return false;
  187. }
  188. /**
  189. * {@inheritdoc}
  190. */
  191. public function onWritable($stream, callable $func): void
  192. {
  193. $count = count($this->writeFds);
  194. if ($count >= 1024) {
  195. trigger_error("System call select exceeded the maximum number of connections 1024, please install event/libevent extension for more connections.", E_USER_WARNING);
  196. } else if (DIRECTORY_SEPARATOR !== '/' && $count >= 256) {
  197. trigger_error("System call select exceeded the maximum number of connections 256.", E_USER_WARNING);
  198. }
  199. $fdKey = (int)$stream;
  200. $this->writeEvents[$fdKey] = $func;
  201. $this->writeFds[$fdKey] = $stream;
  202. }
  203. /**
  204. * {@inheritdoc}
  205. */
  206. public function offWritable($stream): bool
  207. {
  208. $fdKey = (int)$stream;
  209. if (isset($this->writeEvents[$fdKey])) {
  210. unset($this->writeEvents[$fdKey], $this->writeFds[$fdKey]);
  211. return true;
  212. }
  213. return false;
  214. }
  215. /**
  216. * On except.
  217. *
  218. * @param resource $stream
  219. * @param callable $func
  220. */
  221. public function onExcept($stream, callable $func): void
  222. {
  223. $fdKey = (int)$stream;
  224. $this->exceptEvents[$fdKey] = $func;
  225. $this->exceptFds[$fdKey] = $stream;
  226. }
  227. /**
  228. * Off except.
  229. *
  230. * @param resource $stream
  231. * @return bool
  232. */
  233. public function offExcept($stream): bool
  234. {
  235. $fdKey = (int)$stream;
  236. if (isset($this->exceptEvents[$fdKey])) {
  237. unset($this->exceptEvents[$fdKey], $this->exceptFds[$fdKey]);
  238. return true;
  239. }
  240. return false;
  241. }
  242. /**
  243. * {@inheritdoc}
  244. */
  245. public function onSignal(int $signal, callable $func): void
  246. {
  247. if (!function_exists('pcntl_signal')) {
  248. return;
  249. }
  250. $this->signalEvents[$signal] = $func;
  251. pcntl_signal($signal, fn () => $this->safeCall($this->signalEvents[$signal], [$signal]));
  252. }
  253. /**
  254. * {@inheritdoc}
  255. */
  256. public function offSignal(int $signal): bool
  257. {
  258. if (!function_exists('pcntl_signal')) {
  259. return false;
  260. }
  261. pcntl_signal($signal, SIG_IGN);
  262. if (isset($this->signalEvents[$signal])) {
  263. unset($this->signalEvents[$signal]);
  264. return true;
  265. }
  266. return false;
  267. }
  268. /**
  269. * Tick for timer.
  270. *
  271. * @return void
  272. * @throws \Throwable
  273. */
  274. protected function tick(): void
  275. {
  276. $tasksToInsert = [];
  277. while (!$this->scheduler->isEmpty()) {
  278. $schedulerData = $this->scheduler->top();
  279. $timerId = $schedulerData['data'];
  280. $nextRunTime = -$schedulerData['priority'];
  281. $timeNow = microtime(true);
  282. $this->selectTimeout = (int)(($nextRunTime - $timeNow) * 1000000);
  283. if ($this->selectTimeout <= 0) {
  284. $this->scheduler->extract();
  285. if (!isset($this->eventTimer[$timerId])) {
  286. continue;
  287. }
  288. // [func, args, timer_interval]
  289. $taskData = $this->eventTimer[$timerId];
  290. if (isset($taskData[2])) {
  291. $nextRunTime = $timeNow + $taskData[2];
  292. $tasksToInsert[] = [$timerId, -$nextRunTime];
  293. } else {
  294. unset($this->eventTimer[$timerId]);
  295. }
  296. $this->safeCall($taskData[0], $taskData[1]);
  297. } else {
  298. break;
  299. }
  300. }
  301. foreach ($tasksToInsert as $item) {
  302. $this->scheduler->insert($item[0], $item[1]);
  303. }
  304. if (!$this->scheduler->isEmpty()) {
  305. $schedulerData = $this->scheduler->top();
  306. $nextRunTime = -$schedulerData['priority'];
  307. $timeNow = microtime(true);
  308. $this->selectTimeout = max((int)(($nextRunTime - $timeNow) * 1000000), 0);
  309. return;
  310. }
  311. $this->selectTimeout = 100000000;
  312. }
  313. /**
  314. * {@inheritdoc}
  315. */
  316. public function deleteAllTimer(): void
  317. {
  318. $this->scheduler = new \SplPriorityQueue();
  319. $this->scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
  320. $this->eventTimer = [];
  321. }
  322. /**
  323. * {@inheritdoc}
  324. */
  325. public function run(): void
  326. {
  327. while ($this->running) {
  328. $read = $this->readFds;
  329. $write = $this->writeFds;
  330. $except = $this->exceptFds;
  331. if (!empty($read) || !empty($write) || !empty($except)) {
  332. // Waiting read/write/signal/timeout events.
  333. try {
  334. @stream_select($read, $write, $except, 0, $this->selectTimeout);
  335. } catch (\Throwable) {
  336. // do nothing
  337. }
  338. } else {
  339. $this->selectTimeout >= 1 && usleep($this->selectTimeout);
  340. }
  341. if (!$this->scheduler->isEmpty()) {
  342. $this->tick();
  343. }
  344. foreach ($read as $fd) {
  345. $fdKey = (int)$fd;
  346. if (isset($this->readEvents[$fdKey])) {
  347. $this->readEvents[$fdKey]($fd);
  348. }
  349. }
  350. foreach ($write as $fd) {
  351. $fdKey = (int)$fd;
  352. if (isset($this->writeEvents[$fdKey])) {
  353. $this->writeEvents[$fdKey]($fd);
  354. }
  355. }
  356. foreach ($except as $fd) {
  357. $fdKey = (int)$fd;
  358. if (isset($this->exceptEvents[$fdKey])) {
  359. $this->exceptEvents[$fdKey]($fd);
  360. }
  361. }
  362. if (!empty($this->signalEvents)) {
  363. // Calls signal handlers for pending signals
  364. pcntl_signal_dispatch();
  365. }
  366. }
  367. }
  368. /**
  369. * {@inheritdoc}
  370. */
  371. public function stop(): void
  372. {
  373. $this->running = false;
  374. $this->deleteAllTimer();
  375. foreach ($this->signalEvents as $signal => $item) {
  376. $this->offsignal($signal);
  377. }
  378. $this->readFds = [];
  379. $this->writeFds = [];
  380. $this->exceptFds = [];
  381. $this->readEvents = [];
  382. $this->writeEvents = [];
  383. $this->exceptEvents = [];
  384. $this->signalEvents = [];
  385. }
  386. /**
  387. * {@inheritdoc}
  388. */
  389. public function getTimerCount(): int
  390. {
  391. return count($this->eventTimer);
  392. }
  393. /**
  394. * {@inheritdoc}
  395. */
  396. public function setErrorHandler(callable $errorHandler): void
  397. {
  398. $this->errorHandler = $errorHandler;
  399. }
  400. /**
  401. * @param callable $func
  402. * @param array $args
  403. * @return void
  404. */
  405. private function safeCall(callable $func, array $args = []): void
  406. {
  407. try {
  408. $func(...$args);
  409. } catch (\Throwable $e) {
  410. if ($this->errorHandler === null) {
  411. echo $e;
  412. } else {
  413. ($this->errorHandler)($e);
  414. }
  415. }
  416. }
  417. }