Parcourir la source

AsyncTcpConnection suport reconnect in onError/onClose callback

walkor il y a 9 ans
Parent
commit
eb7acbb441
4 fichiers modifiés avec 44 ajouts et 33 suppressions
  1. 18 4
      Connection/AsyncTcpConnection.php
  2. 16 6
      Connection/TcpConnection.php
  3. 9 22
      Connection/UdpConnection.php
  4. 1 1
      Worker.php

+ 18 - 4
Connection/AsyncTcpConnection.php

@@ -41,7 +41,7 @@ class AsyncTcpConnection extends TcpConnection
      *
      * @var int
      */
-    protected $_status = self::STATUS_CONNECTING;
+    protected $_status = self::STATUS_INITIAL;
 
     /**
      * Remote host.
@@ -111,14 +111,23 @@ class AsyncTcpConnection extends TcpConnection
      */
     public function connect()
     {
+         if ($this->_status !== self::STATUS_INITIAL && $this->_status !== self::STATUS_CLOSING && $this->_status !== self::STATUS_CLOSED) {
+            return;
+        }
+        $this->_status = self::STATUS_CONNECTING;
         $this->_connectStartTime = microtime(true);
         // Open socket connection asynchronously.
         $this->_socket = stream_socket_client("{$this->transport}://{$this->_remoteAddress}", $errno, $errstr, 0,
             STREAM_CLIENT_ASYNC_CONNECT);
         // If failed attempt to emit onError callback.
         if (!$this->_socket) {
-            $this->_status = self::STATUS_CLOSED;
             $this->emitError(WORKERMAN_CONNECT_FAIL, $errstr);
+            if ($this->_status === self::STATUS_CLOSING) {
+                $this->destroy();
+            }
+            if ($this->_status === self::STATUS_CLOSED) {
+                $this->onConnect = null;
+            }
             return;
         }
         // Add socket to global event loop waiting connection is successfully established or faild. 
@@ -144,6 +153,7 @@ class AsyncTcpConnection extends TcpConnection
      */
     protected function emitError($code, $msg)
     {
+        $this->_status = self::STATUS_CLOSING;
         if ($this->onError) {
             try {
                 call_user_func($this->onError, $this, $code, $msg);
@@ -201,8 +211,12 @@ class AsyncTcpConnection extends TcpConnection
         } else {
             // Connection failed.
             $this->emitError(WORKERMAN_CONNECT_FAIL, 'connect ' . $this->_remoteAddress . ' fail after ' . round(microtime(true) - $this->_connectStartTime, 4) . ' seconds');
-            $this->destroy();
-            $this->onConnect = null;
+            if ($this->_status === self::STATUS_CLOSING) {
+                $this->destroy();
+            }
+            if ($this->_status === self::STATUS_CLOSED) {
+                $this->onConnect = null;
+            }
         }
     }
 }

+ 16 - 6
Connection/TcpConnection.php

@@ -30,6 +30,13 @@ class TcpConnection extends ConnectionInterface
     const READ_BUFFER_SIZE = 65535;
 
     /**
+     * Status initial.
+     *
+     * @var int
+     */
+    const STATUS_INITIAL = 0;
+
+    /**
      * Status connecting.
      *
      * @var int
@@ -221,7 +228,7 @@ class TcpConnection extends ConnectionInterface
      * Sends data on the connection.
      *
      * @param string $send_buffer
-     * @param bool   $raw
+     * @param bool  $raw
      * @return void|bool|null
      */
     public function send($send_buffer, $raw = false)
@@ -235,7 +242,7 @@ class TcpConnection extends ConnectionInterface
             }
         }
 
-        if ($this->_status === self::STATUS_CONNECTING) {
+        if ($this->_status === self::STATUS_INITIAL || $this->_status === self::STATUS_CONNECTING) {
             $this->_sendBuffer .= $send_buffer;
             return null;
         } elseif ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
@@ -529,15 +536,16 @@ class TcpConnection extends ConnectionInterface
      * Close connection.
      *
      * @param mixed $data
+     * @param bool $raw
      * @return void
      */
-    public function close($data = null)
+    public function close($data = null, $raw = false)
     {
         if ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
             return;
         } else {
             if ($data !== null) {
-                $this->send($data);
+                $this->send($data, $raw);
             }
             $this->_status = self::STATUS_CLOSING;
         }
@@ -611,8 +619,10 @@ class TcpConnection extends ConnectionInterface
                 exit(250);
             }
         }
-        // Cleaning up the callback to avoid memory leaks.
-        $this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;
+        if ($this->_status === self::STATUS_CLOSED) {
+            // Cleaning up the callback to avoid memory leaks.
+            $this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;
+        }
     }
 
     /**

+ 9 - 22
Connection/UdpConnection.php

@@ -34,20 +34,6 @@ class UdpConnection extends ConnectionInterface
     protected $_socket = null;
 
     /**
-     * Remote ip.
-     *
-     * @var string
-     */
-    protected $_remoteIp = '';
-
-    /**
-     * Remote port.
-     *
-     * @var int
-     */
-    protected $_remotePort = 0;
-
-    /**
      * Remote address.
      *
      * @var string
@@ -92,10 +78,11 @@ class UdpConnection extends ConnectionInterface
      */
     public function getRemoteIp()
     {
-        if (!$this->_remoteIp) {
-            list($this->_remoteIp, $this->_remotePort) = explode(':', $this->_remoteAddress, 2);
+        $pos = strrpos($this->_remoteAddress, ':');
+        if ($pos) {
+            return trim(substr($this->_remoteAddress, 0, $pos), '[]');
         }
-        return $this->_remoteIp;
+        return '';
     }
 
     /**
@@ -105,10 +92,10 @@ class UdpConnection extends ConnectionInterface
      */
     public function getRemotePort()
     {
-        if (!$this->_remotePort) {
-            list($this->_remoteIp, $this->_remotePort) = explode(':', $this->_remoteAddress, 2);
+        if ($this->_remoteAddress) {
+            return (int)substr(strrchr($this->_remoteAddress, ':'), 1);
         }
-        return $this->_remotePort;
+        return 0;
     }
 
     /**
@@ -117,10 +104,10 @@ class UdpConnection extends ConnectionInterface
      * @param mixed $data
      * @return bool
      */
-    public function close($data = null)
+    public function close($data = null, $raw = false)
     {
         if ($data !== null) {
-            $this->send($data);
+            $this->send($data, $raw);
         }
         return true;
     }

+ 1 - 1
Worker.php

@@ -33,7 +33,7 @@ class Worker
      *
      * @var string
      */
-    const VERSION = '3.3.3';
+    const VERSION = '3.3.4';
 
     /**
      * Status starting.