Select.php 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Workerman\Events;
  15. /**
  16. * select eventloop
  17. */
  18. class Select implements EventInterface
  19. {
  20. /**
  21. * 所有的事件
  22. * @var array
  23. */
  24. public $_allEvents = array();
  25. /**
  26. * 所有信号事件
  27. * @var array
  28. */
  29. public $_signalEvents = array();
  30. /**
  31. * 监听这些描述符的读事件
  32. * @var array
  33. */
  34. protected $_readFds = array();
  35. /**
  36. * 监听这些描述符的写事件
  37. * @var array
  38. */
  39. protected $_writeFds = array();
  40. /**
  41. * 任务调度器,最大堆
  42. * {['data':timer_id, 'priority':run_timestamp], ..}
  43. * @var SplPriorityQueue
  44. */
  45. protected $_scheduler = null;
  46. /**
  47. * 定时任务
  48. * [[func, args, flag, timer_interval], ..]
  49. * @var array
  50. */
  51. protected $_task = array();
  52. /**
  53. * 定时器id
  54. * @var int
  55. */
  56. protected $_timerId = 1;
  57. /**
  58. * select超时时间,单位:微妙
  59. * @var int
  60. */
  61. protected $_selectTimeout = 100000000;
  62. /**
  63. * 构造函数
  64. * @return void
  65. */
  66. public function __construct()
  67. {
  68. // 创建一个管道,放入监听读的描述符集合中,避免空轮询
  69. $this->channel = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
  70. if($this->channel)
  71. {
  72. stream_set_blocking($this->channel[0], 0);
  73. $this->_readFds[0] = $this->channel[0];
  74. }
  75. // 初始化优先队列(最大堆)
  76. $this->_scheduler = new \SplPriorityQueue();
  77. $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
  78. }
  79. /**
  80. * 添加事件及处理函数
  81. * @see Events\EventInterface::add()
  82. */
  83. public function add($fd, $flag, $func, $args = null)
  84. {
  85. switch ($flag)
  86. {
  87. case self::EV_READ:
  88. $fd_key = (int)$fd;
  89. $this->_allEvents[$fd_key][$flag] = array($func, $fd);
  90. $this->_readFds[$fd_key] = $fd;
  91. break;
  92. case self::EV_WRITE:
  93. $fd_key = (int)$fd;
  94. $this->_allEvents[$fd_key][$flag] = array($func, $fd);
  95. $this->_writeFds[$fd_key] = $fd;
  96. break;
  97. case self::EV_SIGNAL:
  98. $fd_key = (int)$fd;
  99. $this->_signalEvents[$fd_key][$flag] = array($func, $fd);
  100. pcntl_signal($fd, array($this, 'signalHandler'));
  101. break;
  102. case self::EV_TIMER:
  103. case self::EV_TIMER_ONCE:
  104. // $fd 为 定时的时间间隔,单位为秒,支持小数,能精确到0.001秒
  105. $run_time = microtime(true)+$fd;
  106. $this->_scheduler->insert($this->_timerId, -$run_time);
  107. $this->_task[$this->_timerId] = array($func, $args, $flag, $fd);
  108. $this->tick();
  109. return $this->_timerId++;
  110. }
  111. return true;
  112. }
  113. /**
  114. * 信号处理函数
  115. * @param int $signal
  116. */
  117. public function signalHandler($signal)
  118. {
  119. call_user_func_array($this->_signalEvents[$signal][self::EV_SIGNAL][0], array($signal));
  120. }
  121. /**
  122. * 删除某个描述符的某类事件的监听
  123. * @see Events\EventInterface::del()
  124. */
  125. public function del($fd ,$flag)
  126. {
  127. $fd_key = (int)$fd;
  128. switch ($flag)
  129. {
  130. case self::EV_READ:
  131. unset($this->_allEvents[$fd_key][$flag], $this->_readFds[$fd_key]);
  132. if(empty($this->_allEvents[$fd_key]))
  133. {
  134. unset($this->_allEvents[$fd_key]);
  135. }
  136. return true;
  137. case self::EV_WRITE:
  138. unset($this->_allEvents[$fd_key][$flag], $this->_writeFds[$fd_key]);
  139. if(empty($this->_allEvents[$fd_key]))
  140. {
  141. unset($this->_allEvents[$fd_key]);
  142. }
  143. return true;
  144. case self::EV_SIGNAL:
  145. unset($this->_signalEvents[$fd_key]);
  146. pcntl_signal($fd, SIG_IGN);
  147. break;
  148. case self::EV_TIMER:
  149. case self::EV_TIMER_ONCE;
  150. // $fd_key为要删除的定时器id,即timerId
  151. unset($this->_task[$fd_key]);
  152. return true;
  153. }
  154. return false;;
  155. }
  156. /**
  157. * 检查是否有可执行的定时任务,有的话执行
  158. * @return void
  159. */
  160. protected function tick()
  161. {
  162. while(!$this->_scheduler->isEmpty())
  163. {
  164. $scheduler_data = $this->_scheduler->top();
  165. $timer_id = $scheduler_data['data'];
  166. $next_run_time = -$scheduler_data['priority'];
  167. $time_now = microtime(true);
  168. if($time_now >= $next_run_time)
  169. {
  170. $this->_scheduler->extract();
  171. // 如果任务不存在,则是对应的定时器已经删除
  172. if(!isset($this->_task[$timer_id]))
  173. {
  174. continue;
  175. }
  176. // 任务数据[func, args, flag, timer_interval]
  177. $task_data = $this->_task[$timer_id];
  178. // 如果是持续的定时任务,再把任务加到定时队列
  179. if($task_data[2] === self::EV_TIMER)
  180. {
  181. $next_run_time = $time_now+$task_data[3];
  182. $this->_scheduler->insert($timer_id, -$next_run_time);
  183. }
  184. // 尝试执行任务
  185. try
  186. {
  187. call_user_func_array($task_data[0], $task_data[1]);
  188. }
  189. catch(\Exception $e)
  190. {
  191. echo $e;
  192. }
  193. continue;
  194. }
  195. else
  196. {
  197. // 设定超时时间
  198. $this->_selectTimeout = ($next_run_time - $time_now)*1000000;
  199. return;
  200. }
  201. }
  202. $this->_selectTimeout = 100000000;
  203. }
  204. /**
  205. * 删除所有定时器
  206. * @return void
  207. */
  208. public function clearAllTimer()
  209. {
  210. $this->_scheduler = new \SplPriorityQueue();
  211. $this->_scheduler->setExtractFlags(\SplPriorityQueue::EXTR_BOTH);
  212. $this->_task = array();
  213. }
  214. /**
  215. * 主循环
  216. * @see Events\EventInterface::loop()
  217. */
  218. public function loop()
  219. {
  220. $e = null;
  221. while (1)
  222. {
  223. // 如果有信号,尝试执行信号处理函数
  224. pcntl_signal_dispatch();
  225. $read = $this->_readFds;
  226. $write = $this->_writeFds;
  227. // 等待可读或者可写事件
  228. @stream_select($read, $write, $e, 0, $this->_selectTimeout);
  229. // 尝试执行定时任务
  230. if(!$this->_scheduler->isEmpty())
  231. {
  232. $this->tick();
  233. }
  234. // 这些描述符可读,执行对应描述符的读回调函数
  235. if($read)
  236. {
  237. foreach($read as $fd)
  238. {
  239. $fd_key = (int) $fd;
  240. if(isset($this->_allEvents[$fd_key][self::EV_READ]))
  241. {
  242. call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0], array($this->_allEvents[$fd_key][self::EV_READ][1]));
  243. }
  244. }
  245. }
  246. // 这些描述符可写,执行对应描述符的写回调函数
  247. if($write)
  248. {
  249. foreach($write as $fd)
  250. {
  251. $fd_key = (int) $fd;
  252. if(isset($this->_allEvents[$fd_key][self::EV_WRITE]))
  253. {
  254. call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0], array($this->_allEvents[$fd_key][self::EV_WRITE][1]));
  255. }
  256. }
  257. }
  258. }
  259. }
  260. }