|
@@ -18,178 +18,169 @@ use Workerman\Worker;
|
|
|
use \Exception;
|
|
use \Exception;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * Tcp连接类
|
|
|
|
|
|
|
+ * TcpConnection.
|
|
|
*/
|
|
*/
|
|
|
class TcpConnection extends ConnectionInterface
|
|
class TcpConnection extends ConnectionInterface
|
|
|
{
|
|
{
|
|
|
/**
|
|
/**
|
|
|
- * 当数据可读时,从socket缓冲区读取多少字节数据
|
|
|
|
|
|
|
+ * Read buffer size.
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
const READ_BUFFER_SIZE = 65535;
|
|
const READ_BUFFER_SIZE = 65535;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 连接状态 连接中
|
|
|
|
|
|
|
+ * Status connecting.
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
const STATUS_CONNECTING = 1;
|
|
const STATUS_CONNECTING = 1;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 连接状态 已经建立连接
|
|
|
|
|
|
|
+ * Status connection established.
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
const STATUS_ESTABLISH = 2;
|
|
const STATUS_ESTABLISH = 2;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 连接状态 连接关闭中,标识调用了close方法,但是发送缓冲区中任然有数据
|
|
|
|
|
- * 等待发送缓冲区的数据发送完毕(写入到socket写缓冲区)后执行关闭
|
|
|
|
|
|
|
+ * Status closing.
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
const STATUS_CLOSING = 4;
|
|
const STATUS_CLOSING = 4;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 连接状态 已经关闭
|
|
|
|
|
|
|
+ * Status closed.
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
const STATUS_CLOSED = 8;
|
|
const STATUS_CLOSED = 8;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 当对端发来数据时,如果设置了$onMessage回调,则执行
|
|
|
|
|
|
|
+ * Emitted when data is received.
|
|
|
* @var callback
|
|
* @var callback
|
|
|
*/
|
|
*/
|
|
|
public $onMessage = null;
|
|
public $onMessage = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 当连接关闭时,如果设置了$onClose回调,则执行
|
|
|
|
|
|
|
+ * Emitted when the other end of the socket sends a FIN packet.
|
|
|
* @var callback
|
|
* @var callback
|
|
|
*/
|
|
*/
|
|
|
public $onClose = null;
|
|
public $onClose = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 当出现错误是,如果设置了$onError回调,则执行
|
|
|
|
|
|
|
+ * Emitted when an error occurs with connection.
|
|
|
* @var callback
|
|
* @var callback
|
|
|
*/
|
|
*/
|
|
|
public $onError = null;
|
|
public $onError = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 当发送缓冲区满时,如果设置了$onBufferFull回调,则执行
|
|
|
|
|
|
|
+ * Emitted when the send buffer becomes full.
|
|
|
* @var callback
|
|
* @var callback
|
|
|
*/
|
|
*/
|
|
|
public $onBufferFull = null;
|
|
public $onBufferFull = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 当发送缓冲区被清空时,如果设置了$onBufferDrain回调,则执行
|
|
|
|
|
|
|
+ * Emitted when the send buffer becomes empty.
|
|
|
* @var callback
|
|
* @var callback
|
|
|
*/
|
|
*/
|
|
|
public $onBufferDrain = null;
|
|
public $onBufferDrain = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 使用的应用层协议,是协议类的名称
|
|
|
|
|
- * 值类似于 Workerman\\Protocols\\Http
|
|
|
|
|
|
|
+ * Application layer protocol.
|
|
|
|
|
+ * The format is like this Workerman\\Protocols\\Http.
|
|
|
* @var string
|
|
* @var string
|
|
|
*/
|
|
*/
|
|
|
public $protocol = '';
|
|
public $protocol = '';
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 属于哪个worker
|
|
|
|
|
|
|
+ * Which worker belong to.
|
|
|
* @var Worker
|
|
* @var Worker
|
|
|
*/
|
|
*/
|
|
|
public $worker = null;
|
|
public $worker = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 连接的id,一个自增整数
|
|
|
|
|
|
|
+ * Connection->id.
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
public $id = 0;
|
|
public $id = 0;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 连接的id,为$id的副本,用来清理connections中的连接
|
|
|
|
|
|
|
+ * A copy of $worker->id which used to clean up the connection in worker->connections
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
protected $_id = 0;
|
|
protected $_id = 0;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 设置当前连接的最大发送缓冲区大小,默认大小为TcpConnection::$defaultMaxSendBufferSize
|
|
|
|
|
- * 当发送缓冲区满时,会尝试触发onBufferFull回调(如果有设置的话)
|
|
|
|
|
- * 如果没设置onBufferFull回调,由于发送缓冲区满,则后续发送的数据将被丢弃,
|
|
|
|
|
- * 并触发onError回调,直到发送缓冲区有空位
|
|
|
|
|
- * 注意 此值可以动态设置
|
|
|
|
|
|
|
+ * Sets the maximum send buffer size for the current connection.
|
|
|
|
|
+ * OnBufferFull callback will be emited When the send buffer is full.
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
public $maxSendBufferSize = 1048576;
|
|
public $maxSendBufferSize = 1048576;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 默认发送缓冲区大小,设置此属性会影响所有连接的默认发送缓冲区大小
|
|
|
|
|
- * 如果想设置某个连接发送缓冲区的大小,可以单独设置对应连接的$maxSendBufferSize属性
|
|
|
|
|
|
|
+ * Default send buffer size.
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
public static $defaultMaxSendBufferSize = 1048576;
|
|
public static $defaultMaxSendBufferSize = 1048576;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 能接受的最大数据包,为了防止恶意攻击,当数据包的大小大于此值时执行断开
|
|
|
|
|
- * 注意 此值可以动态设置
|
|
|
|
|
- * 例如 Workerman\Connection\TcpConnection::$maxPackageSize=1024000;
|
|
|
|
|
|
|
+ * Maximum acceptable packet size.
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
public static $maxPackageSize = 10485760;
|
|
public static $maxPackageSize = 10485760;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * id 记录器
|
|
|
|
|
|
|
+ * Id recorder.
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
protected static $_idRecorder = 1;
|
|
protected static $_idRecorder = 1;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 实际的socket资源
|
|
|
|
|
|
|
+ * Socket
|
|
|
* @var resource
|
|
* @var resource
|
|
|
*/
|
|
*/
|
|
|
protected $_socket = null;
|
|
protected $_socket = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 发送缓冲区
|
|
|
|
|
|
|
+ * Send buffer.
|
|
|
* @var string
|
|
* @var string
|
|
|
*/
|
|
*/
|
|
|
protected $_sendBuffer = '';
|
|
protected $_sendBuffer = '';
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 接收缓冲区
|
|
|
|
|
|
|
+ * Receive buffer.
|
|
|
* @var string
|
|
* @var string
|
|
|
*/
|
|
*/
|
|
|
protected $_recvBuffer = '';
|
|
protected $_recvBuffer = '';
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 当前正在处理的数据包的包长(此值是协议的intput方法的返回值)
|
|
|
|
|
|
|
+ * Current package length.
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
protected $_currentPackageLength = 0;
|
|
protected $_currentPackageLength = 0;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 当前的连接状态
|
|
|
|
|
|
|
+ * Connection status.
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
protected $_status = self::STATUS_ESTABLISH;
|
|
protected $_status = self::STATUS_ESTABLISH;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 对端的地址 ip+port
|
|
|
|
|
- * 值类似于 192.168.1.100:3698
|
|
|
|
|
|
|
+ * Remote address.
|
|
|
* @var string
|
|
* @var string
|
|
|
*/
|
|
*/
|
|
|
protected $_remoteAddress = '';
|
|
protected $_remoteAddress = '';
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 是否是停止接收数据
|
|
|
|
|
|
|
+ * Is paused.
|
|
|
* @var bool
|
|
* @var bool
|
|
|
*/
|
|
*/
|
|
|
protected $_isPaused = false;
|
|
protected $_isPaused = false;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 构造函数
|
|
|
|
|
|
|
+ * Construct.
|
|
|
* @param resource $socket
|
|
* @param resource $socket
|
|
|
* @param EventInterface $event
|
|
* @param EventInterface $event
|
|
|
*/
|
|
*/
|
|
|
public function __construct($socket, $remote_address = '')
|
|
public function __construct($socket, $remote_address = '')
|
|
|
{
|
|
{
|
|
|
- // 统计数据
|
|
|
|
|
self::$statistics['connection_count']++;
|
|
self::$statistics['connection_count']++;
|
|
|
$this->id = $this->_id = self::$_idRecorder++;
|
|
$this->id = $this->_id = self::$_idRecorder++;
|
|
|
$this->_socket = $socket;
|
|
$this->_socket = $socket;
|
|
@@ -200,14 +191,14 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 发送数据给对端
|
|
|
|
|
|
|
+ * Sends data on the connection.
|
|
|
* @param string $send_buffer
|
|
* @param string $send_buffer
|
|
|
* @param bool $raw
|
|
* @param bool $raw
|
|
|
* @return void|boolean
|
|
* @return void|boolean
|
|
|
*/
|
|
*/
|
|
|
public function send($send_buffer, $raw = false)
|
|
public function send($send_buffer, $raw = false)
|
|
|
{
|
|
{
|
|
|
- // 如果没有设置以原始数据发送,并且有设置协议则按照协议编码
|
|
|
|
|
|
|
+ // Try to call protocol::encode($send_buffer) before sending.
|
|
|
if(false === $raw && $this->protocol)
|
|
if(false === $raw && $this->protocol)
|
|
|
{
|
|
{
|
|
|
$parser = $this->protocol;
|
|
$parser = $this->protocol;
|
|
@@ -218,42 +209,36 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 如果当前状态是连接中,则把数据放入发送缓冲区
|
|
|
|
|
if($this->_status === self::STATUS_CONNECTING)
|
|
if($this->_status === self::STATUS_CONNECTING)
|
|
|
{
|
|
{
|
|
|
$this->_sendBuffer .= $send_buffer;
|
|
$this->_sendBuffer .= $send_buffer;
|
|
|
return null;
|
|
return null;
|
|
|
}
|
|
}
|
|
|
- // 如果当前连接是关闭,则返回false
|
|
|
|
|
elseif($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED)
|
|
elseif($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED)
|
|
|
{
|
|
{
|
|
|
return false;
|
|
return false;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 如果发送缓冲区为空,尝试直接发送
|
|
|
|
|
|
|
+ // Attempt to send data directly.
|
|
|
if($this->_sendBuffer === '')
|
|
if($this->_sendBuffer === '')
|
|
|
{
|
|
{
|
|
|
- // 直接发送
|
|
|
|
|
$len = @fwrite($this->_socket, $send_buffer);
|
|
$len = @fwrite($this->_socket, $send_buffer);
|
|
|
- // 所有数据都发送完毕
|
|
|
|
|
|
|
+ // send successful.
|
|
|
if($len === strlen($send_buffer))
|
|
if($len === strlen($send_buffer))
|
|
|
{
|
|
{
|
|
|
return true;
|
|
return true;
|
|
|
}
|
|
}
|
|
|
- // 只有部分数据发送成功
|
|
|
|
|
|
|
+ // Send only part of the data.
|
|
|
if($len > 0)
|
|
if($len > 0)
|
|
|
{
|
|
{
|
|
|
- // 未发送成功部分放入发送缓冲区
|
|
|
|
|
$this->_sendBuffer = substr($send_buffer, $len);
|
|
$this->_sendBuffer = substr($send_buffer, $len);
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
- // 如果连接断开
|
|
|
|
|
|
|
+ // Connection closed?
|
|
|
if(!is_resource($this->_socket) || feof($this->_socket))
|
|
if(!is_resource($this->_socket) || feof($this->_socket))
|
|
|
{
|
|
{
|
|
|
- // status统计发送失败次数
|
|
|
|
|
self::$statistics['send_fail']++;
|
|
self::$statistics['send_fail']++;
|
|
|
- // 如果有设置失败回调,则执行
|
|
|
|
|
if($this->onError)
|
|
if($this->onError)
|
|
|
{
|
|
{
|
|
|
try
|
|
try
|
|
@@ -266,27 +251,22 @@ class TcpConnection extends ConnectionInterface
|
|
|
exit(250);
|
|
exit(250);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- // 销毁连接
|
|
|
|
|
$this->destroy();
|
|
$this->destroy();
|
|
|
return false;
|
|
return false;
|
|
|
}
|
|
}
|
|
|
- // 连接未断开,发送失败,则把所有数据放入发送缓冲区
|
|
|
|
|
$this->_sendBuffer = $send_buffer;
|
|
$this->_sendBuffer = $send_buffer;
|
|
|
}
|
|
}
|
|
|
- // 监听对端可写事件
|
|
|
|
|
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
|
|
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
|
|
|
- // 检查发送缓冲区是否已满,如果满了尝试触发onBufferFull回调
|
|
|
|
|
|
|
+ // Check if the send buffer is full.
|
|
|
$this->checkBufferIsFull();
|
|
$this->checkBufferIsFull();
|
|
|
return null;
|
|
return null;
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
- // 缓冲区已经标记为满,仍然然有数据发送,则丢弃数据包
|
|
|
|
|
|
|
+ // Buffer has been marked as full but still has data to send the packet is discarded.
|
|
|
if($this->maxSendBufferSize <= strlen($this->_sendBuffer))
|
|
if($this->maxSendBufferSize <= strlen($this->_sendBuffer))
|
|
|
{
|
|
{
|
|
|
- // 为status命令统计发送失败次数
|
|
|
|
|
self::$statistics['send_fail']++;
|
|
self::$statistics['send_fail']++;
|
|
|
- // 如果有设置失败回调,则执行
|
|
|
|
|
if($this->onError)
|
|
if($this->onError)
|
|
|
{
|
|
{
|
|
|
try
|
|
try
|
|
@@ -301,15 +281,14 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
return false;
|
|
return false;
|
|
|
}
|
|
}
|
|
|
- // 将数据放入放缓冲区
|
|
|
|
|
$this->_sendBuffer .= $send_buffer;
|
|
$this->_sendBuffer .= $send_buffer;
|
|
|
- // 检查发送缓冲区是否已满,如果满了尝试触发onBufferFull回调
|
|
|
|
|
|
|
+ // Check if the send buffer is full.
|
|
|
$this->checkBufferIsFull();
|
|
$this->checkBufferIsFull();
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 获得对端ip
|
|
|
|
|
|
|
+ * Get remote IP.
|
|
|
* @return string
|
|
* @return string
|
|
|
*/
|
|
*/
|
|
|
public function getRemoteIp()
|
|
public function getRemoteIp()
|
|
@@ -323,7 +302,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 获得对端端口
|
|
|
|
|
|
|
+ * Get remote port.
|
|
|
* @return int
|
|
* @return int
|
|
|
*/
|
|
*/
|
|
|
public function getRemotePort()
|
|
public function getRemotePort()
|
|
@@ -336,7 +315,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 暂停接收数据,一般用于控制上传流量
|
|
|
|
|
|
|
+ * Pauses the reading of data. That is onMessage will not be emitted. Useful to throttle back an upload.
|
|
|
* @return void
|
|
* @return void
|
|
|
*/
|
|
*/
|
|
|
public function pauseRecv()
|
|
public function pauseRecv()
|
|
@@ -346,7 +325,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 恢复接收数据,一般用户控制上传流量
|
|
|
|
|
|
|
+ * Resumes reading after a call to pauseRecv.
|
|
|
* @return void
|
|
* @return void
|
|
|
*/
|
|
*/
|
|
|
public function resumeRecv()
|
|
public function resumeRecv()
|
|
@@ -360,7 +339,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 当socket可读时的回调
|
|
|
|
|
|
|
+ * Base read handler.
|
|
|
* @param resource $socket
|
|
* @param resource $socket
|
|
|
* @return void
|
|
* @return void
|
|
|
*/
|
|
*/
|
|
@@ -378,23 +357,23 @@ class TcpConnection extends ConnectionInterface
|
|
|
$this->_recvBuffer .= $buffer;
|
|
$this->_recvBuffer .= $buffer;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 没有读到数据时检查连接是否断开
|
|
|
|
|
|
|
+ // Check connection closed.
|
|
|
if(!$read_data && (!is_resource($socket) || feof($socket)))
|
|
if(!$read_data && (!is_resource($socket) || feof($socket)))
|
|
|
{
|
|
{
|
|
|
$this->destroy();
|
|
$this->destroy();
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 如果设置了协议
|
|
|
|
|
|
|
+ // If the application layer protocol has been set up.
|
|
|
if($this->protocol)
|
|
if($this->protocol)
|
|
|
{
|
|
{
|
|
|
$parser = $this->protocol;
|
|
$parser = $this->protocol;
|
|
|
while($this->_recvBuffer !== '' && !$this->_isPaused)
|
|
while($this->_recvBuffer !== '' && !$this->_isPaused)
|
|
|
{
|
|
{
|
|
|
- // 当前包的长度已知
|
|
|
|
|
|
|
+ // The current packet length is known.
|
|
|
if($this->_currentPackageLength)
|
|
if($this->_currentPackageLength)
|
|
|
{
|
|
{
|
|
|
- // 数据不够一个包
|
|
|
|
|
|
|
+ // Data is not enough for a package.
|
|
|
if($this->_currentPackageLength > strlen($this->_recvBuffer))
|
|
if($this->_currentPackageLength > strlen($this->_recvBuffer))
|
|
|
{
|
|
{
|
|
|
break;
|
|
break;
|
|
@@ -402,22 +381,22 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
- // 获得当前包长
|
|
|
|
|
|
|
+ // Get current package length.
|
|
|
$this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
|
|
$this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
|
|
|
- // 数据不够,无法获得包长
|
|
|
|
|
|
|
+ // The packet length is unknown.
|
|
|
if($this->_currentPackageLength === 0)
|
|
if($this->_currentPackageLength === 0)
|
|
|
{
|
|
{
|
|
|
break;
|
|
break;
|
|
|
}
|
|
}
|
|
|
elseif($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize)
|
|
elseif($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize)
|
|
|
{
|
|
{
|
|
|
- // 数据不够一个包
|
|
|
|
|
|
|
+ // Data is not enough for a package.
|
|
|
if($this->_currentPackageLength > strlen($this->_recvBuffer))
|
|
if($this->_currentPackageLength > strlen($this->_recvBuffer))
|
|
|
{
|
|
{
|
|
|
break;
|
|
break;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- // 包错误
|
|
|
|
|
|
|
+ // Wrong package.
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
echo 'error package. package_length='.var_export($this->_currentPackageLength, true);
|
|
echo 'error package. package_length='.var_export($this->_currentPackageLength, true);
|
|
@@ -426,9 +405,9 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 数据足够一个包长
|
|
|
|
|
|
|
+ // The data is enough for a packet.
|
|
|
self::$statistics['total_request']++;
|
|
self::$statistics['total_request']++;
|
|
|
- // 当前包长刚好等于buffer的长度
|
|
|
|
|
|
|
+ // The current packet length is equal to the length of the buffer.
|
|
|
if(strlen($this->_recvBuffer) === $this->_currentPackageLength)
|
|
if(strlen($this->_recvBuffer) === $this->_currentPackageLength)
|
|
|
{
|
|
{
|
|
|
$one_request_buffer = $this->_recvBuffer;
|
|
$one_request_buffer = $this->_recvBuffer;
|
|
@@ -436,12 +415,12 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
- // 从缓冲区中获取一个完整的包
|
|
|
|
|
|
|
+ // Get a full package from the buffer.
|
|
|
$one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
|
|
$one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
|
|
|
- // 将当前包从接受缓冲区中去掉
|
|
|
|
|
|
|
+ // Remove the current package from the receive buffer.
|
|
|
$this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);
|
|
$this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);
|
|
|
}
|
|
}
|
|
|
- // 重置当前包长为0
|
|
|
|
|
|
|
+ // Reset the current packet length to 0.
|
|
|
$this->_currentPackageLength = 0;
|
|
$this->_currentPackageLength = 0;
|
|
|
if(!$this->onMessage)
|
|
if(!$this->onMessage)
|
|
|
{
|
|
{
|
|
@@ -449,7 +428,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
try
|
|
try
|
|
|
{
|
|
{
|
|
|
- // 处理数据包
|
|
|
|
|
|
|
+ // Decode request buffer before Emiting onMessage callback.
|
|
|
call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
|
|
call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
|
|
|
}
|
|
}
|
|
|
catch(\Exception $e)
|
|
catch(\Exception $e)
|
|
@@ -466,7 +445,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 没有设置协议,则直接把接收的数据当做一个包处理
|
|
|
|
|
|
|
+ // Applications protocol is not set.
|
|
|
self::$statistics['total_request']++;
|
|
self::$statistics['total_request']++;
|
|
|
if(!$this->onMessage)
|
|
if(!$this->onMessage)
|
|
|
{
|
|
{
|
|
@@ -482,12 +461,12 @@ class TcpConnection extends ConnectionInterface
|
|
|
echo $e;
|
|
echo $e;
|
|
|
exit(250);
|
|
exit(250);
|
|
|
}
|
|
}
|
|
|
- // 清空缓冲区
|
|
|
|
|
|
|
+ // Clean receive buffer.
|
|
|
$this->_recvBuffer = '';
|
|
$this->_recvBuffer = '';
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * socket可写时的回调
|
|
|
|
|
|
|
+ * Base write handler.
|
|
|
* @return void
|
|
* @return void
|
|
|
*/
|
|
*/
|
|
|
public function baseWrite()
|
|
public function baseWrite()
|
|
@@ -497,7 +476,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
{
|
|
{
|
|
|
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
|
|
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
|
|
|
$this->_sendBuffer = '';
|
|
$this->_sendBuffer = '';
|
|
|
- // 发送缓冲区的数据被发送完毕,尝试触发onBufferDrain回调
|
|
|
|
|
|
|
+ // Try to emit onBufferDrain callback when the send buffer becomes empty.
|
|
|
if($this->onBufferDrain)
|
|
if($this->onBufferDrain)
|
|
|
{
|
|
{
|
|
|
try
|
|
try
|
|
@@ -510,7 +489,6 @@ class TcpConnection extends ConnectionInterface
|
|
|
exit(250);
|
|
exit(250);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- // 如果连接状态为关闭,则销毁连接
|
|
|
|
|
if($this->_status === self::STATUS_CLOSING)
|
|
if($this->_status === self::STATUS_CLOSING)
|
|
|
{
|
|
{
|
|
|
$this->destroy();
|
|
$this->destroy();
|
|
@@ -521,7 +499,6 @@ class TcpConnection extends ConnectionInterface
|
|
|
{
|
|
{
|
|
|
$this->_sendBuffer = substr($this->_sendBuffer, $len);
|
|
$this->_sendBuffer = substr($this->_sendBuffer, $len);
|
|
|
}
|
|
}
|
|
|
- // 可写但是写失败,说明连接断开
|
|
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
self::$statistics['send_fail']++;
|
|
self::$statistics['send_fail']++;
|
|
@@ -530,7 +507,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 管道重定向
|
|
|
|
|
|
|
+ * This method pulls all the data out of a readable stream, and writes it to the supplied destination.
|
|
|
* @return void
|
|
* @return void
|
|
|
*/
|
|
*/
|
|
|
public function pipe($dest)
|
|
public function pipe($dest)
|
|
@@ -555,7 +532,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 从缓冲区中消费掉$length长度的数据
|
|
|
|
|
|
|
+ * Remove $length of data from receive buffer.
|
|
|
* @param int $length
|
|
* @param int $length
|
|
|
* @return void
|
|
* @return void
|
|
|
*/
|
|
*/
|
|
@@ -565,7 +542,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 关闭连接
|
|
|
|
|
|
|
+ * Close connection.
|
|
|
* @param mixed $data
|
|
* @param mixed $data
|
|
|
* @void
|
|
* @void
|
|
|
*/
|
|
*/
|
|
@@ -590,7 +567,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 获得socket连接
|
|
|
|
|
|
|
+ * Get the real socket.
|
|
|
* @return resource
|
|
* @return resource
|
|
|
*/
|
|
*/
|
|
|
public function getSocket()
|
|
public function getSocket()
|
|
@@ -599,7 +576,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 检查发送缓冲区是否已满,如果满了尝试触发onBufferFull回调
|
|
|
|
|
|
|
+ * Check whether the send buffer is full.
|
|
|
* @return void
|
|
* @return void
|
|
|
*/
|
|
*/
|
|
|
protected function checkBufferIsFull()
|
|
protected function checkBufferIsFull()
|
|
@@ -621,29 +598,28 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
/**
|
|
/**
|
|
|
- * 销毁连接
|
|
|
|
|
|
|
+ * Destroy connection.
|
|
|
* @return void
|
|
* @return void
|
|
|
*/
|
|
*/
|
|
|
public function destroy()
|
|
public function destroy()
|
|
|
{
|
|
{
|
|
|
- // 避免重复调用
|
|
|
|
|
|
|
+ // Avoid repeated calls.
|
|
|
if($this->_status === self::STATUS_CLOSED)
|
|
if($this->_status === self::STATUS_CLOSED)
|
|
|
{
|
|
{
|
|
|
return false;
|
|
return false;
|
|
|
}
|
|
}
|
|
|
- // 删除事件监听
|
|
|
|
|
|
|
+ // Remove event listener.
|
|
|
Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
|
|
Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
|
|
|
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
|
|
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
|
|
|
- // 关闭socket
|
|
|
|
|
|
|
+ // Close socket.
|
|
|
@fclose($this->_socket);
|
|
@fclose($this->_socket);
|
|
|
- // 从连接中删除
|
|
|
|
|
|
|
+ // Remove from worker->connections.
|
|
|
if($this->worker)
|
|
if($this->worker)
|
|
|
{
|
|
{
|
|
|
unset($this->worker->connections[$this->_id]);
|
|
unset($this->worker->connections[$this->_id]);
|
|
|
}
|
|
}
|
|
|
- // 标记该连接已经关闭
|
|
|
|
|
$this->_status = self::STATUS_CLOSED;
|
|
$this->_status = self::STATUS_CLOSED;
|
|
|
- // 触发onClose回调
|
|
|
|
|
|
|
+ // Try to emit onClose callback.
|
|
|
if($this->onClose)
|
|
if($this->onClose)
|
|
|
{
|
|
{
|
|
|
try
|
|
try
|
|
@@ -656,17 +632,16 @@ class TcpConnection extends ConnectionInterface
|
|
|
exit(250);
|
|
exit(250);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- // 清理回调,避免内存泄露
|
|
|
|
|
|
|
+ // Cleaning up the callback to avoid memory leaks.
|
|
|
$this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;
|
|
$this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * 析构函数
|
|
|
|
|
|
|
+ * Destruct.
|
|
|
* @return void
|
|
* @return void
|
|
|
*/
|
|
*/
|
|
|
public function __destruct()
|
|
public function __destruct()
|
|
|
{
|
|
{
|
|
|
- // 统计数据
|
|
|
|
|
self::$statistics['connection_count']--;
|
|
self::$statistics['connection_count']--;
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|