Select.php 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. <?php
  2. namespace Workerman\Events;
  3. class Select implements EventInterface
  4. {
  5. /**
  6. * 所有的事件
  7. * @var array
  8. */
  9. public $_allEvents = array();
  10. /**
  11. * 所有信号事件
  12. * @var array
  13. */
  14. public $_signalEvents = array();
  15. /**
  16. * 监听这些描述符的读事件
  17. * @var array
  18. */
  19. protected $_readFds = array();
  20. /**
  21. * 监听这些描述符的写事件
  22. * @var array
  23. */
  24. protected $_writeFds = array();
  25. /**
  26. * 任务调度器,最大堆
  27. * {['data':timer_id, 'priority':run_timestamp], ..}
  28. * @var SplPriorityQueue
  29. */
  30. protected $_scheduler = null;
  31. /**
  32. * 定时任务
  33. * [[func, args, flag, timer_interval], ..]
  34. * @var array
  35. */
  36. protected $_task = array();
  37. /**
  38. * 定时器id
  39. * @var int
  40. */
  41. protected $_timerId = 1;
  42. /**
  43. * select超时时间,单位:微妙
  44. * @var int
  45. */
  46. protected $_selectTimeout = 100000000;
  47. /**
  48. * 构造函数
  49. * @return void
  50. */
  51. public function __construct()
  52. {
  53. // 创建一个管道,放入监听读的描述符集合中,避免空轮询
  54. $this->channel = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
  55. if($this->channel)
  56. {
  57. stream_set_blocking($this->channel[0], 0);
  58. $this->_readFds[0] = $this->channel[0];
  59. }
  60. // 初始化优先队列(最大堆)
  61. $this->_scheduler = new \SplPriorityQueue();
  62. $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
  63. }
  64. /**
  65. * 添加事件及处理函数
  66. * @see Events\EventInterface::add()
  67. */
  68. public function add($fd, $flag, $func, $args = null)
  69. {
  70. switch ($flag)
  71. {
  72. case self::EV_READ:
  73. $fd_key = (int)$fd;
  74. $this->_allEvents[$fd_key][$flag] = array($func, $fd);
  75. $this->_readFds[$fd_key] = $fd;
  76. break;
  77. case self::EV_WRITE:
  78. $fd_key = (int)$fd;
  79. $this->_allEvents[$fd_key][$flag] = array($func, $fd);
  80. $this->_writeFds[$fd_key] = $fd;
  81. break;
  82. case self::EV_SIGNAL:
  83. $fd_key = (int)$fd;
  84. $this->_signalEvents[$fd_key][$flag] = array($func, $fd);
  85. pcntl_signal($fd, array($this, 'signalHandler'));
  86. break;
  87. case self::EV_TIMER:
  88. case self::EV_TIMER_ONCE:
  89. // $fd 为 定时的时间间隔,单位为秒,支持小数,能精确到0.001秒
  90. $run_time = microtime(true)+$fd;
  91. $this->_scheduler->insert($this->_timerId, -$run_time);
  92. $this->_task[$this->_timerId] = array($func, $args, $flag, $fd);
  93. $this->tick();
  94. return $this->_timerId++;
  95. }
  96. return true;
  97. }
  98. /**
  99. * 信号处理函数
  100. * @param int $signal
  101. */
  102. public function signalHandler($signal)
  103. {
  104. call_user_func_array($this->_signalEvents[$signal][self::EV_SIGNAL][0], array($signal));
  105. }
  106. /**
  107. * 删除某个描述符的某类事件的监听
  108. * @see Events\EventInterface::del()
  109. */
  110. public function del($fd ,$flag)
  111. {
  112. $fd_key = (int)$fd;
  113. switch ($flag)
  114. {
  115. case self::EV_READ:
  116. unset($this->_allEvents[$fd_key][$flag], $this->_readFds[$fd_key]);
  117. if(empty($this->_allEvents[$fd_key]))
  118. {
  119. unset($this->_allEvents[$fd_key]);
  120. }
  121. return true;
  122. case self::EV_WRITE:
  123. unset($this->_allEvents[$fd_key][$flag], $this->_writeFds[$fd_key]);
  124. if(empty($this->_allEvents[$fd_key]))
  125. {
  126. unset($this->_allEvents[$fd_key]);
  127. }
  128. return true;
  129. case self::EV_SIGNAL:
  130. unset($this->_signalEvents[$fd_key]);
  131. pcntl_signal($fd, SIG_IGN);
  132. break;
  133. case self::EV_TIMER:
  134. case self::EV_TIMER_ONCE;
  135. // $fd_key为要删除的定时器id,即timerId
  136. unset($this->_task[$fd_key]);
  137. return true;
  138. }
  139. return false;;
  140. }
  141. /**
  142. * 检查是否有可执行的定时任务,有的话执行
  143. * @return void
  144. */
  145. protected function tick()
  146. {
  147. while(!$this->_scheduler->isEmpty())
  148. {
  149. $scheduler_data = $this->_scheduler->top();
  150. $timer_id = $scheduler_data['data'];
  151. $next_run_time = -$scheduler_data['priority'];
  152. $time_now = microtime(true);
  153. if($time_now >= $next_run_time)
  154. {
  155. $this->_scheduler->extract();
  156. // 如果任务不存在,则是对应的定时器已经删除
  157. if(!isset($this->_task[$timer_id]))
  158. {
  159. continue;
  160. }
  161. // 任务数据[func, args, flag, timer_interval]
  162. $task_data = $this->_task[$timer_id];
  163. // 如果是持续的定时任务,再把任务加到定时队列
  164. if($task_data[2] == self::EV_TIMER)
  165. {
  166. $next_run_time = $time_now+$task_data[3];
  167. $this->_scheduler->insert($timer_id, -$next_run_time);
  168. }
  169. // 尝试执行任务
  170. try
  171. {
  172. call_user_func_array($task_data[0], $task_data[1]);
  173. }
  174. catch(\Exception $e)
  175. {
  176. echo $e;
  177. }
  178. continue;
  179. }
  180. else
  181. {
  182. // 设定超时时间
  183. $this->_selectTimeout = ($next_run_time - $time_now)*1000000;
  184. return;
  185. }
  186. }
  187. $this->_selectTimeout = 100000000;
  188. }
  189. /**
  190. * 删除所有定时器
  191. * @return void
  192. */
  193. public function clearAllTimer()
  194. {
  195. $this->_scheduler = new \SplPriorityQueue();
  196. $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
  197. $this->_task = array();
  198. }
  199. /**
  200. * 主循环
  201. * @see Events\EventInterface::loop()
  202. */
  203. public function loop()
  204. {
  205. $e = null;
  206. while (1)
  207. {
  208. // 如果有信号,尝试执行信号处理函数
  209. pcntl_signal_dispatch();
  210. $read = $this->_readFds;
  211. $write = $this->_writeFds;
  212. // 等待可读或者可写事件
  213. @stream_select($read, $write, $e, 0, $this->_selectTimeout);
  214. // 这些描述符可读,执行对应描述符的读回调函数
  215. if($read)
  216. {
  217. foreach($read as $fd)
  218. {
  219. $fd_key = (int) $fd;
  220. if(isset($this->_allEvents[$fd_key][self::EV_READ]))
  221. {
  222. call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0], array($this->_allEvents[$fd_key][self::EV_READ][1]));
  223. }
  224. }
  225. }
  226. // 这些描述符可写,执行对应描述符的写回调函数
  227. if($write)
  228. {
  229. foreach($write as $fd)
  230. {
  231. $fd_key = (int) $fd;
  232. if(isset($this->_allEvents[$fd_key][self::EV_WRITE]))
  233. {
  234. call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0], array($this->_allEvents[$fd_key][self::EV_WRITE][1]));
  235. }
  236. }
  237. }
  238. // 尝试执行定时任务
  239. if(!$this->_scheduler->isEmpty())
  240. {
  241. $this->tick();
  242. }
  243. }
  244. }
  245. }