Bläddra i källkod

gateway worker 间长连接

walkor 11 år sedan
förälder
incheckning
8478a40047

+ 112 - 2
applications/ChatDemo/Bootstrap/BusinessWorker.php

@@ -9,14 +9,65 @@
 define('ROOT_DIR', realpath(__DIR__.'/../'));
 require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
 require_once ROOT_DIR . '/Event.php';
+require_once ROOT_DIR . '/Lib/APLog.php';
 
 class BusinessWorker extends Man\Core\SocketWorker
 {
+    /**
+     * BusinessWorker 实例
+     * @var BusinessWorker
+     */
+    protected static $instance = null;
+    
+    /**
+     * 与gateway的连接
+     * ['ip:port' => conn, 'ip:port' => conn, ...]
+     * @var array
+     */
+    protected static $gatewayConnections = array();
+    
+    /**
+     * 进程启动时初始化
+     * @see Man\Core.SocketWorker::onStart()
+     */
+    protected function onStart()
+    {
+        // 定时检查与gateway进程的连接
+        \Man\Core\Lib\Task::init($this->event);
+        \Man\Core\Lib\Task::add(1, array($this, 'checkGatewayConnections'));
+        self::$instance = $this;
+    }
+    
+    /**
+     * 获取BusinessWorker实例
+     * @return BusinessWorker
+     */
+    public static function instance()
+    {
+        return $this;
+    }
+    
+    /**
+     * 获取与gateway的连接
+     */
+    public static function getGatewayConnections()
+    {
+        return self::$gatewayConnections;
+    }
+    
+    /**
+     * 检查请求是否完整
+     * @see Man\Core.SocketWorker::dealInput()
+     */
     public function dealInput($recv_str)
     {
         return GatewayProtocol::input($recv_str); 
     }
 
+    /**
+     * 处理请求
+     * @see Man\Core.SocketWorker::dealProcess()
+     */
     public function dealProcess($recv_str)
     {
         $pack = new GatewayProtocol($recv_str);
@@ -26,7 +77,6 @@ class BusinessWorker extends Man\Core\SocketWorker
         Context::$local_port = $pack->header['local_port'];
         Context::$socket_id = $pack->header['socket_id'];
         Context::$uid = $pack->header['uid'];
-        Context::$protocol = $this->protocol;
         switch($pack->header['cmd'])
         {
             case GatewayProtocol::CMD_ON_CONNECTION:
@@ -42,8 +92,69 @@ class BusinessWorker extends Man\Core\SocketWorker
         Context::clear();
         return $ret;
     }
+    
+    /**
+     * 定时检查gateway通信端口
+     */
+    public function checkGatewayConnections()
+    {
+        $key = 'GLOBAL_GATEWAY_ADDRESS';
+        $addresses_list = Store::get($key);
+        if(empty($addresses_list))
+        {
+            return;
+        }
+       
+        foreach($addresses_list as $addr)
+        {
+            if(!isset(self::$gatewayConnections[$addr]))
+            {
+                $conn = stream_socket_client("tcp://$addr", $errno, $errstr, 1);
+                if(!$conn)
+                {
+                    $this->notice($errstr);
+                    continue;
+                }
+                self::$gatewayConnections[$addr] = $conn;
+                stream_set_blocking(self::$gatewayConnections[$addr], 0);
+                
+                $fd = (int) self::$gatewayConnections[$addr];
+                $this->connections[$fd] = self::$gatewayConnections[$addr];
+                $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
+                $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
+            }
+        }
+    }
+    
+    /**
+     * 关闭连接
+     * @see Man\Core.SocketWorker::closeClient()
+     */
+    protected function closeClient($fd)
+    {
+        foreach(self::$gatewayConnections as $con)
+        {
+            $the_fd = (int) $con;
+            if($the_fd == $fd)
+            {
+                unset(self::$gatewayConnections[$fd]);
+            }
+        }
+        parent::closeClient($fd);
+    }
+    
+    /**
+     * 向客户端发送数据
+     * @see Man\Core.SocketWorker::sendToClient()
+     */
+    public function sendToClient($buffer, $fd)
+    {
+        $this->currentDealFd = (int)$fd;
+        parent::sendToClient($buffer);
+    }
 }
 
+
 /**
  * 上下文 包含当前用户uid, 内部通信local_ip local_port socket_id ,以及客户端client_ip client_port
  * @author walkor
@@ -58,7 +169,6 @@ class Context
     public static $client_ip;
     public static $client_port;
     public static $uid;
-    public static $protocol;
     public static function clear()
     {
         self::$series_id = self::$local_ip = self::$local_port = self::$socket_id = self::$client_ip = self::$client_port = self::$uid = null;

+ 25 - 20
applications/ChatDemo/Bootstrap/Gateway.php

@@ -59,6 +59,13 @@ class Gateway extends Man\Core\SocketWorker
     protected $workerAddresses = array();
     
     /**
+     * 与worker的连接
+     * [fd=>fd, $fd=>fd, ..]
+     * @var array
+     */
+    protected $workerConnections = array();
+    
+    /**
      * gateway 发送心跳时间间隔 单位:秒 ,0表示不发送心跳,在配置中设置
      * @var integer
      */
@@ -110,10 +117,9 @@ class Gateway extends Man\Core\SocketWorker
         }
         
         // 注册套接字
