Select.php 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. <?php
  2. namespace WORKERMAN\Core\Events;
  3. require_once WORKERMAN_ROOT_DIR . 'Core/Events/interfaces.php';
  4. /**
  5. *
  6. * select 轮询封装
  7. * 目前master进程使用这个库
  8. * 如果没有其它可用库worker进程也会自动使用该库
  9. *
  10. * @author walkor <worker-man@qq.com>
  11. */
  12. class Select implements BaseEvent
  13. {
  14. /**
  15. * 记录所有事件处理函数及参数
  16. * @var array
  17. */
  18. public $allEvents = array();
  19. /**
  20. * 记录所有信号处理函数及参数
  21. * @var array
  22. */
  23. public $signalEvents = array();
  24. /**
  25. * 监听的读描述符
  26. * @var array
  27. */
  28. public $readFds = array();
  29. /**
  30. * 监听的写描述符
  31. * @var array
  32. */
  33. public $writeFds = array();
  34. /**
  35. * 搞个fd,避免 $readFds $writeFds 都为空时select 失败
  36. * @var resource
  37. */
  38. public $channel = null;
  39. /**
  40. * 读超时 毫秒
  41. * @var integer
  42. */
  43. protected $readTimeout = 1000;
  44. /**
  45. * 写超时 毫秒
  46. * @var integer
  47. */
  48. protected $writeTimeout = 1000;
  49. /**
  50. * 超时触发的事件
  51. * @var array
  52. */
  53. protected $selectTimeOutEvent = array();
  54. /**
  55. * 系统调用被打断触发的事件,一般是收到信号
  56. * @var array
  57. */
  58. protected $selectInterruptEvent = array();
  59. /**
  60. * 构造函数 创建一个管道,避免select空fd
  61. * @return void
  62. */
  63. public function __construct()
  64. {
  65. $this->channel = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
  66. if($this->channel)
  67. {
  68. stream_set_blocking($this->channel[0], 0);
  69. $this->readFds[0] = $this->channel[0];
  70. }
  71. fclose($this->channel[0]);
  72. }
  73. /**
  74. * 添加事件
  75. * @see \WORKERMAN\Core\Events\BaseEvent::add()
  76. */
  77. public function add($fd, $flag, $func, $args = null)
  78. {
  79. // key
  80. $event_key = (int)$fd;
  81. switch ($flag)
  82. {
  83. // 可读事件
  84. case self::EV_READ:
  85. $this->allEvents[$event_key][$flag] = array('args'=>$args, 'func'=>$func, 'fd'=>$fd);
  86. $this->readFds[$event_key] = $fd;
  87. break;
  88. // 写事件 目前没用到,未实现
  89. case self::EV_WRITE:
  90. $this->allEvents[$event_key][$flag] = array('args'=>$args, 'func'=>$func, 'fd'=>$fd);
  91. $this->writeFds[$event_key] = $fd;
  92. break;
  93. // 信号处理事件
  94. case self::EV_SIGNAL:
  95. $this->signalEvents[$event_key][$flag] = array('args'=>$args, 'func'=>$func, 'fd'=>$fd);
  96. pcntl_signal($fd, array($this, 'signalHandler'));
  97. break;
  98. }
  99. return true;
  100. }
  101. /**
  102. * 回调信号处理函数
  103. * @param int $signal
  104. */
  105. public function signalHandler($signal)
  106. {
  107. call_user_func_array($this->signalEvents[$signal][self::EV_SIGNAL]['func'], array($signal, self::EV_SIGNAL, $signal));
  108. }
  109. /**
  110. * 删除某个fd的某个事件
  111. * @see \WORKERMAN\Core\Events\BaseEvent::del()
  112. */
  113. public function del($fd ,$flag)
  114. {
  115. $event_key = (int)$fd;
  116. switch ($flag)
  117. {
  118. // 可读事件
  119. case self::EV_READ:
  120. unset($this->allEvents[$event_key][$flag], $this->readFds[$event_key]);
  121. if(empty($this->allEvents[$event_key]))
  122. {
  123. unset($this->allEvents[$event_key]);
  124. }
  125. break;
  126. // 可写事件
  127. case self::EV_WRITE:
  128. unset($this->allEvents[$event_key][$flag], $this->writeFds[$event_key]);
  129. if(empty($this->allEvents[$event_key]))
  130. {
  131. unset($this->allEvents[$event_key]);
  132. }
  133. break;
  134. // 信号
  135. case self::EV_SIGNAL:
  136. unset($this->signalEvents[$event_key]);
  137. pcntl_signal($fd, SIG_IGN);
  138. break;
  139. }
  140. return true;
  141. }
  142. /**
  143. * 事件轮训库主循环
  144. * @see \WORKERMAN\Core\Events\BaseEvent::loop()
  145. */
  146. public function loop()
  147. {
  148. $e = null;
  149. while (1)
  150. {
  151. $read = $this->readFds;
  152. $write = $this->writeFds;
  153. // stream_select false:出错 0:超时
  154. if(!($ret = @stream_select($read, $write, $e, 1)))
  155. {
  156. // 超时
  157. if($ret === 0)
  158. {
  159. }
  160. // 被系统调用或者信号打断
  161. elseif($ret === false)
  162. {
  163. }
  164. // 触发信号处理函数
  165. pcntl_signal_dispatch();
  166. continue;
  167. }
  168. // 触发信号处理函数
  169. pcntl_signal_dispatch();
  170. // 检查所有可读描述符
  171. foreach($read as $fd)
  172. {
  173. $event_key = (int) $fd;
  174. if(isset($this->allEvents[$event_key][self::EV_READ]))
  175. {
  176. call_user_func_array($this->allEvents[$event_key][self::EV_READ]['func'], array($this->allEvents[$event_key][self::EV_READ]['fd'], self::EV_READ, $this->allEvents[$event_key][self::EV_READ]['args']));
  177. }
  178. }
  179. // 检查可写描述符,没用到,暂不实现
  180. foreach($write as $fd)
  181. {
  182. $event_key = (int) $fd;
  183. if(isset($this->allEvents[$event_key][self::EV_WRITE]))
  184. {
  185. // 留空
  186. }
  187. }
  188. }
  189. }
  190. }