Przeglądaj źródła

add TcpConnection::$id and add Gateway::$router

walkor 10 lat temu
rodzic
commit
b76df130df

+ 25 - 4
GatewayWorker/Gateway.php

@@ -55,6 +55,12 @@ class Gateway extends Worker
     public $pingData = '';
     
     /**
+     * 路由函数
+     * @var callback
+     */
+    public $router = null;
+    
+    /**
      * 保存客户端的所有connection对象
      * @var array
      */
@@ -123,6 +129,8 @@ class Gateway extends Worker
     {
         parent::__construct($socket_name, $context_option);
         
+        $this->router = array("\\GatewayWorker\\Gateway", 'routerRand');
+        
         $backrace = debug_backtrace();
         $this->_appInitPath = dirname($backrace[0]['file']);
     }
@@ -219,11 +227,11 @@ class Gateway extends Worker
         $gateway_data['cmd'] = $cmd;
         $gateway_data['body'] = $body;
         $gateway_data['ext_data'] = $connection->session;
-        // 随机选择一个worker处理
-        $key = array_rand($this->_workerConnections);
-        if($key)
+        if($this->_workerConnections)
         {
-            if(false === $this->_workerConnections[$key]->send($gateway_data))
+            // 调用路由函数,选择一个worker把请求转发给它
+            $worker_connection = call_user_func($this->router, $this->_workerConnections, $connection, $cmd, $body);
+            if(false === $worker_connection->send($gateway_data))
             {
                 $msg = "SendBufferToWorker fail. May be the send buffer are overflow";
                 $this->log($msg);
@@ -247,6 +255,19 @@ class Gateway extends Worker
     }
     
     /**
+     * 随机路由,返回worker connection对象
+     * @param array $worker_connections
+     * @param TcpConnection $client_connection
+     * @param int $cmd
+     * @param mixed $buffer
+     * @return TcpConnection
+     */
+    public static function routerRand($worker_connections, $client_connection, $cmd, $buffer)
+    {
+        return $worker_connections[array_rand($worker_connections)];
+    }
+    
+    /**
      * 保存客户端连接的gateway通讯地址
      * @param int $global_client_id
      * @param string $address

+ 1 - 0
Workerman/Connection/AsyncTcpConnection.php

@@ -49,6 +49,7 @@ class AsyncTcpConnection extends TcpConnection
             }
         }
         $this->_remoteAddress = substr($address, 2);
+        $this->id = self::$_idRecorder++;
     }
     
     public function connect()

+ 13 - 0
Workerman/Connection/TcpConnection.php

@@ -88,6 +88,12 @@ class TcpConnection extends ConnectionInterface
     public $worker = null;
     
     /**
+     * 连接的id,一个自增整数
+     * @var int
+     */
+    public $id = 0;
+    
+    /**
      * 发送缓冲区大小,当发送缓冲区满时,会尝试触发onBufferFull回调(如果有设置的话)
      * 如果没设置onBufferFull回调,由于发送缓冲区满,则后续发送的数据将被丢弃,
      * 直到发送缓冲区有空的位置
@@ -106,6 +112,12 @@ class TcpConnection extends ConnectionInterface
     public static $maxPackageSize = 10485760;
     
     /**
+     * id 记录器
+     * @var int
+     */
+    protected static $_idRecorder = 1;
+    
+    /**
      * 实际的socket资源
      * @var resource
      */
@@ -167,6 +179,7 @@ class TcpConnection extends ConnectionInterface
      */
     public function __construct($socket)
     {
+        $this->id = self::$_idRecorder++;
         $this->_socket = $socket;
         stream_set_blocking($this->_socket, 0);
         Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));