Select.php 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Workerman\Events;
  /**
   * Event loop implemented on top of stream_select().
   *
   * Read/write listeners are kept per stream, signal listeners are installed
   * through pcntl_signal(), and timers are kept in an SplPriorityQueue that is
   * drained by tick(). The EV_* flags are referenced as self::EV_* and are not
   * defined in this class (presumably they come from EventInterface).
   */
  class Select implements EventInterface
  {
      /**
       * All listeners for read/write events.
       * [fd_key => [flag => [func, fd]]]
       * @var array
       */
      public $_allEvents = array();

      /**
       * Event listeners of signals.
       * [signal => [flag => [func, signal]]]
       * @var array
       */
      public $_signalEvents = array();

      /**
       * Fds waiting for read event, keyed by (int)$fd.
       * @var array
       */
      protected $_readFds = array();

      /**
       * Fds waiting for write event, keyed by (int)$fd.
       * @var array
       */
      protected $_writeFds = array();

      /**
       * Timer scheduler.
       * {['data':timer_id, 'priority':-run_timestamp], ..}
       * The priority is the negated run time, so the timer that is due first
       * sits at the top of the queue.
       * @var \SplPriorityQueue
       */
      protected $_scheduler = null;

      /**
       * All timer event listeners.
       * [timer_id => [func, args, flag, timer_interval], ..]
       * @var array
       */
      protected $_task = array();

      /**
       * Next timer id to hand out.
       * @var int
       */
      protected $_timerId = 1;

      /**
       * Select timeout in microseconds (passed to stream_select()).
       * @var int
       */
      protected $_selectTimeout = 100000000;

      /**
       * Paired socket channels.
       * @var array
       */
      protected $channel = array();

      /**
       * Construct.
       *
       * Creates a socket pair and registers one end in the read set so that
       * the read set is not empty, then initialises the timer queue.
       */
      public function __construct()
      {
          // Create a socket pair and put one end into the read set to avoid empty polling.
          $this->channel = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
          if ($this->channel) {
              stream_set_blocking($this->channel[0], 0);
              $this->_readFds[0] = $this->channel[0];
          }
          // Init SplPriorityQueue.
          $this->_scheduler = new \SplPriorityQueue();
          $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
      }

      /**
       * {@inheritdoc}
       *
       * @param mixed    $fd   Stream for EV_READ/EV_WRITE, signal number for
       *                       EV_SIGNAL, delay/interval in seconds for timers.
       * @param int      $flag One of the self::EV_* flags.
       * @param callable $func Listener to invoke.
       * @param mixed    $args Arguments for timer callbacks (cast to array).
       * @return bool|int The timer id for timer flags, true otherwise.
       */
      public function add($fd, $flag, $func, $args = array())
      {
          switch ($flag) {
              case self::EV_READ:
                  $fd_key = (int)$fd;
                  $this->_allEvents[$fd_key][$flag] = array($func, $fd);
                  $this->_readFds[$fd_key] = $fd;
                  break;
              case self::EV_WRITE:
                  $fd_key = (int)$fd;
                  $this->_allEvents[$fd_key][$flag] = array($func, $fd);
                  $this->_writeFds[$fd_key] = $fd;
                  break;
              case self::EV_SIGNAL:
                  $fd_key = (int)$fd;
                  $this->_signalEvents[$fd_key][$flag] = array($func, $fd);
                  pcntl_signal($fd, array($this, 'signalHandler'));
                  break;
              case self::EV_TIMER:
              case self::EV_TIMER_ONCE:
                  // Reserve the id before tick(): tick() may run callbacks that call
                  // add() again, and those nested calls must not receive the same id.
                  $timer_id = $this->_timerId++;
                  $run_time = microtime(true) + $fd;
                  $this->_scheduler->insert($timer_id, -$run_time);
                  $this->_task[$timer_id] = array($func, (array)$args, $flag, $fd);
                  // Runs timers that are already due and refreshes the select timeout.
                  $this->tick();
                  return $timer_id;
          }
          return true;
      }

      /**
       * Signal handler installed by add() for EV_SIGNAL; forwards the signal
       * number to the registered listener.
       *
       * @param int $signal
       * @return void
       */
      public function signalHandler($signal)
      {
          // Ignore signals whose listener is not (or no longer) registered.
          if (!isset($this->_signalEvents[$signal][self::EV_SIGNAL])) {
              return;
          }
          call_user_func_array($this->_signalEvents[$signal][self::EV_SIGNAL][0], array($signal));
      }

      /**
       * {@inheritdoc}
       *
       * @param mixed $fd   Stream, signal number or timer id, depending on $flag.
       * @param int   $flag One of the self::EV_* flags.
       * @return bool True when the flag was handled, false for an unknown flag.
       */
      public function del($fd ,$flag)
      {
          $fd_key = (int)$fd;
          switch ($flag) {
              case self::EV_READ:
                  unset($this->_allEvents[$fd_key][$flag], $this->_readFds[$fd_key]);
                  if (empty($this->_allEvents[$fd_key])) {
                      unset($this->_allEvents[$fd_key]);
                  }
                  return true;
              case self::EV_WRITE:
                  unset($this->_allEvents[$fd_key][$flag], $this->_writeFds[$fd_key]);
                  if (empty($this->_allEvents[$fd_key])) {
                      unset($this->_allEvents[$fd_key]);
                  }
                  return true;
              case self::EV_SIGNAL:
                  unset($this->_signalEvents[$fd_key]);
                  pcntl_signal($fd, SIG_IGN);
                  return true;
              case self::EV_TIMER:
              case self::EV_TIMER_ONCE:
                  // The queue entry is left in place; tick() skips ids without a task.
                  unset($this->_task[$fd_key]);
                  return true;
          }
          return false;
      }

      /**
       * Tick for timer.
       *
       * Runs every timer that is due, re-queues repeating timers, and sets the
       * select timeout to the delay until the next pending timer (or back to
       * the default when no timer is left).
       *
       * @return void
       */
      protected function tick()
      {
          while (!$this->_scheduler->isEmpty()) {
              $scheduler_data = $this->_scheduler->top();
              $timer_id       = $scheduler_data['data'];
              $next_run_time  = -$scheduler_data['priority'];
              $time_now       = microtime(true);

              if ($time_now < $next_run_time) {
                  // Earliest timer is not due yet: wait exactly until it is.
                  // stream_select() takes integer microseconds, so cast explicitly
                  // and never go below 1 microsecond.
                  $this->_selectTimeout = max(1, (int)(($next_run_time - $time_now) * 1000000));
                  return;
              }

              $this->_scheduler->extract();
              // Timer was deleted after it had been queued.
              if (!isset($this->_task[$timer_id])) {
                  continue;
              }

              // [func, args, flag, timer_interval]
              $task_data = $this->_task[$timer_id];
              if ($task_data[2] === self::EV_TIMER) {
                  // Repeating timer: queue the next run before invoking the callback.
                  $next_run_time = $time_now + $task_data[3];
                  $this->_scheduler->insert($timer_id, -$next_run_time);
              }
              call_user_func_array($task_data[0], $task_data[1]);
              if (isset($this->_task[$timer_id]) && $task_data[2] === self::EV_TIMER_ONCE) {
                  $this->del($timer_id, self::EV_TIMER_ONCE);
              }
          }
          // No timers left: fall back to the default timeout.
          $this->_selectTimeout = 100000000;
      }

      /**
       * {@inheritdoc}
       *
       * Replaces the timer queue with an empty one and drops all timer tasks.
       */
      public function clearAllTimer()
      {
          $this->_scheduler = new \SplPriorityQueue();
          $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
          $this->_task = array();
      }

      /**
       * {@inheritdoc}
       *
       * Runs forever: dispatches pending signals, waits for stream activity or
       * the timer timeout, runs due timers, then invokes the read and write
       * listeners of the streams that are ready.
       */
      public function loop()
      {
          $e = null;
          while (1) {
              // Calls signal handlers for pending signals
              pcntl_signal_dispatch();

              $read  = $this->_readFds;
              $write = $this->_writeFds;
              if ($read || $write) {
                  // Waiting read/write/signal/timeout events.
                  $ret = @stream_select($read, $write, $e, 0, $this->_selectTimeout);
              } else {
                  // stream_select() rejects a call without any stream (a ValueError
                  // as of PHP 8, which "@" cannot suppress), so just wait out the
                  // timeout instead.
                  usleep((int)$this->_selectTimeout);
                  $ret = false;
              }

              if (!$this->_scheduler->isEmpty()) {
                  $this->tick();
              }

              if (!$ret) {
                  continue;
              }

              foreach ($read as $fd) {
                  $fd_key = (int)$fd;
                  // The listener may have been removed by an earlier callback.
                  if (isset($this->_allEvents[$fd_key][self::EV_READ])) {
                      call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0], array($this->_allEvents[$fd_key][self::EV_READ][1]));
                  }
              }

              foreach ($write as $fd) {
                  $fd_key = (int)$fd;
                  if (isset($this->_allEvents[$fd_key][self::EV_WRITE])) {
                      call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0], array($this->_allEvents[$fd_key][self::EV_WRITE][1]));
                  }
              }
          }
      }
  }