|
|
@@ -60,12 +60,24 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
protected $recvBuffers = array();
|
|
|
|
|
|
/**
|
|
|
+ * 接收缓冲区最大值 单位字节 默认10MB
|
|
|
+ * @var int
|
|
|
+ */
|
|
|
+ protected $maxRecvBufferSize = 10485760;
|
|
|
+
|
|
|
+ /**
|
|
|
* 客户端连接的写buffer
|
|
|
* @var array
|
|
|
*/
|
|
|
protected $sendBuffers = array();
|
|
|
|
|
|
/**
|
|
|
+ * 发送缓冲区最大自 单位字节 默认20MB
|
|
|
+ * @var int
|
|
|
+ */
|
|
|
+ protected $maxSendBufferSize = 20971520;
|
|
|
+
|
|
|
+ /**
|
|
|
* 当前处理的fd
|
|
|
* @var integer
|
|
|
*/
|
|
|
@@ -173,6 +185,18 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
$this->prereadLength = 65535;
|
|
|
}
|
|
|
|
|
|
+ // 接收缓冲区大小限制
|
|
|
+ if($max_recv_buffer_size = Lib\Config::get( $this->workerName . '.max_recv_buffer_size') && $max_recv_buffer_size > 0)
|
|
|
+ {
|
|
|
+ $this->maxRecvBufferSize = $max_recv_buffer_size;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 发送缓冲区大小限制
|
|
|
+ if($max_send_buffer_size = Lib\Config::get( $this->workerName . '.max_send_buffer_size') && $max_send_buffer_size > 0)
|
|
|
+ {
|
|
|
+ $this->maxSendBufferSize = $max_send_buffer_size;
|
|
|
+ }
|
|
|
+
|
|
|
// worker启动时间
|
|
|
$this->statusInfo['start_time'] = time();
|
|
|
|
|
|
@@ -412,6 +436,12 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
+ // 判断是否大于接收缓冲区最大值限制
|
|
|
+ if(strlen($this->recvBuffers[$fd]['buf']) + $remain_len > $this->maxRecvBufferSize)
|
|
|
+ {
|
|
|
+ $this->notice('client_ip:'.$this->getRemoteIp().' strlen(recvBuffers['.$this->currentDealFd.'])='.strlen($this->recvBuffers[$fd]['buf']).'+' . $remain_len . '>' . $this->maxRecvBufferSize.' and close connection');
|
|
|
+ return false;
|
|
|
+ }
|
|
|
$this->recvBuffers[$fd]['remain_len'] = $remain_len;
|
|
|
}
|
|
|
|
|
|
@@ -522,29 +552,42 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
* 发送数据到客户端
|
|
|
* @return bool
|
|
|
*/
|
|
|
- public function sendToClient($str_to_send)
|
|
|
+ public function sendToClient($buffer_to_send)
|
|
|
{
|
|
|
// tcp
|
|
|
if($this->protocol != 'udp')
|
|
|
{
|
|
|
if(!empty($this->sendBuffers[$this->currentDealFd]))
|
|
|
{
|
|
|
- $this->sendBuffers[$this->currentDealFd] .= $str_to_send;
|
|
|
+ // 获得将要发送的buffer的长度
|
|
|
+ $total_send_buffer_len = strlen($this->sendBuffers[$this->currentDealFd]) + strlen($buffer_to_send);
|
|
|
+ // 如果大于最大限制值则丢弃这儿包
|
|
|
+ if($total_send_buffer_len > $this->maxSendBufferSize)
|
|
|
+ {
|
|
|
+ $this->notice('client_ip:'.$this->getRemoteIp().' strlen(sendBuffer['.$this->currentDealFd.'])='.$total_send_buffer_len.'>' . $this->maxSendBufferSize.' and droped');
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ // 将数据放入发送缓冲区中,等待发送
|
|
|
+ $this->sendBuffers[$this->currentDealFd] .= $buffer_to_send;
|
|
|
return;
|
|
|
}
|
|
|
- $send_len = @fwrite($this->connections[$this->currentDealFd], $str_to_send);
|
|
|
- if($send_len === strlen($str_to_send))
|
|
|
+ // 执行发送
|
|
|
+ $send_len = @fwrite($this->connections[$this->currentDealFd], $buffer_to_send);
|
|
|
+ // 发送完全
|
|
|
+ if($send_len === strlen($buffer_to_send))
|
|
|
{
|
|
|
return true;
|
|
|
}
|
|
|
+ // 长度大于0
|
|
|
if($send_len > 0)
|
|
|
{
|
|
|
- $this->sendBuffers[$this->currentDealFd] = substr($str_to_send, $send_len);
|
|
|
+ $this->sendBuffers[$this->currentDealFd] = substr($buffer_to_send, $send_len);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- $this->sendBuffers[$this->currentDealFd] = $str_to_send;
|
|
|
+ $this->sendBuffers[$this->currentDealFd] = $buffer_to_send;
|
|
|
}
|
|
|
+
|
|
|
if(!isset($this->connections[$this->currentDealFd]))
|
|
|
{
|
|
|
$debug_str = new \Exception('sendToClient fail $this->connections['.var_export($this->currentDealFd, true).'] is null');
|
|
|
@@ -559,7 +602,7 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
return null;
|
|
|
}
|
|
|
// udp 直接发送,要求数据包不能超过65515
|
|
|
- return strlen($str_to_send) == stream_socket_sendto($this->mainSocket, $str_to_send, 0, $this->currentClientAddress);
|
|
|
+ return strlen($buffer_to_send) == stream_socket_sendto($this->mainSocket, $buffer_to_send, 0, $this->currentClientAddress);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -579,12 +622,12 @@ abstract class SocketWorker extends AbstractWorker
|
|
|
$send_len = @fwrite($this->connections[$fd], $this->sendBuffers[$fd]);
|
|
|
if($send_len === strlen($this->sendBuffers[$fd]))
|
|
|
{
|
|
|
+ $this->event->del($this->connections[$fd], BaseEvent::EV_WRITE);
|
|
|
if(!$this->isPersistentConnection)
|
|
|
{
|
|
|
return $this->closeClient($fd);
|
|
|
}
|
|
|
$this->sendBuffers[$fd] = '';
|
|
|
- $this->event->del($this->connections[$fd], BaseEvent::EV_WRITE);
|
|
|
return;
|
|
|
}
|
|
|
|