Select.php 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. <?php
  2. namespace Workerman\Events;
  /**
   * Event loop built on stream_select().
   *
   * Keeps one set of streams watched for readability and one watched for
   * writability, and invokes the registered callback when stream_select()
   * reports a stream as ready. Signal handlers are installed through
   * pcntl_signal() and run via pcntl_signal_dispatch() inside the loop.
   *
   * The EV_READ / EV_WRITE / EV_SIGNAL flags are referenced through self::,
   * presumably inherited from EventInterface (not visible in this file).
   */
  class Select implements EventInterface
  {
      /**
       * Read/write callbacks, indexed as [(int)$fd][$flag] => array($func, $fd).
       * @var array
       */
      public $_allEvents = array();

      /**
       * Signal callbacks, indexed as [$signal][self::EV_SIGNAL] => array($func, $signal).
       * @var array
       */
      public $_signalEvents = array();

      /**
       * Streams watched for readability, keyed by (int)$fd.
       * @var array
       */
      protected $_readFds = array();

      /**
       * Streams watched for writability, keyed by (int)$fd.
       * @var array
       */
      protected $_writeFds = array();

      /**
       * Socket pair created in the constructor (false/null when unavailable).
       *
       * Declared explicitly so it is no longer an implicit dynamic property;
       * kept public because that is the visibility it had when it was created
       * dynamically.
       * @var array|false|null
       */
      public $channel = null;

      /**
       * Creates a socket pair and registers one end in the read set so that
       * stream_select() always has at least one stream to wait on.
       */
      public function __construct()
      {
          $this->channel = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
          if($this->channel)
          {
              stream_set_blocking($this->channel[0], 0);
              // Key 0 is reserved for the placeholder stream.
              $this->_readFds[0] = $this->channel[0];
          }
      }

      /**
       * Registers a callback for a stream event or a signal.
       *
       * @see Events\EventInterface::add()
       * @param resource|int $fd   stream resource, or signal number for EV_SIGNAL
       * @param int          $flag one of self::EV_READ, self::EV_WRITE, self::EV_SIGNAL
       * @param callable     $func callback to invoke
       * @return bool always true (unknown flags are ignored)
       */
      public function add($fd, $flag, $func)
      {
          $fd_key = (int)$fd;
          switch ($flag)
          {
              case self::EV_READ:
                  $this->_allEvents[$fd_key][$flag] = array($func, $fd);
                  $this->_readFds[$fd_key] = $fd;
                  break;
              case self::EV_WRITE:
                  $this->_allEvents[$fd_key][$flag] = array($func, $fd);
                  $this->_writeFds[$fd_key] = $fd;
                  break;
              case self::EV_SIGNAL:
                  $this->_signalEvents[$fd_key][$flag] = array($func, $fd);
                  pcntl_signal($fd, array($this, 'signalHandler'));
                  break;
          }
          return true;
      }

      /**
       * Signal handler installed by add(); forwards the signal number to the
       * callback registered for it.
       *
       * @param int $signal
       * @return void
       */
      public function signalHandler($signal)
      {
          // Guard against a signal arriving with no callback registered.
          if(!isset($this->_signalEvents[$signal][self::EV_SIGNAL]))
          {
              return;
          }
          call_user_func($this->_signalEvents[$signal][self::EV_SIGNAL][0], $signal);
      }

      /**
       * Stops watching one kind of event for a stream, or a signal.
       *
       * @see Events\EventInterface::del()
       * @param resource|int $fd   stream resource, or signal number for EV_SIGNAL
       * @param int          $flag one of self::EV_READ, self::EV_WRITE, self::EV_SIGNAL
       * @return bool always true
       */
      public function del($fd ,$flag)
      {
          $fd_key = (int)$fd;
          switch ($flag)
          {
              case self::EV_READ:
              case self::EV_WRITE:
                  if($flag === self::EV_READ)
                  {
                      unset($this->_readFds[$fd_key]);
                  }
                  else
                  {
                      unset($this->_writeFds[$fd_key]);
                  }
                  unset($this->_allEvents[$fd_key][$flag]);
                  // Drop the per-stream entry once no callbacks remain.
                  if(empty($this->_allEvents[$fd_key]))
                  {
                      unset($this->_allEvents[$fd_key]);
                  }
                  break;
              case self::EV_SIGNAL:
                  unset($this->_signalEvents[$fd_key]);
                  pcntl_signal($fd, SIG_IGN);
                  break;
          }
          return true;
      }

      /**
       * Main loop: dispatches pending signals, waits for ready streams and
       * runs their callbacks. Never returns.
       *
       * @see Events\EventInterface::loop()
       * @return void
       */
      public function loop()
      {
          while (1)
          {
              // Run handlers for any signals received so far.
              pcntl_signal_dispatch();

              // stream_select() rewrites its arguments, so pass copies.
              $read   = $this->_readFds;
              $write  = $this->_writeFds;
              $except = null;

              // With nothing to watch (e.g. the socket pair could not be
              // created), stream_select() cannot block: it fails at once, or
              // raises an error on newer PHP versions. Sleep briefly instead
              // of spinning; an incoming signal interrupts the sleep.
              if(!$read && !$write)
              {
                  usleep(100000);
                  continue;
              }

              // Wait up to 60 seconds for a readable or writable stream.
              if(!@stream_select($read, $write, $except, 60))
              {
                  // Timed out or interrupted, possibly by a signal.
                  pcntl_signal_dispatch();
                  continue;
              }

              $this->dispatchReady($read, self::EV_READ);
              $this->dispatchReady($write, self::EV_WRITE);
          }
      }

      /**
       * Invokes the callback registered under $flag for every ready stream.
       *
       * The registration is re-checked for each stream because an earlier
       * callback may have removed it via del().
       *
       * @param array $fds  streams reported ready by stream_select()
       * @param int   $flag self::EV_READ or self::EV_WRITE
       * @return void
       */
      private function dispatchReady($fds, $flag)
      {
          if(!$fds)
          {
              return;
          }
          foreach($fds as $fd)
          {
              $fd_key = (int)$fd;
              if(isset($this->_allEvents[$fd_key][$flag]))
              {
                  $event = $this->_allEvents[$fd_key][$flag];
                  call_user_func($event[0], $event[1]);
              }
          }
      }
  }