| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- <?php
- namespace WORKERMAN\Core\Events;
- require_once WORKERMAN_ROOT_DIR . 'Core/Events/interfaces.php';
- /**
- *
- * select 轮询封装
- * 目前master进程使用这个库
- * 如果没有其它可用库worker进程也会自动使用该库
- *
- * @author walkor <worker-man@qq.com>
- */
- class Select implements BaseEvent
- {
- /**
- * 记录所有事件处理函数及参数
- * @var array
- */
- public $allEvents = array();
-
- /**
- * 记录所有信号处理函数及参数
- * @var array
- */
- public $signalEvents = array();
-
- /**
- * 监听的读描述符
- * @var array
- */
- public $readFds = array();
-
- /**
- * 监听的写描述符
- * @var array
- */
- public $writeFds = array();
-
- /**
- * 搞个fd,避免 $readFds $writeFds 都为空时select 失败
- * @var resource
- */
- public $channel = null;
-
- /**
- * 读超时 毫秒
- * @var integer
- */
- protected $readTimeout = 1000;
-
- /**
- * 写超时 毫秒
- * @var integer
- */
- protected $writeTimeout = 1000;
-
- /**
- * 超时触发的事件
- * @var array
- */
- protected $selectTimeOutEvent = array();
-
- /**
- * 系统调用被打断触发的事件,一般是收到信号
- * @var array
- */
- protected $selectInterruptEvent = array();
-
- /**
- * 构造函数 创建一个管道,避免select空fd
- * @return void
- */
- public function __construct()
- {
- $this->channel = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
- if($this->channel)
- {
- stream_set_blocking($this->channel[0], 0);
- $this->readFds[0] = $this->channel[0];
- }
- fclose($this->channel[0]);
- }
-
- /**
- * 添加事件
- * @see \WORKERMAN\Core\Events\BaseEvent::add()
- */
- public function add($fd, $flag, $func, $args = null)
- {
- // key
- $event_key = (int)$fd;
- switch ($flag)
- {
- // 可读事件
- case self::EV_READ:
- $this->allEvents[$event_key][$flag] = array('args'=>$args, 'func'=>$func, 'fd'=>$fd);
- $this->readFds[$event_key] = $fd;
- break;
- // 写事件 目前没用到,未实现
- case self::EV_WRITE:
- $this->allEvents[$event_key][$flag] = array('args'=>$args, 'func'=>$func, 'fd'=>$fd);
- $this->writeFds[$event_key] = $fd;
- break;
- // 信号处理事件
- case self::EV_SIGNAL:
- $this->signalEvents[$event_key][$flag] = array('args'=>$args, 'func'=>$func, 'fd'=>$fd);
- pcntl_signal($fd, array($this, 'signalHandler'));
- break;
- }
-
- return true;
- }
-
- /**
- * 回调信号处理函数
- * @param int $signal
- */
- public function signalHandler($signal)
- {
- call_user_func_array($this->signalEvents[$signal][self::EV_SIGNAL]['func'], array($signal, self::EV_SIGNAL, $signal));
- }
-
- /**
- * 删除某个fd的某个事件
- * @see \WORKERMAN\Core\Events\BaseEvent::del()
- */
- public function del($fd ,$flag)
- {
- $event_key = (int)$fd;
- switch ($flag)
- {
- // 可读事件
- case self::EV_READ:
- unset($this->allEvents[$event_key][$flag], $this->readFds[$event_key]);
- if(empty($this->allEvents[$event_key]))
- {
- unset($this->allEvents[$event_key]);
- }
- break;
- // 可写事件
- case self::EV_WRITE:
- unset($this->allEvents[$event_key][$flag], $this->writeFds[$event_key]);
- if(empty($this->allEvents[$event_key]))
- {
- unset($this->allEvents[$event_key]);
- }
- break;
- // 信号
- case self::EV_SIGNAL:
- unset($this->signalEvents[$event_key]);
- pcntl_signal($fd, SIG_IGN);
- break;
- }
- return true;
- }
- /**
- * 事件轮训库主循环
- * @see \WORKERMAN\Core\Events\BaseEvent::loop()
- */
- public function loop()
- {
- $e = null;
- while (1)
- {
- $read = $this->readFds;
- $write = $this->writeFds;
- // stream_select false:出错 0:超时
- if(!($ret = @stream_select($read, $write, $e, 1)))
- {
- // 超时
- if($ret === 0)
- {
- }
- // 被系统调用或者信号打断
- elseif($ret === false)
- {
- }
- // 触发信号处理函数
- pcntl_signal_dispatch();
- continue;
- }
- // 触发信号处理函数
- pcntl_signal_dispatch();
-
- // 检查所有可读描述符
- foreach($read as $fd)
- {
- $event_key = (int) $fd;
- if(isset($this->allEvents[$event_key][self::EV_READ]))
- {
- call_user_func_array($this->allEvents[$event_key][self::EV_READ]['func'], array($this->allEvents[$event_key][self::EV_READ]['fd'], self::EV_READ, $this->allEvents[$event_key][self::EV_READ]['args']));
- }
- }
-
- // 检查可写描述符,没用到,暂不实现
- foreach($write as $fd)
- {
- $event_key = (int) $fd;
- if(isset($this->allEvents[$event_key][self::EV_WRITE]))
- {
- // 留空
- }
- }
- }
- }
-
- }
|