Select.php 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. <?php
  2. namespace Workerman\Events;
  3. class Select implements EventInterface
  4. {
  5. /**
  6. * 所有的事件
  7. * @var array
  8. */
  9. public $_allEvents = array();
  10. /**
  11. * 所有信号事件
  12. * @var array
  13. */
  14. public $_signalEvents = array();
  15. /**
  16. * 监听这些描述符的读事件
  17. * @var array
  18. */
  19. protected $_readFds = array();
  20. /**
  21. * 监听这些描述符的写事件
  22. * @var array
  23. */
  24. protected $_writeFds = array();
  25. /**
  26. * 任务调度器,最大堆
  27. * {['data':timer_id, 'priority':run_timestamp], ..}
  28. * @var SplPriorityQueue
  29. */
  30. protected $_scheduler = null;
  31. /**
  32. * 定时任务
  33. * [[func, args, flag, timer_interval], ..]
  34. * @var array
  35. */
  36. protected $_task = array();
  37. /**
  38. * 定时器id
  39. * @var int
  40. */
  41. protected $_timerId = 1;
  42. /**
  43. * select超时时间,单位:微妙
  44. * @var int
  45. */
  46. protected $_selectTimeout = 100000000;
  47. /**
  48. * 构造函数
  49. * @return void
  50. */
  51. public function __construct()
  52. {
  53. // 创建一个管道,放入监听读的描述符集合中,避免空轮询
  54. $this->channel = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
  55. if($this->channel)
  56. {
  57. stream_set_blocking($this->channel[0], 0);
  58. $this->_readFds[0] = $this->channel[0];
  59. }
  60. // 初始化优先队列(最大堆)
  61. $this->_scheduler = new \SplPriorityQueue();
  62. $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
  63. }
  64. /**
  65. * 添加事件及处理函数
  66. * @see Events\EventInterface::add()
  67. */
  68. public function add($fd, $flag, $func, $args = null)
  69. {
  70. // key
  71. $fd_key = (int)$fd;
  72. switch ($flag)
  73. {
  74. case self::EV_READ:
  75. $this->_allEvents[$fd_key][$flag] = array($func, $fd);
  76. $this->_readFds[$fd_key] = $fd;
  77. break;
  78. case self::EV_WRITE:
  79. $this->_allEvents[$fd_key][$flag] = array($func, $fd);
  80. $this->_writeFds[$fd_key] = $fd;
  81. break;
  82. case self::EV_SIGNAL:
  83. $this->_signalEvents[$fd_key][$flag] = array($func, $fd);
  84. pcntl_signal($fd, array($this, 'signalHandler'));
  85. break;
  86. case self::EV_TIMER:
  87. case self::EV_TIMER_ONCE:
  88. // $fd 为 定时的时间间隔,单位为秒,支持小数,能精确到0.001秒
  89. $run_time = microtime(true)+$fd_key;
  90. $this->_scheduler->insert($this->_timerId, -$run_time);
  91. $this->_task[$this->_timerId] = array($func, $args, $flag, $fd_key);
  92. $this->tick();
  93. return $this->_timerId++;
  94. }
  95. return true;
  96. }
  97. /**
  98. * 信号处理函数
  99. * @param int $signal
  100. */
  101. public function signalHandler($signal)
  102. {
  103. call_user_func_array($this->_signalEvents[$signal][self::EV_SIGNAL][0], array($signal));
  104. }
  105. /**
  106. * 删除某个描述符的某类事件的监听
  107. * @see Events\EventInterface::del()
  108. */
  109. public function del($fd ,$flag)
  110. {
  111. $fd_key = (int)$fd;
  112. switch ($flag)
  113. {
  114. case self::EV_READ:
  115. unset($this->_allEvents[$fd_key][$flag], $this->_readFds[$fd_key]);
  116. if(empty($this->_allEvents[$fd_key]))
  117. {
  118. unset($this->_allEvents[$fd_key]);
  119. }
  120. return true;
  121. case self::EV_WRITE:
  122. unset($this->_allEvents[$fd_key][$flag], $this->_writeFds[$fd_key]);
  123. if(empty($this->_allEvents[$fd_key]))
  124. {
  125. unset($this->_allEvents[$fd_key]);
  126. }
  127. return true;
  128. case self::EV_SIGNAL:
  129. unset($this->_signalEvents[$fd_key]);
  130. pcntl_signal($fd, SIG_IGN);
  131. break;
  132. case self::EV_TIMER:
  133. case self::EV_TIMER_ONCE;
  134. // $fd_key为要删除的定时器id,即timerId
  135. unset($this->_task[$fd_key]);
  136. return true;
  137. }
  138. return false;;
  139. }
  140. /**
  141. * 检查是否有可执行的定时任务,有的话执行
  142. * @return void
  143. */
  144. protected function tick()
  145. {
  146. while(!$this->_scheduler->isEmpty())
  147. {
  148. $scheduler_data = $this->_scheduler->top();
  149. $timer_id = $scheduler_data['data'];
  150. $next_run_time = -$scheduler_data['priority'];
  151. $time_now = microtime(true);
  152. if($time_now >= $next_run_time)
  153. {
  154. $this->_scheduler->extract();
  155. // 如果任务不存在,则是对应的定时器已经删除
  156. if(!isset($this->_task[$timer_id]))
  157. {
  158. continue;
  159. }
  160. // 任务数据[func, args, flag, timer_interval]
  161. $task_data = $this->_task[$timer_id];
  162. // 如果是持续的定时任务,再把任务加到定时队列
  163. if($task_data[2] == self::EV_TIMER)
  164. {
  165. $next_run_time = $time_now+$task_data[3];
  166. $this->_scheduler->insert($timer_id, -$next_run_time);
  167. }
  168. // 尝试执行任务
  169. try
  170. {
  171. call_user_func($task_data[0], $task_data[1]);
  172. }
  173. catch(\Exception $e)
  174. {
  175. echo $e;
  176. }
  177. continue;
  178. }
  179. else
  180. {
  181. // 设定超时时间
  182. $this->_selectTimeout = ($next_run_time - $time_now)*1000000;
  183. return;
  184. }
  185. }
  186. $this->_selectTimeout = 100000000;
  187. }
  188. /**
  189. * 删除所有定时器
  190. * @return void
  191. */
  192. public function clearAllTimer()
  193. {
  194. $this->_scheduler = new \SplPriorityQueue();
  195. $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
  196. $this->_task = array();
  197. }
  198. /**
  199. * 主循环
  200. * @see Events\EventInterface::loop()
  201. */
  202. public function loop()
  203. {
  204. $e = null;
  205. while (1)
  206. {
  207. // 如果有信号,尝试执行信号处理函数
  208. pcntl_signal_dispatch();
  209. $read = $this->_readFds;
  210. $write = $this->_writeFds;
  211. // 等待可读或者可写事件
  212. @stream_select($read, $write, $e, 0, $this->_selectTimeout);
  213. // 这些描述符可读,执行对应描述符的读回调函数
  214. if($read)
  215. {
  216. foreach($read as $fd)
  217. {
  218. $fd_key = (int) $fd;
  219. if(isset($this->_allEvents[$fd_key][self::EV_READ]))
  220. {
  221. call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0], array($this->_allEvents[$fd_key][self::EV_READ][1]));
  222. }
  223. }
  224. }
  225. // 这些描述符可写,执行对应描述符的写回调函数
  226. if($write)
  227. {
  228. foreach($write as $fd)
  229. {
  230. $fd_key = (int) $fd;
  231. if(isset($this->_allEvents[$fd_key][self::EV_WRITE]))
  232. {
  233. call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0], array($this->_allEvents[$fd_key][self::EV_WRITE][1]));
  234. }
  235. }
  236. }
  237. // 尝试执行定时任务
  238. if(!$this->_scheduler->isEmpty())
  239. {
  240. $this->tick();
  241. }
  242. }
  243. }
  244. }