Select.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. declare(strict_types=1);
  15. namespace Workerman\Events;
  16. use SplPriorityQueue;
  17. use Throwable;
  18. use function count;
  19. use function max;
  20. use function microtime;
  21. use function pcntl_signal;
  22. use function pcntl_signal_dispatch;
  23. use const DIRECTORY_SEPARATOR;
  /**
   * Event loop implemented on top of stream_select().
   *
   * Streams are polled for read/write/except readiness with stream_select(),
   * timers are kept in a priority queue ordered by due time and fired from the
   * loop, and pending signals are dispatched once per loop iteration.
   */
  class Select implements EventInterface
  {
      /**
       * Select timeout, in microseconds, used while no timer is pending.
       */
      private const DEFAULT_SELECT_TIMEOUT = 100000000;

      /**
       * Whether run() should keep looping; cleared by stop().
       *
       * @var bool
       */
      protected bool $running = true;

      /**
       * Read callbacks, keyed by the integer id of the stream.
       *
       * @var array<int, callable>
       */
      protected array $readEvents = [];

      /**
       * Write callbacks, keyed by the integer id of the stream.
       *
       * @var array<int, callable>
       */
      protected array $writeEvents = [];

      /**
       * Except callbacks, keyed by the integer id of the stream.
       *
       * @var array<int, callable>
       */
      protected array $exceptEvents = [];

      /**
       * Signal callbacks, keyed by signal number.
       *
       * @var array<int, callable>
       */
      protected array $signalEvents = [];

      /**
       * Streams waiting for a read event, keyed by their integer id.
       *
       * @var array<int, resource>
       */
      protected array $readFds = [];

      /**
       * Streams waiting for a write event, keyed by their integer id.
       *
       * @var array<int, resource>
       */
      protected array $writeFds = [];

      /**
       * Streams waiting for an except event, keyed by their integer id.
       *
       * @var array<int, resource>
       */
      protected array $exceptFds = [];

      /**
       * Timer scheduler.
       *
       * Each entry has the timer id as data and the negated run timestamp as
       * priority, so the timer that is due first sits on top of the queue.
       *
       * @var SplPriorityQueue
       */
      protected SplPriorityQueue $scheduler;

      /**
       * All registered timers, keyed by timer id.
       *
       * One-shot timers are stored as [func, args]; repeating timers as
       * [func, args, interval].
       *
       * @var array<int, array>
       */
      protected array $eventTimer = [];

      /**
       * Id handed out to the next timer.
       *
       * @var int
       */
      protected int $timerId = 1;

      /**
       * Timeout, in microseconds, for the next stream_select()/usleep() call.
       *
       * @var int
       */
      protected int $selectTimeout = self::DEFAULT_SELECT_TIMEOUT;

      /**
       * Handler that receives exceptions thrown by timer callbacks, if any.
       *
       * @var ?callable
       */
      protected $errorHandler = null;

      /**
       * Construct.
       */
      public function __construct()
      {
          $this->scheduler = $this->createScheduler();
      }

      /**
       * Run $func once, $delay seconds from now.
       *
       * @param float $delay Seconds to wait before the call.
       * @param callable $func Callback to run.
       * @param array $args Arguments spread into the callback.
       * @return int Timer id, usable with offDelay().
       */
      public function delay(float $delay, callable $func, array $args = []): int
      {
          return $this->addTimer($delay, [$func, $args]);
      }

      /**
       * Run $func every $interval seconds, starting $interval seconds from now.
       *
       * @param float $interval Seconds between two runs.
       * @param callable $func Callback to run.
       * @param array $args Arguments spread into the callback.
       * @return int Timer id, usable with offRepeat().
       */
      public function repeat(float $interval, callable $func, array $args = []): int
      {
          return $this->addTimer($interval, [$func, $args, $interval]);
      }

      /**
       * Cancel a timer.
       *
       * Only the registration is dropped; the scheduler entry stays queued and
       * is skipped by tick() once it comes due.
       *
       * @param int $timerId
       * @return bool True when the timer existed.
       */
      public function offDelay(int $timerId): bool
      {
          if (!isset($this->eventTimer[$timerId])) {
              return false;
          }
          unset($this->eventTimer[$timerId]);
          return true;
      }

      /**
       * Cancel a repeating timer; same as offDelay().
       *
       * @param int $timerId
       * @return bool True when the timer existed.
       */
      public function offRepeat(int $timerId): bool
      {
          return $this->offDelay($timerId);
      }

      /**
       * Register (or replace) the read callback of a stream.
       *
       * @param resource $stream
       * @param callable $func Called with the stream when it is readable.
       * @return void
       */
      public function onReadable($stream, callable $func): void
      {
          $this->warnIfSelectLimitReached(count($this->readFds));
          $fdKey = (int)$stream;
          $this->readEvents[$fdKey] = $func;
          $this->readFds[$fdKey] = $stream;
      }

      /**
       * Remove the read callback of a stream.
       *
       * @param resource $stream
       * @return bool True when a callback was registered.
       */
      public function offReadable($stream): bool
      {
          $fdKey = (int)$stream;
          if (!isset($this->readEvents[$fdKey])) {
              return false;
          }
          unset($this->readEvents[$fdKey], $this->readFds[$fdKey]);
          return true;
      }

      /**
       * Register (or replace) the write callback of a stream.
       *
       * @param resource $stream
       * @param callable $func Called with the stream when it is writable.
       * @return void
       */
      public function onWritable($stream, callable $func): void
      {
          $this->warnIfSelectLimitReached(count($this->writeFds));
          $fdKey = (int)$stream;
          $this->writeEvents[$fdKey] = $func;
          $this->writeFds[$fdKey] = $stream;
      }

      /**
       * Remove the write callback of a stream.
       *
       * @param resource $stream
       * @return bool True when a callback was registered.
       */
      public function offWritable($stream): bool
      {
          $fdKey = (int)$stream;
          if (!isset($this->writeEvents[$fdKey])) {
              return false;
          }
          unset($this->writeEvents[$fdKey], $this->writeFds[$fdKey]);
          return true;
      }

      /**
       * Register (or replace) the except callback of a stream.
       *
       * @param resource $stream
       * @param callable $func Called with the stream when select reports it in the except set.
       * @return void
       */
      public function onExcept($stream, $func): void
      {
          $fdKey = (int)$stream;
          $this->exceptEvents[$fdKey] = $func;
          $this->exceptFds[$fdKey] = $stream;
      }

      /**
       * Remove the except callback of a stream.
       *
       * @param resource $stream
       * @return bool True when a callback was registered.
       */
      public function offExcept($stream): bool
      {
          $fdKey = (int)$stream;
          if (!isset($this->exceptEvents[$fdKey])) {
              return false;
          }
          unset($this->exceptEvents[$fdKey], $this->exceptFds[$fdKey]);
          return true;
      }

      /**
       * Register (or replace) the callback of a signal.
       *
       * Does nothing when the pcntl extension is not available.
       *
       * @param int $signal
       * @param callable $func Called with the signal number.
       * @return void
       */
      public function onSignal(int $signal, callable $func): void
      {
          if (!function_exists('pcntl_signal')) {
              return;
          }
          $this->signalEvents[$signal] = $func;
          pcntl_signal($signal, $this->signalHandler(...));
      }

      /**
       * Remove the callback of a signal and set the signal to be ignored.
       *
       * @param int $signal
       * @return bool True when a callback was registered; false otherwise or
       *              when the pcntl extension is not available.
       */
      public function offSignal(int $signal): bool
      {
          if (!function_exists('pcntl_signal')) {
              return false;
          }
          // The signal is switched to SIG_IGN even when no callback was registered.
          pcntl_signal($signal, SIG_IGN);
          if (!isset($this->signalEvents[$signal])) {
              return false;
          }
          unset($this->signalEvents[$signal]);
          return true;
      }

      /**
       * Signal handler installed through pcntl_signal(); forwards to the
       * callback registered for the signal.
       *
       * @param int $signal
       * @return void
       */
      public function signalHandler(int $signal): void
      {
          $this->signalEvents[$signal]($signal);
      }

      /**
       * Run every timer that is due and compute the next select timeout.
       *
       * @return void
       * @throws Throwable When a timer callback throws and no error handler is set.
       */
      protected function tick(): void
      {
          // Repeating timers are re-queued only after the loop, so a timer whose
          // next run is already due cannot fire twice within one tick.
          $tasksToInsert = [];
          while (!$this->scheduler->isEmpty()) {
              $schedulerData = $this->scheduler->top();
              $timerId = $schedulerData['data'];
              $nextRunTime = -$schedulerData['priority'];
              $timeNow = microtime(true);
              $this->selectTimeout = (int)(($nextRunTime - $timeNow) * 1000000);
              if ($this->selectTimeout > 0) {
                  // The earliest timer is not due yet, so none of the others are.
                  break;
              }
              $this->scheduler->extract();
              if (!isset($this->eventTimer[$timerId])) {
                  // Cancelled timer whose scheduler entry was still queued.
                  continue;
              }
              // [func, args] or [func, args, interval]
              $taskData = $this->eventTimer[$timerId];
              if (isset($taskData[2])) {
                  $tasksToInsert[] = [$timerId, -($timeNow + $taskData[2])];
              } else {
                  unset($this->eventTimer[$timerId]);
              }
              try {
                  $taskData[0](...$taskData[1]);
              } catch (Throwable $e) {
                  $this->error($e);
              }
          }
          foreach ($tasksToInsert as [$timerId, $priority]) {
              // Skip timers that were cancelled while the callbacks ran, so no
              // stale entry is left behind to wake the loop for nothing.
              if (isset($this->eventTimer[$timerId])) {
                  $this->scheduler->insert($timerId, $priority);
              }
          }
          if ($this->scheduler->isEmpty()) {
              $this->selectTimeout = self::DEFAULT_SELECT_TIMEOUT;
              return;
          }
          $schedulerData = $this->scheduler->top();
          $nextRunTime = -$schedulerData['priority'];
          $timeNow = microtime(true);
          $this->selectTimeout = max((int)(($nextRunTime - $timeNow) * 1000000), 0);
      }

      /**
       * Remove every timer.
       *
       * @return void
       */
      public function deleteAllTimer(): void
      {
          $this->scheduler = $this->createScheduler();
          $this->eventTimer = [];
          // run() only calls tick() while timers are queued, so nothing else
          // would restore the timeout; a leftover short value would keep the
          // loop waking up (or spinning, when it is 0) with nothing to do.
          $this->selectTimeout = self::DEFAULT_SELECT_TIMEOUT;
      }

      /**
       * Run the loop until stop() is called.
       *
       * Each iteration waits for stream readiness (or sleeps when no stream is
       * registered), runs due timers, invokes the read, write and except
       * callbacks of the streams select reported, then dispatches pending
       * signals.
       *
       * @return void
       * @throws Throwable Whatever a callback throws and nothing handles.
       */
      public function run(): void
      {
          while ($this->running) {
              // stream_select() takes the arrays by reference, so work on copies.
              $read = $this->readFds;
              $write = $this->writeFds;
              $except = $this->exceptFds;
              if ($read || $write || $except) {
                  // Waiting read/write/signal/timeout events.
                  try {
                      @stream_select($read, $write, $except, 0, $this->selectTimeout);
                  } catch (Throwable) {
                      // Deliberately ignored (best effort).
                      // NOTE(review): when select fails the three arrays presumably
                      // stay as copied above, so every registered callback is
                      // invoked below - confirm that this is intended.
                  }
              } else {
                  // No stream to poll: sleep until the next timer is due.
                  $this->selectTimeout >= 1 && usleep($this->selectTimeout);
              }
              if (!$this->scheduler->isEmpty()) {
                  $this->tick();
              }
              // Callbacks are looked up again for every stream because an earlier
              // callback may have unregistered a later one.
              foreach ($read as $fd) {
                  $fdKey = (int)$fd;
                  if (isset($this->readEvents[$fdKey])) {
                      $this->readEvents[$fdKey]($fd);
                  }
              }
              foreach ($write as $fd) {
                  $fdKey = (int)$fd;
                  if (isset($this->writeEvents[$fdKey])) {
                      $this->writeEvents[$fdKey]($fd);
                  }
              }
              foreach ($except as $fd) {
                  $fdKey = (int)$fd;
                  if (isset($this->exceptEvents[$fdKey])) {
                      $this->exceptEvents[$fdKey]($fd);
                  }
              }
              if (!empty($this->signalEvents)) {
                  // Calls signal handlers for pending signals
                  pcntl_signal_dispatch();
              }
          }
      }

      /**
       * Stop the loop and drop every timer, signal and stream registration.
       *
       * @return void
       */
      public function stop(): void
      {
          $this->running = false;
          $this->deleteAllTimer();
          foreach (array_keys($this->signalEvents) as $signal) {
              $this->offSignal($signal);
          }
          $this->readFds = $this->writeFds = $this->exceptFds = $this->readEvents
              = $this->writeEvents = $this->exceptEvents = $this->signalEvents = [];
      }

      /**
       * Number of registered timers.
       *
       * @return int
       */
      public function getTimerCount(): int
      {
          return count($this->eventTimer);
      }

      /**
       * Set the handler that receives exceptions thrown by timer callbacks.
       *
       * @param callable $errorHandler Called with the Throwable.
       * @return void
       */
      public function setErrorHandler(callable $errorHandler): void
      {
          $this->errorHandler = $errorHandler;
      }

      /**
       * Get the current error handler.
       *
       * @return ?callable Null when none was set.
       */
      public function getErrorHandler(): ?callable
      {
          return $this->errorHandler;
      }

      /**
       * Pass an exception to the error handler, or rethrow it when none is set.
       *
       * @param Throwable $e
       * @return void
       * @throws Throwable The given exception itself, when no handler is set.
       */
      public function error(Throwable $e): void
      {
          if ($this->errorHandler === null) {
              // Rethrow the original object: "new $e" would build a blank
              // exception of the same class and lose message, code and trace.
              throw $e;
          }
          ($this->errorHandler)($e);
      }

      /**
       * Create an empty timer queue whose entries expose both data and priority.
       *
       * @return SplPriorityQueue
       */
      private function createScheduler(): SplPriorityQueue
      {
          $scheduler = new SplPriorityQueue();
          $scheduler->setExtractFlags(SplPriorityQueue::EXTR_BOTH);
          return $scheduler;
      }

      /**
       * Register a timer that first runs $after seconds from now.
       *
       * @param float $after Seconds until the first run.
       * @param array $task [func, args] for a one-shot timer, [func, args, interval] for a repeating one.
       * @return int The id of the new timer.
       */
      private function addTimer(float $after, array $task): int
      {
          $timerId = $this->timerId++;
          $runTime = microtime(true) + $after;
          $this->scheduler->insert($timerId, -$runTime);
          $this->eventTimer[$timerId] = $task;

          // Make sure the loop wakes up no later than this timer is due. The
          // value is bounded by the current timeout before the int cast so that
          // a huge delay cannot overflow it, and floored at 1 microsecond.
          $untilDue = ($runTime - microtime(true)) * 1000000;
          $candidate = max(1, (int)min($untilDue, $this->selectTimeout));
          if ($candidate < $this->selectTimeout) {
              $this->selectTimeout = $candidate;
          }
          return $timerId;
      }

      /**
       * Print a warning when the number of watched streams has reached the
       * limit this class assumes for select: 1024, or 256 where the directory
       * separator is not "/".
       *
       * @param int $count Number of streams already registered.
       * @return void
       */
      private function warnIfSelectLimitReached(int $count): void
      {
          if ($count >= 1024) {
              echo "Warning: system call select exceeded the maximum number of connections 1024, please install event/libevent extension for more connections.\n";
          } elseif (DIRECTORY_SEPARATOR !== '/' && $count >= 256) {
              echo "Warning: system call select exceeded the maximum number of connections 256.\n";
          }
      }
  }