-        $this->registerAddress("udp://".$this->lanIp.':'.$this->lanPort, 'udp');
-        $this->registerAddress("tcp://".$this->lanIp.':'.$this->lanPort, 'tcp');
+        $this->registerAddress($this->lanIp.':'.$this->lanPort);
         
-        // 添加读udp事件
+        // 添加读udp/tcp事件
         $this->event->add($this->innerMainSocketUdp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
         $this->event->add($this->innerMainSocketTcp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'acceptTcp'));
         
@@ -155,10 +161,10 @@ class Gateway extends Man\Core\SocketWorker
      * 存储全局的通信地址 
      * @param string $address
      */
-    protected function registerAddress($address, $protocol)
+    protected function registerAddress($address)
     {
         \Man\Core\Lib\Mutex::get();
-        $key = 'GLOBAL_GATEWAY_ADDRESS-' . $protocol;
+        $key = 'GLOBAL_GATEWAY_ADDRESS';
         $addresses_list = Store::get($key);
         if(empty($addresses_list))
         {
@@ -173,10 +179,10 @@ class Gateway extends Man\Core\SocketWorker
      * 删除全局的通信地址
      * @param string $address
      */
-    protected function unregisterAddress($address, $protocol)
+    protected function unregisterAddress($address)
     {
         \Man\Core\Lib\Mutex::get();
-        $key = 'GLOBAL_GATEWAY_ADDRESS-' . $protocol;
+        $key = 'GLOBAL_GATEWAY_ADDRESS';
         $addresses_list = Store::get($key);
         if(empty($addresses_list))
         {
@@ -228,6 +234,7 @@ class Gateway extends Man\Core\SocketWorker
         // 非阻塞
         stream_set_blocking($this->connections[$fd], 0);
         $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'recvTcp'), $fd);
+        $this->workerConnections[$fd] = $fd;
         return $new_connection;
     }
     
@@ -262,7 +269,7 @@ class Gateway extends Man\Core\SocketWorker
             }
     
             // 关闭链接
-            $this->closeClient($fd);
+            $this->closeInnerClient($fd);
             if($this->workerStatus == self::STATUS_SHUTDOWN)
             {
                 $this->stop();
@@ -286,12 +293,6 @@ class Gateway extends Man\Core\SocketWorker
                 $this->notice('CODE:' . $e->getCode() . ' MESSAGE:' . $e->getMessage()."\n".$e->getTraceAsString()."\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
                 $this->statusInfo['throw_exception'] ++;
             }
-    
-            // 关闭链接
-            if(empty($this->sendBuffers[$fd]))
-            {
-                $this->closeClient($fd);
-            }
         }
         // 出错
         else if(false === $remain_len)
@@ -299,7 +300,7 @@ class Gateway extends Man\Core\SocketWorker
             // 出错
             $this->statusInfo['packet_err']++;
             $this->notice("INNER_PACKET_ERROR\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
-            $this->closeClient($fd);
+            $this->closeInnerClient($fd);
         }
         else
         {
@@ -420,6 +421,12 @@ class Gateway extends Man\Core\SocketWorker
         parent::closeClient($fd);
     }
     
