channel = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP); if($this->channel) { stream_set_blocking($this->channel[0], 0); $this->_readFds[0] = $this->channel[0]; } } /** * 添加事件及处理函数 * @see Events\EventInterface::add() */ public function add($fd, $flag, $func) { // key $fd_key = (int)$fd; switch ($flag) { case self::EV_READ: $this->_allEvents[$fd_key][$flag] = array($func, $fd); $this->_readFds[$fd_key] = $fd; break; case self::EV_WRITE: $this->_allEvents[$fd_key][$flag] = array($func, $fd); $this->_writeFds[$fd_key] = $fd; break; case self::EV_SIGNAL: $this->_signalEvents[$fd_key][$flag] = array($func, $fd); pcntl_signal($fd, array($this, 'signalHandler')); break; } return true; } /** * 信号处理函数 * @param int $signal */ public function signalHandler($signal) { call_user_func_array($this->_signalEvents[$signal][self::EV_SIGNAL][0], array($signal)); } /** * 删除某个描述符的某类事件的监听 * @see Events\EventInterface::del() */ public function del($fd ,$flag) { $fd_key = (int)$fd; switch ($flag) { case self::EV_READ: unset($this->_allEvents[$fd_key][$flag], $this->_readFds[$fd_key]); if(empty($this->_allEvents[$fd_key])) { unset($this->_allEvents[$fd_key]); } break; case self::EV_WRITE: unset($this->_allEvents[$fd_key][$flag], $this->_writeFds[$fd_key]); if(empty($this->_allEvents[$fd_key])) { unset($this->_allEvents[$fd_key]); } break; case self::EV_SIGNAL: unset($this->_signalEvents[$fd_key]); pcntl_signal($fd, SIG_IGN); break; } return true; } /** * 主循环 * @see Events\EventInterface::loop() */ public function loop() { $e = null; while (1) { // 如果有信号,尝试执行信号处理函数 pcntl_signal_dispatch(); $read = $this->_readFds; $write = $this->_writeFds; // 等待可读或者可写事件 if(!@stream_select($read, $write, $e, 60)) { // 可能是被信号打断,尝试执行信号处理函数 pcntl_signal_dispatch(); continue; } // 这些描述符可读,执行对应描述符的读回调函数 if($read) { foreach($read as $fd) { $fd_key = (int) $fd; if(isset($this->_allEvents[$fd_key][self::EV_READ])) { call_user_func_array($this->_allEvents[$fd_key][self::EV_READ][0], array($this->_allEvents[$fd_key][self::EV_READ][1])); } } } // 这些描述符可写,执行对应描述符的写回调函数 if($write) { foreach($write as $fd) { $fd_key = (int) $fd; if(isset($this->_allEvents[$fd_key][self::EV_WRITE])) { call_user_func_array($this->_allEvents[$fd_key][self::EV_WRITE][0], array($this->_allEvents[$fd_key][self::EV_WRITE][1])); } } } } } }