Select.php 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Workerman\Events;
  15. /**
  16. * select eventloop
  17. */
  18. class Select implements EventInterface
  19. {
  20. /**
  21. * All listeners for read/write event.
  22. * @var array
  23. */
  24. public $_allEvents = array();
  25. /**
  26. * Event listeners of signal.
  27. * @var array
  28. */
  29. public $_signalEvents = array();
  30. /**
  31. * Fds waiting for read event.
  32. * @var array
  33. */
  34. protected $_readFds = array();
  35. /**
  36. * Fds waiting for write event.
  37. * @var array
  38. */
  39. protected $_writeFds = array();
  40. /**
  41. * Timer scheduler.
  42. * {['data':timer_id, 'priority':run_timestamp], ..}
  43. * @var SplPriorityQueue
  44. */
  45. protected $_scheduler = null;
  46. /**
  47. * All timer event listeners.
  48. * [[func, args, flag, timer_interval], ..]
  49. * @var array
  50. */
  51. protected $_task = array();
  52. /**
  53. * Timer id.
  54. * @var int
  55. */
  56. protected $_timerId = 1;
  57. /**
  58. * Select timeout.
  59. * @var int
  60. */
  61. protected $_selectTimeout = 100000000;
  62. /**
  63. * Construct.
  64. * @return void
  65. */
  66. public function __construct()
  67. {
  68. // Create a pipeline and put into the collection of the read to read the descriptor to avoid empty polling.
  69. $this->channel = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
  70. if($this->channel)
  71. {
  72. stream_set_blocking($this->channel[0], 0);
  73. $this->_readFds[0] = $this->channel[0];
  74. }
  75. // Init SplPriorityQueue.
  76. $this->_scheduler = new \SplPriorityQueue();
  77. $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
  78. }
  79. /**
  80. * @see Events\EventInterface::add()
  81. */
  82. public function add($fd, $flag, $func, $args = array())
  83. {
  84. switch ($flag)
  85. {
  86. case self::EV_READ:
  87. $fd_key = (int)$fd;
  88. $this->_allEvents[$fd_key][$flag] = array($func, $fd);
  89. $this->_readFds[$fd_key] = $fd;
  90. break;
  91. case self::EV_WRITE:
  92. $fd_key = (int)$fd;
  93. $this->_allEvents[$fd_key][$flag] = array($func, $fd);
  94. $this->_writeFds[$fd_key] = $fd;
  95. break;
  96. case self::EV_SIGNAL:
  97. $fd_key = (int)$fd;
  98. $this->_signalEvents[$fd_key][$flag] = array($func, $fd);
  99. pcntl_signal($fd, array($this, 'signalHandler'));
  100. break;
  101. case self::EV_TIMER:
  102. case self::EV_TIMER_ONCE:
  103. $run_time = microtime(true)+$fd;
  104. $this->_scheduler->insert($this->_timerId, -$run_time);
  105. $this->_task[$this->_timerId] = array($func, (array)$args, $flag, $fd);
  106. $this->tick();
  107. return $this->_timerId++;
  108. }
  109. return true;
  110. }
  111. /**
  112. * Signal handler.
  113. * @param int $signal
  114. */
  115. public function signalHandler($signal)
  116. {
  117. call_user_func_array($this->_signalEvents[$signal][self::EV_SIGNAL][0], array($signal));
  118. }
  119. /**
  120. * @see Events\EventInterface::del()
  121. */
  122. public function del($fd ,$flag)
  123. {
  124. $fd_key = (int)$fd;
  125. switch ($flag)
  126. {
  127. case self::EV_READ:
  128. unset($this->_allEvents[$fd_key][$flag], $this->_readFds[$fd_key]);
  129. if(empty($this->_allEvents[$fd_key]))
  130. {
  131. unset($this->_allEvents[$fd_key]);
  132. }
  133. return true;
  134. case self::EV_WRITE:
  135. unset($this->_allEvents[$fd_key][$flag], $this->_writeFds[$fd_key]);
  136. if(empty($this->_allEvents[$fd_key]))
  137. {
  138. unset($this->_allEvents[$fd_key]);
  139. }
  140. return true;
  141. case self::EV_SIGNAL:
  142. unset($this->_signalEvents[$fd_key]);
  143. pcntl_signal($fd, SIG_IGN);
  144. break;
  145. case self::EV_TIMER:
  146. case self::EV_TIMER_ONCE;
  147. unset($this->_task[$fd_key]);
  148. return true;
  149. }
  150. return false;;
  151. }
  152. /**
  153. * Tick for timer.
  154. * @return void
  155. */
  156. protected function tick()
  157. {
  158. while(!$this->_scheduler->isEmpty())
  159. {
  160. $scheduler_data = $this->_scheduler->top();
  161. $timer_id = $scheduler_data['data'];
  162. $next_run_time = -$scheduler_data['priority'];
  163. $time_now = microtime(true);
  164. if($time_now >= $next_run_time)
  165. {
  166. $this->_scheduler->extract();
  167. if(!isset($this->_task[$timer_id]))
  168. {
  169. continue;
  170. }
  171. // [func, args, flag, timer_interval]
  172. $task_data = $this->_task[$timer_id];
  173. if($task_data[2] === self::EV_TIMER)
  174. {
  175. $next_run_time = $time_now+$task_data[3];
  176. $this->_scheduler->insert($timer_id, -$next_run_time);
  177. }
  178. call_user_func_array($task_data[0], $task_data[1]);
  179. if(isset($this->_task[$timer_id]) && $task_data[2] === self::EV_TIMER_ONCE)
  180. {
  181. $this->del($timer_id, self::EV_TIMER_ONCE);
  182. }
  183. continue;
  184. }
  185. else
  186. {
  187. $this->_selectTimeout = ($next_run_time - $time_now)*1000000;
  188. return;
  189. }
  190. }
  191. $this->_selectTimeout = 100000000;
  192. }
  193. /**
  194. * @see Events\EventInterface::clearAllTimer()
  195. */
  196. public function clearAllTimer()
  197. {
  198. $this->_scheduler = new \SplPriorityQueue();
  199. $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
  200. $this->_task = array();
  201. }
  202. /**
  203. * @see Events\EventInterface::loop()
  204. */
  205. public function loop()
  206. {
  207. $e = null;
  208. while (1)
  209. {
  210. // Calls signal handlers for pending signals
  211. pcntl_signal_dispatch();
  212. $read = $this->_readFds;
  213. $write = $this->_writeFds;
  214. // Waiting read/write/signal/timeout events.
  215. @stream_select($read, $write, $e, 0, $this->_selectTimeout);
  216. if(!$this->_scheduler->isEmpty())
  217. {
  218. $this->tick();
  219. }
  220. if($read)
  221. {
  222. foreach($read as $fd)
  223. {
  224. $fd_key = (int) $fd;
  225. if(isset($this->_allEvents[$fd_key][self::EV_READ]))
  226. {
  227. call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0], array($this->_allEvents[$fd_key][self::EV_READ][1]));
  228. }
  229. }
  230. }
  231. if($write)
  232. {
  233. foreach($write as $fd)
  234. {
  235. $fd_key = (int) $fd;
  236. if(isset($this->_allEvents[$fd_key][self::EV_WRITE]))
  237. {
  238. call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0], array($this->_allEvents[$fd_key][self::EV_WRITE][1]));
  239. }
  240. }
  241. }
  242. }
  243. }
  244. }