|
|
@@ -90,7 +90,7 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
protected $eventLoopName ="\\Man\\Core\\Events\\Select";
|
|
|
|
|
|
/**
|
|
|
- * 时间轮询库实例
|
|
|
+ * 事件轮询库实例
|
|
|
* @var object
|
|
|
*/
|
|
|
protected $event = null;
|
|
|
@@ -129,20 +129,19 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * 用户worker继承此worker类必须实现该方法,根据具体协议和当前收到的数据决定是否继续收包
|
|
|
- * @param string $recv_str 收到的数据包
|
|
|
- * @return int/false 返回0表示接收完毕/>0表示还有多少字节没有接收到/false出错
|
|
|
+ * 必须实现该方法,根据具体协议和当前收到的数据决定是否继续收包
|
|
|
+ * @param string $bin 收到的数据包(可能是二进制)
|
|
|
+ * @return int/false 返回0表示接收完毕 ; int>0表示还有int字节没有接收; false数据包出错(例如数据包不合法等)
|
|
|
*/
|
|
|
- abstract public function dealInput($recv_str);
|
|
|
+ abstract public function dealInput($bin);
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * 用户worker继承此worker类必须实现该方法,根据包中的数据处理逻辑
|
|
|
- * 逻辑处理
|
|
|
- * @param string $recv_str 收到的数据包
|
|
|
+ * 必须实现该方法,根据包中的数据处理逻辑
|
|
|
+ * @param string $bin 收到的数据包
|
|
|
* @return void
|
|
|
*/
|
|
|
- abstract public function dealProcess($recv_str);
|
|
|
+ abstract public function dealProcess($bin);
|
|
|
|
|
|
|
|
|
/**
|
|
|
@@ -159,10 +158,11 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
|
|
|
// 是否开启长连接
|
|
|
$this->isPersistentConnection = (bool)Lib\Config::get( $this->workerName . '.persistent_connection');
|
|
|
- // 最大请求数,如果没有配置则使用PHP_INT_MAX
|
|
|
+ // 最大请求数,超过这个数则安全重启,如果没有配置则使用PHP_INT_MAX
|
|
|
$this->maxRequests = (int)Lib\Config::get( $this->workerName . '.max_requests');
|
|
|
$this->maxRequests = $this->maxRequests <= 0 ? PHP_INT_MAX : $this->maxRequests;
|
|
|
|
|
|
+ // 预读数据长度,长连接需要设置此项
|
|
|
$preread_length = (int)Lib\Config::get( $this->workerName . '.preread_length');
|
|
|
if($preread_length > 0)
|
|
|
{
|
|
|
@@ -186,8 +186,6 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
$this->addShutdownHook();
|
|
|
|
|
|
// 初始化事件轮询库
|
|
|
- // $this->event = new Libevent();
|
|
|
- // $this->event = new Select();
|
|
|
$this->event = new $this->eventLoopName();
|
|
|
}
|
|
|
|
|
|
@@ -221,16 +219,15 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
|
|
|
// 主体循环,整个子进程会阻塞在这个函数上
|
|
|
$ret = $this->event->loop();
|
|
|
- $this->notice('worker loop exit');
|
|
|
+ $this->notice('worker loop exit unexpected');
|
|
|
exit(0);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 停止服务
|
|
|
- * @param bool $exit 是否退出
|
|
|
* @return void
|
|
|
*/
|
|
|
- public function stop($exit = true)
|
|
|
+ public function stop()
|
|
|
{
|
|
|
// 触发该worker进程onStop事件
|
|
|
if($this->onStop())
|
|
|
@@ -250,10 +247,7 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
// 没有链接要处理了
|
|
|
if($this->allTaskHasDone())
|
|
|
{
|
|
|
- if($exit)
|
|
|
- {
|
|
|
- exit(0);
|
|
|
- }
|
|
|
+ exit(0);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -271,7 +265,6 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
// 获取协议
|
|
|
$mata_data = stream_get_meta_data($socket);
|
|
|
$this->protocol = substr($mata_data['stream_type'], 0, 3);
|
|
|
-
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -395,7 +388,7 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
{
|
|
|
$this->notice('CODE:' . $e->getCode() . ' MESSAGE:' . $e->getMessage()."\n".$e->getTraceAsString()."\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
|
|
|
$this->statusInfo['throw_exception'] ++;
|
|
|
- $this->sendToClient($e->getMessage());
|
|
|
+ $this->sendToClient($e);
|
|
|
}
|
|
|
|
|
|
// 是否是长连接
|
|
|
@@ -461,15 +454,15 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
protected function installSignal()
|
|
|
{
|
|
|
// 闹钟信号
|
|
|
- $this->event->add(SIGALRM, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGALRM);
|
|
|
+ $this->event->add(SIGALRM, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
// 终止进程信号
|
|
|
- $this->event->add(SIGINT, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGINT);
|
|
|
+ $this->event->add(SIGINT, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
// 平滑重启信号
|
|
|
- $this->event->add(SIGHUP, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGHUP);
|
|
|
+ $this->event->add(SIGHUP, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
// 报告进程状态
|
|
|
- $this->event->add(SIGUSR1, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGUSR1);
|
|
|
+ $this->event->add(SIGUSR1, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
// 报告该进程使用的文件
|
|
|
- $this->event->add(SIGUSR2, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'), SIGUSR2);
|
|
|
+ $this->event->add(SIGUSR2, Events\BaseEvent::EV_SIGNAL, array($this, 'signalHandler'));
|
|
|
|
|
|
// 设置忽略信号
|
|
|
pcntl_signal(SIGTTIN, SIG_IGN);
|
|
|
@@ -538,14 +531,6 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
$this->sendBuffers[$this->currentDealFd] = $str_to_send;
|
|
|
}
|
|
|
$this->event->add($this->connections[$this->currentDealFd], Events\BaseEvent::EV_WRITE, array($this, 'tcpWriteToClient'), array($this->currentDealFd));
|
|
|
- /*
|
|
|
- // tcp 如果一次没写完(一般是缓冲区满的情况),则阻塞写
|
|
|
- if(!$this->blockWrite($this->connections[$this->currentDealFd], $str_to_send, 500))
|
|
|
- {
|
|
|
- $this->notice('sendToClient fail ,Data length = ' . strlen($str_to_send));
|
|
|
- $this->statusInfo['send_fail']++;
|
|
|
- return false;
|
|
|
- }*/
|
|
|
return null;
|
|
|
}
|
|
|
// udp 直接发送,要求数据包不能超过65515
|
|
|
@@ -554,7 +539,7 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
|
|
|
/**
|
|
|
* 向客户端socket写数据
|
|
|
- * @param resource $fd
|
|
|
+ * @param int $fd
|
|
|
* @param string $bin_data
|
|
|
*/
|
|
|
public function tcpWriteToClient($fd)
|
|
|
@@ -583,41 +568,6 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 向fd写数据,如果socket缓冲区满了,则改用阻塞模式写数据
|
|
|
- * @param resource $fd
|
|
|
- * @param string $str_to_write
|
|
|
- * @param int $time_out 单位毫秒
|
|
|
- * @return bool
|
|
|
- */
|
|
|
- protected function blockWrite($fd, $str_to_write, $timeout_ms = 500)
|
|
|
- {
|
|
|
- $send_len = @fwrite($fd, $str_to_write);
|
|
|
- if($send_len == strlen($str_to_write))
|
|
|
- {
|
|
|
- return true;
|
|
|
- }
|
|
|
-
|
|
|
- // 客户端关闭
|
|
|
- if(feof($fd))
|
|
|
- {
|
|
|
- $this->notice("blockWrite client close");
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- // 设置阻塞
|
|
|
- stream_set_blocking($fd, 1);
|
|
|
- // 设置超时
|
|
|
- $timeout_sec = floor($timeout_ms/1000);
|
|
|
- $timeout_ms = $timeout_ms%1000;
|
|
|
- stream_set_timeout($fd, $timeout_sec, $timeout_ms*1000);
|
|
|
- $send_len += @fwrite($fd, substr($str_to_write, $send_len));
|
|
|
- // 改回非阻塞
|
|
|
- stream_set_blocking($fd, 0);
|
|
|
-
|
|
|
- return $send_len == strlen($str_to_write);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
* 获取客户端地址
|
|
|
* @param int $fd 已经链接的socket id
|
|
|
* @return string ip:port
|
|
|
@@ -798,6 +748,3 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
}
|
|
|
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
-
|