StreamSelectLoop.php 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Workerman\Events\React;
  15. use Workerman\Events\EventInterface;
  16. /**
  17. * Class StreamSelectLoop
  18. * @package Workerman\Events\React
  19. */
  20. class StreamSelectLoop extends \React\EventLoop\StreamSelectLoop
  21. {
  22. /**
  23. * @var array
  24. */
  25. protected $_timerIdMap = array();
  26. /**
  27. * @var int
  28. */
  29. protected $_timerIdIndex = 0;
  30. /**
  31. * Add event listener to event loop.
  32. *
  33. * @param $fd
  34. * @param $flag
  35. * @param $func
  36. * @param array $args
  37. * @return bool
  38. */
  39. public function add($fd, $flag, $func, $args = array())
  40. {
  41. $args = (array)$args;
  42. switch ($flag) {
  43. case EventInterface::EV_READ:
  44. return $this->addReadStream($fd, $func);
  45. case EventInterface::EV_WRITE:
  46. return $this->addWriteStream($fd, $func);
  47. case EventInterface::EV_SIGNAL:
  48. return $this->addSignal($fd, $func);
  49. case EventInterface::EV_TIMER:
  50. $timer_obj = $this->addPeriodicTimer($fd, function() use ($func, $args) {
  51. call_user_func_array($func, $args);
  52. });
  53. $this->_timerIdMap[++$this->_timerIdIndex] = $timer_obj;
  54. return $this->_timerIdIndex;
  55. case EventInterface::EV_TIMER_ONCE:
  56. $timer_obj = $this->addTimer($fd, function() use ($func, $args) {
  57. call_user_func_array($func, $args);
  58. });
  59. $this->_timerIdMap[++$this->_timerIdIndex] = $timer_obj;
  60. return $this->_timerIdIndex;
  61. }
  62. return false;
  63. }
  64. /**
  65. * Remove event listener from event loop.
  66. *
  67. * @param mixed $fd
  68. * @param int $flag
  69. * @return bool
  70. */
  71. public function del($fd, $flag)
  72. {
  73. switch ($flag) {
  74. case EventInterface::EV_READ:
  75. return $this->removeReadStream($fd);
  76. case EventInterface::EV_WRITE:
  77. return $this->removeWriteStream($fd);
  78. case EventInterface::EV_SIGNAL:
  79. return $this->removeSignal($fd);
  80. case EventInterface::EV_TIMER:
  81. case EventInterface::EV_TIMER_ONCE;
  82. if (isset($this->_timerIdMap[$fd])){
  83. $timer_obj = $this->_timerIdMap[$fd];
  84. unset($this->_timerIdMap[$fd]);
  85. $this->cancelTimer($timer_obj);
  86. return true;
  87. }
  88. }
  89. return false;
  90. }
  91. /**
  92. * Main loop.
  93. *
  94. * @return void
  95. */
  96. public function loop()
  97. {
  98. $this->run();
  99. }
  100. /**
  101. * Add signal handler.
  102. *
  103. * @param $signal
  104. * @param $callback
  105. * @return bool
  106. */
  107. public function addSignal($signal, $callback)
  108. {
  109. if(PHP_EOL !== "\r\n") {
  110. pcntl_signal($signal, $callback);
  111. }
  112. }
  113. /**
  114. * Remove signal handler.
  115. *
  116. * @param $signal
  117. */
  118. public function removeSignal($signal)
  119. {
  120. if(PHP_EOL !== "\r\n") {
  121. pcntl_signal($signal, SIG_IGN);
  122. }
  123. }
  124. /**
  125. * Emulate a stream_select() implementation that does not break when passed
  126. * empty stream arrays.
  127. *
  128. * @param array &$read An array of read streams to select upon.
  129. * @param array &$write An array of write streams to select upon.
  130. * @param integer|null $timeout Activity timeout in microseconds, or null to wait forever.
  131. *
  132. * @return integer|false The total number of streams that are ready for read/write.
  133. * Can return false if stream_select() is interrupted by a signal.
  134. */
  135. protected function streamSelect(array &$read, array &$write, $timeout)
  136. {
  137. if ($read || $write) {
  138. $except = null;
  139. // Calls signal handlers for pending signals
  140. pcntl_signal_dispatch();
  141. // suppress warnings that occur, when stream_select is interrupted by a signal
  142. return @stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);
  143. }
  144. // Calls signal handlers for pending signals
  145. if(PHP_EOL !== "\r\n") {
  146. pcntl_signal_dispatch();
  147. }
  148. $timeout && usleep($timeout);
  149. return 0;
  150. }
  151. /**
  152. * Destroy loop.
  153. *
  154. * @return void
  155. */
  156. public function destroy()
  157. {
  158. }
  159. }