Pārlūkot izejas kodu

gateway worker 间通信支持tcp

walkor 11 gadi atpakaļ
vecāks
revīzija
81fddbed81

+ 123 - 13
applications/ChatDemo/Bootstrap/Gateway.php

@@ -5,7 +5,7 @@
  * 1、监听客户端连接
  * 2、监听后端回应并转发回应给前端
  * 
- * @author walkor <worker-man@qq.com>
+ * @author walkor <workerman.net>
  * 
  */
 define('ROOT_DIR', realpath(__DIR__.'/../'));
@@ -18,7 +18,7 @@ class Gateway extends Man\Core\SocketWorker
      * 内部通信socket
      * @var resouce
      */
-    protected $innerMainSocket = null;
+    protected $innerMainSocket_udp = null;
     
     /**
      * 内网ip
@@ -72,24 +72,28 @@ class Gateway extends Man\Core\SocketWorker
             $this->notice($this->workerName.'.lan_ip not set');
             $this->lanIp = '127.0.0.1';
         }
-        $error_no = 0;
-        $error_msg = '';
-        $this->innerMainSocket = stream_socket_server("udp://".$this->lanIp.':'.$this->lanPort, $error_no, $error_msg, STREAM_SERVER_BIND);
-        if(!$this->innerMainSocket)
+        $error_no_udp = $error_no_tcp = 0;
+        $error_msg_udp = $error_msg_tcp = '';
+        $this->innerMainSocket_udp = stream_socket_server("udp://".$this->lanIp.':'.$this->lanPort, $error_no_udp, $error_msg_udp, STREAM_SERVER_BIND);
+        $this->innerMainSocket_tcp = stream_socket_server("tcp://".$this->lanIp.':'.$this->lanPort, $error_no_tcp, $error_msg_tcp, STREAM_SERVER_BIND);
+        if(!$this->innerMainSocket_udp || !$this->innerMainSocket_tcp)
         {
-            $this->notice('create innerMainSocket fail and exit '.$error_no . ':'.$error_msg);
+            $this->notice('create innerMainSocket udp or tcp fail and exit '.$error_msg_udp.$error_msg_tcp);
             sleep(1);
             exit(0);
         }
         else
         {
-            stream_set_blocking($this->innerMainSocket , 0);
+            stream_set_blocking($this->innerMainSocket_udp , 0);
+            stream_set_blocking($this->innerMainSocket_tcp , 0);
         }
         
-        $this->registerAddress("udp://".$this->lanIp.':'.$this->lanPort);
+        $this->registerAddress("udp://".$this->lanIp.':'.$this->lanPort, 'udp');
+        $this->registerAddress("tcp://".$this->lanIp.':'.$this->lanPort, 'tcp');
         
         // 添加读udp事件
-        $this->event->add($this->innerMainSocket,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
+        $this->event->add($this->innerMainSocket_udp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
+        $this->event->add($this->innerMainSocket_udp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'acceptTCP'));
         
         // 初始化到worker的通信地址
         $this->initWorkerAddresses();
@@ -104,16 +108,15 @@ class Gateway extends Man\Core\SocketWorker
      * 存储全局的通信地址 
      * @param string $address
      */
-    protected function registerAddress($address)
+    protected function registerAddress($address, $protocol)
     {
         \Man\Core\Lib\Mutex::get();
-        $key = 'GLOBAL_GATEWAY_ADDRESS';
+        $key = 'GLOBAL_GATEWAY_ADDRESS-' . $protocol;
         $addresses_list = Store::get($key);
         if(empty($addresses_list))
         {
             $addresses_list = array();
         }
-        
         $addresses_list[$address] = $address;
         Store::set($key, $addresses_list);
         \Man\Core\Lib\Mutex::release();
@@ -141,6 +144,113 @@ class Gateway extends Man\Core\SocketWorker
         $this->innerDealProcess($data);
     }
     
