|
@@ -8,33 +8,35 @@ use Workerman\Worker;
|
|
|
use \Exception;
|
|
use \Exception;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * async connection
|
|
|
|
|
|
|
+ * 异步tcp连接类
|
|
|
* @author walkor<walkor@workerman.net>
|
|
* @author walkor<walkor@workerman.net>
|
|
|
*/
|
|
*/
|
|
|
class AsyncTcpConnection extends TcpConnection
|
|
class AsyncTcpConnection extends TcpConnection
|
|
|
{
|
|
{
|
|
|
/**
|
|
/**
|
|
|
- * status
|
|
|
|
|
|
|
+ * 连接状态 连接中
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
protected $_status = self::STATUS_CONNECTING;
|
|
protected $_status = self::STATUS_CONNECTING;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * when connect success , onConnect will be run
|
|
|
|
|
|
|
+ * 当连接成功时,如果设置了连接成功回调,则执行
|
|
|
* @var callback
|
|
* @var callback
|
|
|
*/
|
|
*/
|
|
|
public $onConnect = null;
|
|
public $onConnect = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * create a connection
|
|
|
|
|
|
|
+ * 构造函数,创建连接
|
|
|
* @param resource $socket
|
|
* @param resource $socket
|
|
|
* @param EventInterface $event
|
|
* @param EventInterface $event
|
|
|
*/
|
|
*/
|
|
|
public function __construct($remote_address)
|
|
public function __construct($remote_address)
|
|
|
{
|
|
{
|
|
|
|
|
+ // 获得协议及远程地址
|
|
|
list($scheme, $address) = explode(':', $remote_address, 2);
|
|
list($scheme, $address) = explode(':', $remote_address, 2);
|
|
|
if($scheme != 'tcp')
|
|
if($scheme != 'tcp')
|
|
|
{
|
|
{
|
|
|
|
|
+ // 判断协议类是否存在
|
|
|
$scheme = ucfirst($scheme);
|
|
$scheme = ucfirst($scheme);
|
|
|
$this->protocol = '\\Protocols\\'.$scheme;
|
|
$this->protocol = '\\Protocols\\'.$scheme;
|
|
|
if(!class_exists($this->protocol))
|
|
if(!class_exists($this->protocol))
|
|
@@ -46,16 +48,24 @@ class AsyncTcpConnection extends TcpConnection
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
+ // 创建异步连接
|
|
|
$this->_socket = stream_socket_client("tcp:$address", $errno, $errstr, 0, STREAM_CLIENT_ASYNC_CONNECT);
|
|
$this->_socket = stream_socket_client("tcp:$address", $errno, $errstr, 0, STREAM_CLIENT_ASYNC_CONNECT);
|
|
|
|
|
+ // 如果失败尝试触发失败回调(如果有回调的话)
|
|
|
if(!$this->_socket)
|
|
if(!$this->_socket)
|
|
|
{
|
|
{
|
|
|
$this->emitError(WORKERMAN_CONNECT_FAIL, $errstr);
|
|
$this->emitError(WORKERMAN_CONNECT_FAIL, $errstr);
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+ // 监听连接可写事件(可写意味着连接已经建立或者已经出错)
|
|
|
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'checkConnection'));
|
|
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'checkConnection'));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 尝试触发失败回调
|
|
|
|
|
+ * @param int $code
|
|
|
|
|
+ * @param string $msg
|
|
|
|
|
+ * @return void
|
|
|
|
|
+ */
|
|
|
protected function emitError($code, $msg)
|
|
protected function emitError($code, $msg)
|
|
|
{
|
|
{
|
|
|
if($this->onError)
|
|
if($this->onError)
|
|
@@ -70,20 +80,32 @@ class AsyncTcpConnection extends TcpConnection
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 检查连接状态,连接成功还是失败
|
|
|
|
|
+ * @param resource $socket
|
|
|
|
|
+ * @return void
|
|
|
|
|
+ */
|
|
|
public function checkConnection($socket)
|
|
public function checkConnection($socket)
|
|
|
{
|
|
{
|
|
|
|
|
+ // 删除连接可写监听
|
|
|
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
|
|
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
|
|
|
- // php bug ?
|
|
|
|
|
|
|
+ // 需要判断两次连接是否已经断开
|
|
|
if(!feof($this->_socket) && !feof($this->_socket))
|
|
if(!feof($this->_socket) && !feof($this->_socket))
|
|
|
{
|
|
{
|
|
|
|
|
+ // 设置非阻塞
|
|
|
stream_set_blocking($this->_socket, 0);
|
|
stream_set_blocking($this->_socket, 0);
|
|
|
|
|
+ // 监听可读事件
|
|
|
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
|
|
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
|
|
|
|
|
+ // 如果发送缓冲区有数据则执行发送
|
|
|
if($this->_sendBuffer)
|
|
if($this->_sendBuffer)
|
|
|
{
|
|
{
|
|
|
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
|
|
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
|
|
|
}
|
|
}
|
|
|
|
|
+ // 标记状态为连接已经建立
|
|
|
$this->_status = self::STATUS_ESTABLISH;
|
|
$this->_status = self::STATUS_ESTABLISH;
|
|
|
|
|
+ // 为status 命令统计数据
|
|
|
ConnectionInterface::$statistics['connection_count']++;
|
|
ConnectionInterface::$statistics['connection_count']++;
|
|
|
|
|
+ // 如果有设置onConnect回调,则执行
|
|
|
if($this->onConnect)
|
|
if($this->onConnect)
|
|
|
{
|
|
{
|
|
|
try
|
|
try
|
|
@@ -99,63 +121,76 @@ class AsyncTcpConnection extends TcpConnection
|
|
|
}
|
|
}
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
- $this->emitError(WORKERMAN_CONNECT_FAIL, 'connect fail, maybe timedout');
|
|
|
|
|
|
|
+ // 连接未建立成功
|
|
|
|
|
+ $this->emitError(WORKERMAN_CONNECT_FAIL, 'connect fail');
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * send buffer to client
|
|
|
|
|
|
|
+ * 发送数据给对方
|
|
|
* @param string $send_buffer
|
|
* @param string $send_buffer
|
|
|
* @return void|boolean
|
|
* @return void|boolean
|
|
|
*/
|
|
*/
|
|
|
public function send($send_buffer)
|
|
public function send($send_buffer)
|
|
|
{
|
|
{
|
|
|
|
|
+ // 如果有设置协议,则用协议编码
|
|
|
if($this->protocol)
|
|
if($this->protocol)
|
|
|
{
|
|
{
|
|
|
$parser = $this->protocol;
|
|
$parser = $this->protocol;
|
|
|
$send_buffer = $parser::encode($send_buffer, $this);
|
|
$send_buffer = $parser::encode($send_buffer, $this);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ // 如果当前状态是连接中,则把数据放入发送缓冲区
|
|
|
if($this->_status === self::STATUS_CONNECTING)
|
|
if($this->_status === self::STATUS_CONNECTING)
|
|
|
{
|
|
{
|
|
|
$this->_sendBuffer .= $send_buffer;
|
|
$this->_sendBuffer .= $send_buffer;
|
|
|
return null;
|
|
return null;
|
|
|
}
|
|
}
|
|
|
|
|
+ // 如果当前连接是关闭中,则返回false
|
|
|
elseif($this->_status == self::STATUS_CLOSED)
|
|
elseif($this->_status == self::STATUS_CLOSED)
|
|
|
{
|
|
{
|
|
|
return false;
|
|
return false;
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+ // 如果发送缓冲区无数据,则尝试直接发送
|
|
|
if($this->_sendBuffer === '')
|
|
if($this->_sendBuffer === '')
|
|
|
{
|
|
{
|
|
|
|
|
+ // 直接发送,得到已经发送(写入socket写缓冲区)的字节数
|
|
|
$len = @fwrite($this->_socket, $send_buffer);
|
|
$len = @fwrite($this->_socket, $send_buffer);
|
|
|
|
|
+ // 如果已经发送出去的长度刚好为要发送数据的长度,则说明数据发送成功
|
|
|
if($len === strlen($send_buffer))
|
|
if($len === strlen($send_buffer))
|
|
|
{
|
|
{
|
|
|
return true;
|
|
return true;
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+ // 数据只发送了一部分,则将剩余的数据放入发送缓冲区
|
|
|
if($len > 0)
|
|
if($len > 0)
|
|
|
{
|
|
{
|
|
|
$this->_sendBuffer = substr($send_buffer, $len);
|
|
$this->_sendBuffer = substr($send_buffer, $len);
|
|
|
}
|
|
}
|
|
|
|
|
+ // 发送出现异常
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
|
|
+ // 如果连接关闭
|
|
|
if(feof($this->_socket))
|
|
if(feof($this->_socket))
|
|
|
{
|
|
{
|
|
|
|
|
+ // status命令 统计发送失败次数
|
|
|
self::$statistics['send_fail']++;
|
|
self::$statistics['send_fail']++;
|
|
|
|
|
+ // 如果有设置失败回到,则执行
|
|
|
if($this->onError)
|
|
if($this->onError)
|
|
|
{
|
|
{
|
|
|
call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'client close');
|
|
call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'client close');
|
|
|
}
|
|
}
|
|
|
|
|
+ // 销毁本实例
|
|
|
$this->destroy();
|
|
$this->destroy();
|
|
|
return false;
|
|
return false;
|
|
|
}
|
|
}
|
|
|
|
|
+ // 连接未关闭,则将整个数据放入发送缓冲区
|
|
|
$this->_sendBuffer = $send_buffer;
|
|
$this->_sendBuffer = $send_buffer;
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+ // 监听可写事件,将发送缓冲区的数据发送给对方(写到socket发送缓冲区)
|
|
|
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
|
|
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
|
|
|
return null;
|
|
return null;
|
|
|
}
|
|
}
|
|
|
|
|
+ // 发送缓冲区有数据,则直接将数据放入发送缓冲区
|
|
|
else
|
|
else
|
|
|
{
|
|
{
|
|
|
$this->_sendBuffer .= $send_buffer;
|
|
$this->_sendBuffer .= $send_buffer;
|