|
|
@@ -7,28 +7,81 @@ use \Workerman\Protocols\GatewayProtocol;
|
|
|
use \GatewayWorker\Lib\Lock;
|
|
|
use \GatewayWorker\Lib\Store;
|
|
|
|
|
|
+/**
|
|
|
+ *
|
|
|
+ * Gateway,基于Worker开发
|
|
|
+ * 用于转发客户端的数据给Worker处理,以及转发Worker的数据给客户端
|
|
|
+ *
|
|
|
+ * @author walkor<walkor@workerman.net>
|
|
|
+ *
|
|
|
+ */
|
|
|
class Gateway extends Worker
|
|
|
{
|
|
|
+ /**
|
|
|
+ * 本机ip
|
|
|
+ * @var 单机部署默认127.0.0.1,如果是分布式部署,需要设置成本机ip
|
|
|
+ */
|
|
|
public $lanIp = '127.0.0.1';
|
|
|
|
|
|
+ /**
|
|
|
+ * gateway内部通讯起始端口,每个gateway实例应该都不同,步长1000
|
|
|
+ * @var int
|
|
|
+ */
|
|
|
public $startPort = 2000;
|
|
|
|
|
|
+ /**
|
|
|
+ * 是否可以平滑重启,gateway不能平滑重启,否则会导致连接断开
|
|
|
+ * @var bool
|
|
|
+ */
|
|
|
public $reloadable = false;
|
|
|
|
|
|
+ /**
|
|
|
+ * 心跳时间间隔
|
|
|
+ * @var int
|
|
|
+ */
|
|
|
public $pingInterval = 0;
|
|
|
|
|
|
+ /**
|
|
|
+ * $pingNotResponseLimit*$pingInterval时间内,客户端未发送任何数据,断开客户端连接
|
|
|
+ * @var int
|
|
|
+ */
|
|
|
public $pingNotResponseLimit = 0;
|
|
|
|
|
|
+ /**
|
|
|
+ * 服务端向客户端发送的心跳数据
|
|
|
+ * @var string
|
|
|
+ */
|
|
|
public $pingData = '';
|
|
|
|
|
|
+ /**
|
|
|
+ * 保存客户端的所有connection对象
|
|
|
+ * @var array
|
|
|
+ */
|
|
|
protected $_clientConnections = array();
|
|
|
|
|
|
+ /**
|
|
|
+ * 保存所有worker的内部连接的connection对象
|
|
|
+ * @var array
|
|
|
+ */
|
|
|
protected $_workerConnections = array();
|
|
|
|
|
|
+ /**
|
|
|
+ * gateway内部监听worker内部连接的worker
|
|
|
+ * @var Worker
|
|
|
+ */
|
|
|
protected $_innerTcpWorker = null;
|
|
|
|
|
|
+ /**
|
|
|
+ * gateway内部监听udp数据的worker
|
|
|
+ * @var Worker
|
|
|
+ */
|
|
|
protected $_innerUdpWorker = null;
|
|
|
|
|
|
+ /**
|
|
|
+ * 构造函数
|
|
|
+ * @param string $socket_name
|
|
|
+ * @param array $context_option
|
|
|
+ */
|
|
|
public function __construct($socket_name, $context_option = array())
|
|
|
{
|
|
|
$this->onWorkerStart = array($this, 'onWorkerStart');
|
|
|
@@ -42,15 +95,27 @@ class Gateway extends Worker
|
|
|
$this->_appInitPath = dirname($backrace[0]['file']);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 当客户端发来数据时,转发给worker处理
|
|
|
+ * @param TcpConnection $connection
|
|
|
+ * @param mixed $data
|
|
|
+ */
|
|
|
public function onClientMessage($connection, $data)
|
|
|
{
|
|
|
$connection->pingNotResponseCount = 0;
|
|
|
$this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $connection, $data);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 当客户端连接上来时,初始化一些客户端的数据
|
|
|
+ * 包括全局唯一的client_id、初始化session等
|
|
|
+ * @param unknown_type $connection
|
|
|
+ */
|
|
|
public function onClientConnect($connection)
|
|
|
{
|
|
|
+ // 分配一个全局唯一的client_id
|
|
|
$connection->globalClientId = $this->createGlobalClientId();
|
|
|
+ // 保存该连接的内部通讯的数据包报头,避免每次重新初始化
|
|
|
$connection->gatewayHeader = array(
|
|
|
'local_ip' => $this->lanIp,
|
|
|
'local_port' => $this->lanPort,
|
|
|
@@ -58,23 +123,35 @@ class Gateway extends Worker
|
|
|
'client_port'=>$connection->getRemotePort(),
|
|
|
'client_id'=>$connection->globalClientId,
|
|
|
);
|
|
|
+ // 连接的session
|
|
|
$connection->session = '';
|
|
|
+ // 该连接的心跳参数
|
|
|
$connection->pingNotResponseCount = 0;
|
|
|
+ // 保存客户端连接connection对象
|
|
|
$this->_clientConnections[$connection->globalClientId] = $connection;
|
|
|
+ // 保存该连接的内部gateway通讯地址
|
|
|
$address = $this->lanIp.':'.$this->lanPort;
|
|
|
$this->storeClientAddress($connection->globalClientId, $address);
|
|
|
+ // 如果设置了Event::onConnect,则通知worker进程,让worker执行onConnect
|
|
|
if(method_exists('Event','onConnect'))
|
|
|
{
|
|
|
$this->sendToWorker(GatewayProtocol::CMD_ON_CONNECTION, $connection);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 发送数据给worker进程
|
|
|
+ * @param int $cmd
|
|
|
+ * @param TcpConnection $connection
|
|
|
+ * @param mixed $body
|
|
|
+ */
|
|
|
protected function sendToWorker($cmd, $connection, $body = '')
|
|
|
{
|
|
|
$gateway_data = $connection->gatewayHeader;
|
|
|
$gateway_data['cmd'] = $cmd;
|
|
|
$gateway_data['body'] = $body;
|
|
|
$gateway_data['ext_data'] = $connection->session;
|
|
|
+ // 随机选择一个worker处理
|
|
|
$key = array_rand($this->_workerConnections);
|
|
|
if($key)
|
|
|
{
|
|
|
@@ -85,6 +162,7 @@ class Gateway extends Worker
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
+ // 没有可用的worker
|
|
|
else
|
|
|
{
|
|
|
$msg = "endBufferToWorker fail. the connections between Gateway and BusinessWorker are not ready";
|
|
|
@@ -95,8 +173,10 @@ class Gateway extends Worker
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * 保存客户端连接的gateway通讯地址
|
|
|
* @param int $global_client_id
|
|
|
* @param string $address
|
|
|
+ * @return bool
|
|
|
*/
|
|
|
protected function storeClientAddress($global_client_id, $address)
|
|
|
{
|
|
|
@@ -113,18 +193,36 @@ class Gateway extends Worker
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 删除客户端gateway通讯地址
|
|
|
+ * @param int $global_client_id
|
|
|
+ * @return void
|
|
|
+ */
|
|
|
protected function delClientAddress($global_client_id)
|
|
|
{
|
|
|
Store::instance('gateway')->delete('gateway-'.$global_client_id);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 当客户端关闭时
|
|
|
+ * @param unknown_type $connection
|
|
|
+ */
|
|
|
public function onClientClose($connection)
|
|
|
{
|
|
|
- $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $connection);
|
|
|
+ // 尝试通知worker,触发Event::onClose
|
|
|
+ if(method_exists('Event','onClose'))
|
|
|
+ {
|
|
|
+ $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $connection);
|
|
|
+ }
|
|
|
+ // 清理连接的数据
|
|
|
$this->delClientAddress($connection->globalClientId);
|
|
|
unset($this->_clientConnections[$connection->globalClientId]);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 创建一个workerman集群全局唯一的client_id
|
|
|
+ * @return int|false
|
|
|
+ */
|
|
|
protected function createGlobalClientId()
|
|
|
{
|
|
|
$global_socket_key = 'GLOBAL_SOCKET_ID_KEY';
|
|
|
@@ -149,32 +247,40 @@ class Gateway extends Worker
|
|
|
return $global_client_id;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 当Gateway启动的时候触发的回调函数
|
|
|
+ * @return void
|
|
|
+ */
|
|
|
public function onWorkerStart()
|
|
|
{
|
|
|
+ // 分配一个内部通讯端口
|
|
|
$this->lanPort = $this->startPort - posix_getppid() + posix_getpid();
|
|
|
-
|
|
|
if($this->lanPort<0 || $this->lanPort >=65535)
|
|
|
{
|
|
|
$this->lanPort = rand($this->startPort, 65535);
|
|
|
}
|
|
|
|
|
|
+ // 如果有设置心跳,则定时执行
|
|
|
if($this->pingInterval > 0)
|
|
|
{
|
|
|
Timer::add($this->pingInterval, array($this, 'ping'));
|
|
|
}
|
|
|
|
|
|
+ // 初始化gateway内部的监听,用于监听worker的连接已经连接上发来的数据
|
|
|
$this->_innerTcpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}");
|
|
|
$this->_innerTcpWorker->listen();
|
|
|
$this->_innerUdpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}");
|
|
|
$this->_innerUdpWorker->transport = 'udp';
|
|
|
$this->_innerUdpWorker->listen();
|
|
|
|
|
|
+ // 设置内部监听的相关回调
|
|
|
$this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage');
|
|
|
$this->_innerUdpWorker->onMessage = array($this, 'onWorkerMessage');
|
|
|
|
|
|
$this->_innerTcpWorker->onConnect = array($this, 'onWorkerConnect');
|
|
|
$this->_innerTcpWorker->onClose = array($this, 'onWorkerClose');
|
|
|
|
|
|
+ // 注册gateway的内部通讯地址,worker去连这个地址,以便gateway与worker之间建立起TCP长连接
|
|
|
if(!$this->registerAddress())
|
|
|
{
|
|
|
$this->log('registerAddress fail and exit');
|
|
|
@@ -182,31 +288,45 @@ class Gateway extends Worker
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 当worker通过内部通讯端口连接到gateway时
|
|
|
+ * @param TcpConnection $connection
|
|
|
+ */
|
|
|
public function onWorkerConnect($connection)
|
|
|
{
|
|
|
$connection->remoteAddress = $connection->getRemoteIp().':'.$connection->getRemotePort();
|
|
|
$this->_workerConnections[$connection->remoteAddress] = $connection;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 当worker发来数据时
|
|
|
+ * @param TcpConnection $connection
|
|
|
+ * @param mixed $data
|
|
|
+ * @throws \Exception
|
|
|
+ */
|
|
|
public function onWorkerMessage($connection, $data)
|
|
|
{
|
|
|
$cmd = $data['cmd'];
|
|
|
switch($cmd)
|
|
|
{
|
|
|
- // 向某客户端发送数据
|
|
|
+ // 向某客户端发送数据,Gateway::sendToClient($client_id, $message);
|
|
|
case GatewayProtocol::CMD_SEND_TO_ONE:
|
|
|
if(isset($this->_clientConnections[$data['client_id']]))
|
|
|
{
|
|
|
$this->_clientConnections[$data['client_id']]->send($data['body']);
|
|
|
}
|
|
|
break;
|
|
|
+ // 关闭客户端连接,Gateway::closeClient($client_id);
|
|
|
case GatewayProtocol::CMD_KICK:
|
|
|
if(isset($this->_clientConnections[$data['client_id']]))
|
|
|
{
|
|
|
$this->_clientConnections[$data['client_id']]->close();
|
|
|
}
|
|
|
break;
|
|
|
+ // 广播, Gateway::sendToAll($message, $client_id_array)
|
|
|
case GatewayProtocol::CMD_SEND_TO_ALL:
|
|
|
+ // $client_id_array不为空时,只广播给$client_id_array指定的客户端
|
|
|
if($data['ext_data'])
|
|
|
{
|
|
|
$client_id_array = unpack('N*', $data['ext_data']);
|
|
|
@@ -218,6 +338,7 @@ class Gateway extends Worker
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ // $client_id_array为空时,广播给所有在线客户端
|
|
|
else
|
|
|
{
|
|
|
foreach($this->_clientConnections as $client_connection)
|
|
|
@@ -226,16 +347,19 @@ class Gateway extends Worker
|
|
|
}
|
|
|
}
|
|
|
break;
|
|
|
+ // 更新客户端session
|
|
|
case GatewayProtocol::CMD_UPDATE_SESSION:
|
|
|
if(isset($this->_clientConnections[$data['client_id']]))
|
|
|
{
|
|
|
$this->_clientConnections[$data['client_id']]->session = $data['ext_data'];
|
|
|
}
|
|
|
break;
|
|
|
+ // 获得客户端在线状态 Gateway::getOnlineStatus()
|
|
|
case GatewayProtocol::CMD_GET_ONLINE_STATUS:
|
|
|
$online_status = json_encode(array_keys($this->_clientConnections));
|
|
|
$connection->send($online_status);
|
|
|
break;
|
|
|
+ // 判断某个client_id是否在线 Gateway::isOnline($client_id)
|
|
|
case GatewayProtocol::CMD_IS_ONLINE:
|
|
|
$connection->send((int)isset($this->_clientConnections[$data['client_id']]));
|
|
|
break;
|
|
|
@@ -245,15 +369,20 @@ class Gateway extends Worker
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 当worker连接关闭时
|
|
|
+ * @param TcpConnection $connection
|
|
|
+ */
|
|
|
public function onWorkerClose($connection)
|
|
|
{
|
|
|
- $this->log("{$connection->remoteAddress} CLOSE INNER_CONNECTION\n");
|
|
|
+ //$this->log("{$connection->remoteAddress} CLOSE INNER_CONNECTION\n");
|
|
|
unset($this->_workerConnections[$connection->remoteAddress]);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 存储全局的通信地址
|
|
|
+ * 存储当前Gateway的内部通信地址
|
|
|
* @param string $address
|
|
|
+ * @return bool
|
|
|
*/
|
|
|
protected function registerAddress()
|
|
|
{
|
|
|
@@ -269,6 +398,7 @@ class Gateway extends Worker
|
|
|
$this->log($msg);
|
|
|
return false;
|
|
|
}
|
|
|
+ // 为保证原子性,需要加锁
|
|
|
Lock::get();
|
|
|
$addresses_list = $store->get($key);
|
|
|
if(empty($addresses_list))
|
|
|
@@ -291,8 +421,9 @@ class Gateway extends Worker
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 删除全局的通信地址
|
|
|
+ * 删除当前Gateway的内部通信地址
|
|
|
* @param string $address
|
|
|
+ * @return bool
|
|
|
*/
|
|
|
protected function unregisterAddress()
|
|
|
{
|
|
|
@@ -307,6 +438,7 @@ class Gateway extends Worker
|
|
|
$this->log($msg);
|
|
|
return false;
|
|
|
}
|
|
|
+ // 为保证原子性,需要加锁
|
|
|
Lock::get();
|
|
|
$addresses_list = $store->get($key);
|
|
|
if(empty($addresses_list))
|
|
|
@@ -329,9 +461,13 @@ class Gateway extends Worker
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 心跳逻辑
|
|
|
+ * @return void
|
|
|
+ */
|
|
|
public function ping()
|
|
|
{
|
|
|
- // 关闭未回复心跳的连接
|
|
|
+ // 遍历所有客户端连接
|
|
|
foreach($this->_clientConnections as $connection)
|
|
|
{
|
|
|
// 上次发送的心跳还没有回复次数大于限定值就断开
|
|
|
@@ -345,6 +481,10 @@ class Gateway extends Worker
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 当gateway关闭时触发,清理数据
|
|
|
+ * @return void
|
|
|
+ */
|
|
|
public function onWorkerStop()
|
|
|
{
|
|
|
$this->unregisterAddress();
|