+    
+    public function acceptTcp($socket, $null_one = null, $null_two = null)
+    {
+        // 获得一个连接
+        $new_connection = @stream_socket_accept($socket, 0);
+        // 可能是惊群效应
+        if(false === $new_connection)
+        {
+            return false;
+        }
+    
+        // 连接的fd序号
+        $fd = (int) $new_connection;
+        $this->connections[$fd] = $new_connection;
+        $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>GatewayProtocol::HEAD_LEN);
+    
+        // 非阻塞
+        stream_set_blocking($this->connections[$fd], 0);
+        $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'recvTcp'), $fd);
+        return $new_connection;
+    }
+    
+    public function dealInnerInput($buffer)
+    {
+        return GatewayProtocol::input($buffer);
+    }
+    
+    /**
+     * 处理受到的数据
+     * @param event_buffer $event_buffer
+     * @param int $fd
+     * @return void
+     */
+    public function recvTcp($connection, $flag, $fd = null)
+    {
+        $this->currentDealFd = $fd;
+        $buffer = stream_socket_recvfrom($connection, $this->recvBuffers[$fd]['remain_len']);
+        // 出错了
+        if('' == $buffer && '' == ($buffer = fread($connection, $this->recvBuffers[$fd]['remain_len'])))
+        {
+            if(!feof($connection))
+            {
+                return;
+            }
+            
+            // 如果该链接对应的buffer有数据,说明发生错误
+            if(!empty($this->recvBuffers[$fd]['buf']))
+            {
+                $this->statusInfo['send_fail']++;
+                $this->notice("INNER_CLIENT_CLOSE\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
+            }
+    
+            // 关闭链接
+            $this->closeClient($fd);
+            if($this->workerStatus == self::STATUS_SHUTDOWN)
+            {
+                $this->stop();
+            }
+            return;
+        }
+    
+        $this->recvBuffers[$fd]['buf'] .= $buffer;
+    
+        $remain_len = $this->dealInnerInput($this->recvBuffers[$fd]['buf']);
+        // 包接收完毕
+        if(0 === $remain_len)
+        {
+            // 执行处理
+            try{
+                // 业务处理
+                $this->innerDealProcess($this->recvBuffers[$fd]['buf']);
+            }
+            catch(\Exception $e)
+            {
+                $this->notice('CODE:' . $e->getCode() . ' MESSAGE:' . $e->getMessage()."\n".$e->getTraceAsString()."\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
+                $this->statusInfo['throw_exception'] ++;
+            }
+    
+            // 关闭链接
+            if(empty($this->sendBuffers[$fd]))
+            {
+                $this->closeClient($fd);
+            }
+        }
+        // 出错
+        else if(false === $remain_len)
+        {
+            // 出错
+            $this->statusInfo['packet_err']++;
+            $this->notice("INNER_PACKET_ERROR\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
+            $this->closeClient($fd);
+        }
+        else
+        {
+            $this->recvBuffers[$fd]['remain_len'] = $remain_len;
+        }
+    
+        // 检查是否是关闭状态或者是否到达请求上限
+        if($this->workerStatus == self::STATUS_SHUTDOWN )
+        {
+            // 停止服务
+            $this->stop();
+            // EXIT_WAIT_TIME秒后退出进程
+            pcntl_alarm(self::EXIT_WAIT_TIME);
+        }
+    }
+    
     protected function initWorkerAddresses()
     {
         $this->workerAddresses = Man\Core\Lib\Config::get($this->workerName.'.game_worker');

+ 3 - 1
applications/ChatDemo/Bootstrap/Worker.php

@@ -3,7 +3,7 @@
  * 
  * 处理具体逻辑
  * 
- * @author walkor <worker-man@qq.com>
+ * @author walkor <workerman.net>
  * 
  */
 define('ROOT_DIR', realpath(__DIR__.'/../'));
@@ -26,6 +26,7 @@ class Worker extends Man\Core\SocketWorker
         Context::$local_port = $pack->header['local_port'];
         Context::$socket_id = $pack->header['socket_id'];
         Context::$uid = $pack->header['uid'];
+        Context::$protocol = $this->protocol;
         switch($pack->header['cmd'])
         {
             case GatewayProtocol::CMD_ON_CONNECTION:
@@ -57,6 +58,7 @@ class Context
     public static $client_ip;
     public static $client_port;
     public static $uid;
+    public static $protocol;
     public static function clear()
     {
         self::$series_id = self::$local_ip = self::$local_port = self::$socket_id = self::$client_ip = self::$client_port = self::$uid = null;

+ 4 - 4
applications/ChatDemo/Lib/Gateway.php

@@ -3,7 +3,7 @@
  * 
  * 数据发送相关
  * sendToAll sendToUid
- * @author walkor <worker-man@qq.com>
+ * @author walkor <workerman.net>
  * 
  */
 
@@ -28,7 +28,7 @@ class GateWay
        $pack->header['uid'] = Context::$uid;
        $pack->body = (string)$message;
        $buffer = $pack->getBuffer();
-       $all_addresses = Store::get('GLOBAL_GATEWAY_ADDRESS');
+       $all_addresses = Store::get('GLOBAL_GATEWAY_ADDRESS-' . Context::$protocol);
        foreach($all_addresses as $address)
        {
            self::sendToGateway($address, $buffer);
@@ -123,7 +123,7 @@ class GateWay
        $pack->header['uid'] = empty($uid) ? 0 : $uid;
        $pack->body = (string)$message;
         
-       return self::sendToGateway("udp://{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
+       return self::sendToGateway("{Context::$protocol}://{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
    }
    
    
@@ -151,7 +151,7 @@ class GateWay
        $pack->header['uid'] = $uid ? $uid : 0;
        $pack->body = (string)$message;
        
-       return self::sendToGateway("udp://{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
+       return self::sendToGateway("{Context::$protocol}://{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
    }
    
    /**

+ 1 - 1
applications/ChatDemo/Lib/Store.php

@@ -3,7 +3,7 @@
  * 
  * 这里用php数组文件来存储数据,
  * 为了获取高性能需要用类似memcache、redis的存储
- * @author walkor <worker-man@qq.com>
+ * @author walkor <workerman.net>
  * 
  */
 

+ 1 - 1
applications/ChatDemo/Protocols/GatewayProtocol.php

@@ -17,7 +17,7 @@
  * }
  * 
  * 
- * @author walkor <worker-man@qq.com>
+ * @author walkor <workerman.net>
  */
 
 class GatewayProtocol