ソースを参照

AsyncTcpConnection add connect method

walkor 10 年 前
コミット
2fc5c82ca5
2 ファイル変更24 行追加10 行削除
  1. 11 3
      GatewayWorker/BusinessWorker.php
  2. 13 7
      Workerman/Connection/AsyncTcpConnection.php

+ 11 - 3
GatewayWorker/BusinessWorker.php

@@ -32,6 +32,12 @@ class BusinessWorker extends Worker
     public $gatewayConnections = array();
     
     /**
+     * 正在连接的gateway内部通讯地址
+     * @var array
+     */
+    protected $_connectingGatewayAddress = array();
+    
+    /**
      * 连接失败gateway内部通讯地址
      * @var array
      */
@@ -153,7 +159,7 @@ class BusinessWorker extends Worker
      */
     public function onClose($connection)
     {
-        unset($this->gatewayConnections[$connection->remoteAddress]);
+        unset($this->gatewayConnections[$connection->remoteAddress], $this->_connectingGatewayAddress[$connection->remoteAddress]);
     }
 
     /**
@@ -171,7 +177,7 @@ class BusinessWorker extends Worker
         }
         foreach($addresses_list as $addr)
         {
-            if(!isset($this->gatewayConnections[$addr]))
+            if(!isset($this->gatewayConnections[$addr]) && !isset($this->_connectingGatewayAddress[$addr]) && !isset($this->_badGatewayAddress[$addr]))
             {
                 $gateway_connection = new AsyncTcpConnection("GatewayProtocol://$addr");
                 $gateway_connection->remoteAddress = $addr;
@@ -179,6 +185,8 @@ class BusinessWorker extends Worker
                 $gateway_connection->onMessage = array($this, 'onGatewayMessage');
                 $gateway_connection->onClose = array($this, 'onClose');
                 $gateway_connection->onError = array($this, 'onError');
+                $gateway_connection->connect();
+                $this->_connectingGatewayAddress[$addr] = 1;
             }
         }
     }
@@ -192,7 +200,7 @@ class BusinessWorker extends Worker
     public function onConnectGateway($connection)
     {
         $this->gatewayConnections[$connection->remoteAddress] = $connection;
-        unset($this->_badGatewayAddress[$connection->remoteAddress]);
+        unset($this->_badGatewayAddress[$connection->remoteAddress], $this->_connectingGatewayAddress[$connection->remoteAddress]);
     }
     
     /**

+ 13 - 7
Workerman/Connection/AsyncTcpConnection.php

@@ -13,11 +13,6 @@ use \Exception;
  */
 class AsyncTcpConnection extends TcpConnection
 {
-    /**
-     * 连接状态 连接中
-     * @var int
-     */
-    protected $_status = self::STATUS_CONNECTING;
     
     /**
      * 当连接成功时,如果设置了连接成功回调,则执行
@@ -26,13 +21,18 @@ class AsyncTcpConnection extends TcpConnection
     public $onConnect = null;
     
     /**
+     * 连接状态 连接中
+     * @var int
+     */
+    protected $_status = self::STATUS_CONNECTING;
+    
+    /**
      * 构造函数,创建连接
      * @param resource $socket
      * @param EventInterface $event
      */
     public function __construct($remote_address)
     {
-        // 获得协议及远程地址
         list($scheme, $address) = explode(':', $remote_address, 2);
         if($scheme != 'tcp')
         {
@@ -48,11 +48,17 @@ class AsyncTcpConnection extends TcpConnection
                 }
             }
         }
+        $this->_remoteAddress = substr($address, 2);
+    }
+    
+    public function connect()
+    {
         // 创建异步连接
-        $this->_socket = stream_socket_client("tcp:$address", $errno, $errstr, 0, STREAM_CLIENT_ASYNC_CONNECT);
+        $this->_socket = stream_socket_client("tcp://{$this->_remoteAddress}", $errno, $errstr, 0, STREAM_CLIENT_ASYNC_CONNECT);
         // 如果失败尝试触发失败回调(如果有回调的话)
         if(!$this->_socket)
         {
+            $this->_status = self::STATUS_CLOSED;
             $this->emitError(WORKERMAN_CONNECT_FAIL, $errstr);
             return;
         }