walkor преди 2 години
родител
ревизия
37265a0d18
променени са 1 файла, в които са добавени 105 реда и са изтрити 95 реда
  1. 105 95
      src/Connection/AsyncTcpConnection.php

+ 105 - 95
src/Connection/AsyncTcpConnection.php

@@ -15,9 +15,32 @@
 namespace Workerman\Connection;
 
 use Exception;
+use stdClass;
 use Throwable;
 use Workerman\Timer;
 use Workerman\Worker;
+use function class_exists;
+use function explode;
+use function function_exists;
+use function is_resource;
+use function method_exists;
+use function microtime;
+use function parse_url;
+use function socket_import_stream;
+use function socket_set_option;
+use function stream_context_create;
+use function stream_set_blocking;
+use function stream_set_read_buffer;
+use function stream_socket_client;
+use function stream_socket_get_name;
+use function ucfirst;
+use const DIRECTORY_SEPARATOR;
+use const PHP_INT_MAX;
+use const SO_KEEPALIVE;
+use const SOL_SOCKET;
+use const SOL_TCP;
+use const STREAM_CLIENT_ASYNC_CONNECT;
+use const TCP_NODELAY;
 
 /**
  * AsyncTcpConnection.
@@ -25,82 +48,85 @@ use Workerman\Worker;
 class AsyncTcpConnection extends TcpConnection
 {
     /**
+     * PHP built-in protocols.
+     *
+     * @var array<string,string>
+     */
+    const BUILD_IN_TRANSPORTS = [
+        'tcp' => 'tcp',
+        'udp' => 'udp',
+        'unix' => 'unix',
+        'ssl' => 'ssl',
+        'sslv2' => 'sslv2',
+        'sslv3' => 'sslv3',
+        'tls' => 'tls'
+    ];
+    /**
      * Emitted when socket connection is successfully established.
      *
      * @var ?callable
      */
     public $onConnect = null;
-
     /**
      * Emitted when websocket handshake completed (Only work when protocol is ws).
      *
      * @var ?callable
      */
     public $onWebSocketConnect = null;
-
     /**
      * Transport layer protocol.
      *
      * @var string
      */
     public string $transport = 'tcp';
-
     /**
      * Socks5 proxy.
      *
      * @var string
      */
     public string $proxySocks5 = '';
-
     /**
      * Http proxy.
      *
      * @var string
      */
     public string $proxyHttp = '';
-
     /**
      * Status.
      *
      * @var int
      */
     protected int $status = self::STATUS_INITIAL;
-
     /**
      * Remote host.
      *
      * @var string
      */
     protected string $remoteHost = '';
-
     /**
      * Remote port.
      *
      * @var int
      */
     protected int $remotePort = 80;
-
     /**
      * Connect start time.
      *
      * @var float
      */
     protected float $connectStartTime = 0;
-
     /**
      * Remote URI.
      *
      * @var string
      */
     protected string $remoteURI = '';
-
     /**
      * Context option.
      *
      * @var array
      */
     protected array $contextOption = [];
-
     /**
      * Reconnect timer.
      *
@@ -108,22 +134,6 @@ class AsyncTcpConnection extends TcpConnection
      */
     protected int $reconnectTimer = 0;
 