+    protected function closeInnerClient($fd)
+    {
+        unset($this->workerConnections[$fd]);
+        parent::closeClient($fd);
+    }
+    
     public function dealProcess($recv_str)
     {
         // 判断用户是否认证过
@@ -452,9 +459,8 @@ class Gateway extends Man\Core\SocketWorker
     
     protected function sendBufferToWorker($bin_data)
     {
-        $client = stream_socket_client($this->workerAddresses[array_rand($this->workerAddresses)]);
-        $len = fwrite($client, $bin_data);
-        return $len == strlen($bin_data);
+        $this->currentDealFd = array_rand($this->workerConnections);
+        return $this->sendToClient($bin_data);
     }
     
     protected function notice($str, $display=true)
@@ -469,8 +475,7 @@ class Gateway extends Man\Core\SocketWorker
     
     public function onStop()
     {
-        $this->unregisterAddress("udp://".$this->lanIp.':'.$this->lanPort, 'udp');
-        $this->unregisterAddress("tcp://".$this->lanIp.':'.$this->lanPort, 'tcp');
+        $this->unregisterAddress($this->lanIp.':'.$this->lanPort);
         foreach($this->connUidMap as $uid)
         {
             Store::delete($uid);

+ 12 - 22
applications/ChatDemo/Lib/Gateway.php

@@ -28,14 +28,9 @@ class GateWay
        $pack->header['uid'] = Context::$uid;
        $pack->body = (string)$message;
        $buffer = $pack->getBuffer();
-       if(empty(Context::$protocol))
+       foreach(BusinessWorker::getGatewayConnections() as $con)
        {
-           Context::$protocol = 'tcp';
-       }
-       $all_addresses = Store::get('GLOBAL_GATEWAY_ADDRESS-' . Context::$protocol);
-       foreach($all_addresses as $address)
-       {
-           self::sendToGateway($address, $buffer);
+           BusinessWorker::instance()->sendToClient($buffer, $con);
        }
    }
    
@@ -127,12 +122,7 @@ class GateWay
        $pack->header['uid'] = empty($uid) ? 0 : $uid;
        $pack->body = (string)$message;
        
-       if(empty(Context::$protocol))
-       {
-           Context::$protocol = 'tcp';
-       }
-        
-       return self::sendToGateway(Context::$protocol."://{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
+       return self::sendToGateway("{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
    }
    
    
@@ -160,12 +150,7 @@ class GateWay
        $pack->header['uid'] = $uid ? $uid : 0;
        $pack->body = (string)$message;
        
-       if(empty(Context::$protocol))
-       {
-           Context::$protocol = 'tcp';
-       }
-       
-       return self::sendToGateway(Context::$protocol."://{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
+       return self::sendToGateway("{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
    }
    
    /**
@@ -212,8 +197,13 @@ class GateWay
     */
    public static function sendToGateway($address, $buffer)
    {
-       $client = stream_socket_client($address);
-       $len = stream_socket_sendto($client, $buffer);
-       return $len == strlen($buffer);
+       $connections = BusinessWorker::instance()->getGatewayConnections();
+       if(!isset($connections[$address]))
+       {
+           $e = new \Exception("sendToGateway($address, $buffer) fail; getGatewayConnections:".json_encode($connections));
+           APLog::add($e->__toString());
+           return false;
+       }
+       return BusinessWorker::instance()->sendToClient($buffer, $connections[$address]);
    }
 }

+ 32 - 0
applications/ChatDemo/Lib/Log.php

@@ -0,0 +1,32 @@
+<?php
+/**
+ * 
+ * 日志类
+ * 
+* @author walkor <workerman.net>
+ */
+class APLog
+{
+    /**
+     * 添加日志
+     * @param string $msg
+     * @return void
+     */
+    public static function add($msg)
+    {
+        $log_dir = ROOT_DIR. '/Logs/'.date('Y-m-d');
+        umask(0);
+        // 没有log目录创建log目录
+        if(!is_dir($log_dir))
+        {
+            mkdir($log_dir,  0777, true);
+        }
+        if(!is_readable($log_dir))
+        {
+            return false;
+        }
+        
+        $log_file = $log_dir . "/applications.log";
+        file_put_contents($log_file, date('Y-m-d H:i:s') . " " . $msg . "\n", FILE_APPEND);
+    }
+}