-
-    /**
-     * PHP built-in protocols.
-     *
-     * @var array<string,string>
-     */
-    const BUILD_IN_TRANSPORTS = [
-        'tcp' => 'tcp',
-        'udp' => 'udp',
-        'unix' => 'unix',
-        'ssl' => 'ssl',
-        'sslv2' => 'sslv2',
-        'sslv3' => 'sslv3',
-        'tls' => 'tls'
-    ];
-
     /**
      * Construct.
      *
@@ -133,14 +143,14 @@ class AsyncTcpConnection extends TcpConnection
      */
     public function __construct(string $remoteAddress, array $contextOption = [])
     {
-        $addressInfo = \parse_url($remoteAddress);
+        $addressInfo = parse_url($remoteAddress);
         if (!$addressInfo) {
-            list($scheme, $this->remoteAddress) = \explode(':', $remoteAddress, 2);
+            list($scheme, $this->remoteAddress) = explode(':', $remoteAddress, 2);
             if ('unix' === strtolower($scheme)) {
                 $this->remoteAddress = substr($remoteAddress, strpos($remoteAddress, '/') + 2);
             }
             if (!$this->remoteAddress) {
-                Worker::safeEcho(new \Exception('bad remote_address'));
+                Worker::safeEcho(new Exception('bad remote_address'));
             }
         } else {
             if (!isset($addressInfo['port'])) {
@@ -164,16 +174,16 @@ class AsyncTcpConnection extends TcpConnection
         }
 
         $this->id = $this->realId = self::$idRecorder++;
-        if (\PHP_INT_MAX === self::$idRecorder) {
+        if (PHP_INT_MAX === self::$idRecorder) {
             self::$idRecorder = 0;
         }
         // Check application layer protocol class.
         if (!isset(self::BUILD_IN_TRANSPORTS[$scheme])) {
-            $scheme = \ucfirst($scheme);
+            $scheme = ucfirst($scheme);
             $this->protocol = '\\Protocols\\' . $scheme;
-            if (!\class_exists($this->protocol)) {
+            if (!class_exists($this->protocol)) {
                 $this->protocol = "\\Workerman\\Protocols\\$scheme";
-                if (!\class_exists($this->protocol)) {
+                if (!class_exists($this->protocol)) {
                     throw new Exception("class \\Protocols\\$scheme not exist");
                 }
             }
@@ -187,7 +197,28 @@ class AsyncTcpConnection extends TcpConnection
         $this->maxPackageSize = self::$defaultMaxPackageSize;
         $this->contextOption = $contextOption;
         static::$connections[$this->realId] = $this;
-        $this->context = new \stdClass;
+        $this->context = new stdClass;
+    }
+
+    /**
+     * Reconnect.
+     *
+     * @param int $after
+     * @return void
+     * @throws Throwable
+     */
+    public function reconnect(int $after = 0)
+    {
+        $this->status = self::STATUS_INITIAL;
+        static::$connections[$this->realId] = $this;
+        if ($this->reconnectTimer) {
+            Timer::del($this->reconnectTimer);
+        }
+        if ($after > 0) {
+            $this->reconnectTimer = Timer::add($after, [$this, 'connect'], null, false);
+            return;
+        }
+        $this->connect();
     }
 
     /**
@@ -208,7 +239,7 @@ class AsyncTcpConnection extends TcpConnection
         }
 
         $this->status = self::STATUS_CONNECTING;
-        $this->connectStartTime = \microtime(true);
+        $this->connectStartTime = microtime(true);
         if ($this->transport !== 'unix') {
             if (!$this->remotePort) {
                 $this->remotePort = $this->transport === 'ssl' ? 443 : 80;
@@ -217,36 +248,36 @@ class AsyncTcpConnection extends TcpConnection
             // Open socket connection asynchronously.
             if ($this->proxySocks5) {
                 $this->contextOption['ssl']['peer_name'] = $this->remoteHost;
-                $context = \stream_context_create($this->contextOption);
-                $this->socket = \stream_socket_client("tcp://{$this->proxySocks5}", $errno, $errstr, 0, \STREAM_CLIENT_ASYNC_CONNECT, $context);
+                $context = stream_context_create($this->contextOption);
+                $this->socket = stream_socket_client("tcp://$this->proxySocks5", $errno, $err_str, 0, STREAM_CLIENT_ASYNC_CONNECT, $context);
                 fwrite($this->socket, chr(5) . chr(1) . chr(0));
                 fread($this->socket, 512);
                 fwrite($this->socket, chr(5) . chr(1) . chr(0) . chr(3) . chr(strlen($this->remoteHost)) . $this->remoteHost . pack("n", $this->remotePort));
                 fread($this->socket, 512);
             } else if ($this->proxyHttp) {
                 $this->contextOption['ssl']['peer_name'] = $this->remoteHost;
-                $context = \stream_context_create($this->contextOption);
-                $this->socket = \stream_socket_client("tcp://{$this->proxyHttp}", $errno, $errstr, 0, \STREAM_CLIENT_ASYNC_CONNECT, $context);
-                $str = "CONNECT {$this->remoteHost}:{$this->remotePort} HTTP/1.1\n";
-                $str .= "Host: {$this->remoteHost}:{$this->remotePort}\n";
+                $context = stream_context_create($this->contextOption);
+                $this->socket = stream_socket_client("tcp://$this->proxyHttp", $errno, $err_str, 0, STREAM_CLIENT_ASYNC_CONNECT, $context);
+                $str = "CONNECT $this->remoteHost:$this->remotePort HTTP/1.1\n";
+                $str .= "Host: $this->remoteHost:$this->remotePort\n";
                 $str .= "Proxy-Connection: keep-alive\n";
                 fwrite($this->socket, $str);
                 fread($this->socket, 512);
             } else if ($this->contextOption) {
-                $context = \stream_context_create($this->contextOption);
-                $this->socket = \stream_socket_client("tcp://{$this->remoteHost}:{$this->remotePort}",
-                    $errno, $errstr, 0, \STREAM_CLIENT_ASYNC_CONNECT, $context);
+                $context = stream_context_create($this->contextOption);
+                $this->socket = stream_socket_client("tcp://$this->remoteHost:$this->remotePort",
+                    $errno, $err_str, 0, STREAM_CLIENT_ASYNC_CONNECT, $context);
             } else {
-                $this->socket = \stream_socket_client("tcp://{$this->remoteHost}:{$this->remotePort}",
-                    $errno, $errstr, 0, \STREAM_CLIENT_ASYNC_CONNECT);
+                $this->socket = stream_socket_client("tcp://$this->remoteHost:$this->remotePort",
+                    $errno, $err_str, 0, STREAM_CLIENT_ASYNC_CONNECT);
             }
         } else {
-            $this->socket = \stream_socket_client("{$this->transport}://{$this->remoteAddress}", $errno, $errstr, 0,
-                \STREAM_CLIENT_ASYNC_CONNECT);
+            $this->socket = stream_socket_client("$this->transport://$this->remoteAddress", $errno, $err_str, 0,
+                STREAM_CLIENT_ASYNC_CONNECT);
         }
         // If failed attempt to emit onError callback.
-        if (!$this->socket || !\is_resource($this->socket)) {
-            $this->emitError(static::CONNECT_FAIL, $errstr);
+        if (!$this->socket || !is_resource($this->socket)) {
+            $this->emitError(static::CONNECT_FAIL, $err_str);
             if ($this->status === self::STATUS_CLOSING) {
                 $this->destroy();
             }
@@ -258,30 +289,29 @@ class AsyncTcpConnection extends TcpConnection
         // Add socket to global event loop waiting connection is successfully established or faild.
         $this->eventLoop->onWritable($this->socket, [$this, 'checkConnection']);
         // For windows.
-        if (\DIRECTORY_SEPARATOR === '\\' && \method_exists($this->eventLoop, 'onExcept')) {
+        if (DIRECTORY_SEPARATOR === '\\' && method_exists($this->eventLoop, 'onExcept')) {
             $this->eventLoop->onExcept($this->socket, [$this, 'checkConnection']);
         }
     }
 
     /**
-     * Reconnect.
+     * Try to emit onError callback.
      *
-     * @param int $after
+     * @param int $code
+     * @param mixed $msg
      * @return void
      * @throws Throwable
      */
-    public function reconnect(int $after = 0)
+    protected function emitError(int $code, mixed $msg)
     {
-        $this->status = self::STATUS_INITIAL;
-        static::$connections[$this->realId] = $this;
-        if ($this->reconnectTimer) {
-            Timer::del($this->reconnectTimer);
-        }
-        if ($after > 0) {
-            $this->reconnectTimer = Timer::add($after, [$this, 'connect'], null, false);
-            return;
+        $this->status = self::STATUS_CLOSING;
+        if ($this->onError) {
+            try {
+                ($this->onError)($this, $code, $msg);
+            } catch (Throwable $e) {
+                $this->error($e);
+            }
         }
-        $this->connect();
     }
 
     /**
@@ -316,26 +346,6 @@ class AsyncTcpConnection extends TcpConnection
     }
 
     /**
-     * Try to emit onError callback.
-     *
-     * @param int $code
-     * @param mixed $msg
-     * @return void
-     * @throws Throwable
-     */
-    protected function emitError(int $code, mixed $msg)
-    {
-        $this->status = self::STATUS_CLOSING;
-        if ($this->onError) {
-            try {
-                ($this->onError)($this, $code, $msg);
-            } catch (Throwable $e) {
-                $this->error($e);
-            }
-        }
-    }
-
-    /**
      * Check connection is successfully established or faild.
      *
      * @return void
@@ -344,7 +354,7 @@ class AsyncTcpConnection extends TcpConnection
     public function checkConnection()
     {
         // Remove EV_EXPECT for windows.
-        if (\DIRECTORY_SEPARATOR === '\\' && \method_exists($this->eventLoop, 'offExcept')) {
+        if (DIRECTORY_SEPARATOR === '\\' && method_exists($this->eventLoop, 'offExcept')) {
             $this->eventLoop->offExcept($this->socket);
         }
         // Remove write listener.
@@ -355,18 +365,18 @@ class AsyncTcpConnection extends TcpConnection
         }
 
         // Check socket state.
-        if ($address = \stream_socket_get_name($this->socket, true)) {
+        if ($address = stream_socket_get_name($this->socket, true)) {
             // Nonblocking.
-            \stream_set_blocking($this->socket, false);
+            stream_set_blocking($this->socket, false);
             // Compatible with hhvm
-            if (\function_exists('stream_set_read_buffer')) {
-                \stream_set_read_buffer($this->socket, 0);
+            if (function_exists('stream_set_read_buffer')) {
+                stream_set_read_buffer($this->socket, 0);
             }
             // Try to open keepalive for tcp and disable Nagle algorithm.
-            if (\function_exists('socket_import_stream') && $this->transport === 'tcp') {
-                $rawSocket = \socket_import_stream($this->socket);
-                \socket_set_option($rawSocket, \SOL_SOCKET, \SO_KEEPALIVE, 1);
-                \socket_set_option($rawSocket, \SOL_TCP, \TCP_NODELAY, 1);
+            if (function_exists('socket_import_stream') && $this->transport === 'tcp') {
+                $rawSocket = socket_import_stream($this->socket);
+                socket_set_option($rawSocket, SOL_SOCKET, SO_KEEPALIVE, 1);
+                socket_set_option($rawSocket, SOL_TCP, TCP_NODELAY, 1);
             }
             // SSL handshake.
             if ($this->transport === 'ssl') {
@@ -395,7 +405,7 @@ class AsyncTcpConnection extends TcpConnection
                 }
             }
             // Try to emit protocol::onConnect
-            if ($this->protocol && \method_exists($this->protocol, 'onConnect')) {
+            if ($this->protocol && method_exists($this->protocol, 'onConnect')) {
                 try {
                     [$this->protocol, 'onConnect']($this);
                 } catch (Throwable $e) {
@@ -405,7 +415,7 @@ class AsyncTcpConnection extends TcpConnection
         } else {
 
             // Connection failed.
-            $this->emitError(static::CONNECT_FAIL, 'connect ' . $this->remoteAddress . ' fail after ' . round(\microtime(true) - $this->connectStartTime, 4) . ' seconds');
+            $this->emitError(static::CONNECT_FAIL, 'connect ' . $this->remoteAddress . ' fail after ' . round(microtime(true) - $this->connectStartTime, 4) . ' seconds');
             if ($this->status === self::STATUS_CLOSING) {
                 $this->destroy();
             }