Jelajahi Sumber

Update TcpConnection.php

mouyong 3 tahun lalu
induk
melakukan
2df35c9803
1 mengubah file dengan 126 tambahan dan 103 penghapusan
  1. 126 103
      src/Connection/TcpConnection.php

+ 126 - 103
src/Connection/TcpConnection.php

@@ -11,9 +11,11 @@
  * @link      http://www.workerman.net/
  * @license   http://www.opensource.org/licenses/mit-license.php MIT License
  */
+
 namespace Workerman\Connection;
 
 use Workerman\Events\EventInterface;
+use Workerman\Protocols\Http\Request;
 use Workerman\Worker;
 
 /**
@@ -28,7 +30,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
      *
      * @var int
      */
-    const READ_BUFFER_SIZE = 65535;
+    const READ_BUFFER_SIZE = 87380;
 
     /**
      * Status initial.
@@ -171,7 +173,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
      * @var int
      */
     public $maxPackageSize = 1048576;
-    
+
     /**
      * Default maximum acceptable packet size.
      *
@@ -187,6 +189,13 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
     protected static $_idRecorder = 1;
 
     /**
+     * Cache.
+     *
+     * @var bool.
+     */
+    protected static $_enableCache = true;
+
+    /**
      * Socket
      *
      * @var resource
@@ -247,32 +256,32 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
      *
      * @var array
      */
-    public static $connections = array();
+    public static $connections = [];
 
     /**
      * Status to string.
      *
      * @var array
      */
-    public static $_statusToString = array(
-        self::STATUS_INITIAL     => 'INITIAL',
-        self::STATUS_CONNECTING  => 'CONNECTING',
+    public static $_statusToString = [
+        self::STATUS_INITIAL => 'INITIAL',
+        self::STATUS_CONNECTING => 'CONNECTING',
         self::STATUS_ESTABLISHED => 'ESTABLISHED',
-        self::STATUS_CLOSING     => 'CLOSING',
-        self::STATUS_CLOSED      => 'CLOSED',
-    );
+        self::STATUS_CLOSING => 'CLOSING',
+        self::STATUS_CLOSED => 'CLOSED',
+    ];
 
     /**
      * Construct.
      *
      * @param resource $socket
-     * @param string   $remote_address
+     * @param string $remote_address
      */
     public function __construct($socket, $remote_address = '')
     {
         ++self::$statistics['connection_count'];
         $this->id = $this->_id = self::$_idRecorder++;
-        if(self::$_idRecorder === \PHP_INT_MAX){
+        if (self::$_idRecorder === \PHP_INT_MAX) {
             self::$_idRecorder = 0;
         }
         $this->_socket = $socket;
@@ -281,10 +290,10 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
         if (\function_exists('stream_set_read_buffer')) {
             \stream_set_read_buffer($this->_socket, 0);
         }
-        Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
-        $this->maxSendBufferSize        = self::$defaultMaxSendBufferSize;
-        $this->maxPackageSize           = self::$defaultMaxPackageSize;
-        $this->_remoteAddress           = $remote_address;
+        Worker::$globalEvent->onReadable($this->_socket, [$this, 'baseRead']);
+        $this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
+        $this->maxPackageSize = self::$defaultMaxPackageSize;
+        $this->_remoteAddress = $remote_address;
         static::$connections[$this->id] = $this;
     }
 
@@ -307,7 +316,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
      * Sends data on the connection.
      *
      * @param mixed $send_buffer
-     * @param bool  $raw
+     * @param bool $raw
      * @return bool|null
      */
     public function send($send_buffer, $raw = false)
@@ -318,8 +327,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
 
         // Try to call protocol::encode($send_buffer) before sending.
         if (false === $raw && $this->protocol !== null) {
-            $parser      = $this->protocol;
-            $send_buffer = $parser::encode($send_buffer, $this);
+            $send_buffer = $this->protocol::encode($send_buffer, $this);
             if ($send_buffer === '') {
                 return;
             }
@@ -340,7 +348,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
         // Attempt to send data directly.
         if ($this->_sendBuffer === '') {
             if ($this->transport === 'ssl') {
-                Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
+                Worker::$globalEvent->onWritable($this->_socket, [$this, 'baseWrite']);
                 $this->_sendBuffer = $send_buffer;
                 $this->checkBufferWillFull();
                 return;
@@ -348,9 +356,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
             $len = 0;
             try {
                 $len = @\fwrite($this->_socket, $send_buffer);
-            } catch (\Exception $e) {
-                Worker::log($e);
-            } catch (\Error $e) {
+            } catch (\Throwable $e) {
                 Worker::log($e);
             }
             // send successful.
@@ -368,10 +374,8 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
                     ++self::$statistics['send_fail'];
                     if ($this->onError) {
                         try {
-                            \call_user_func($this->onError, $this, \WORKERMAN_SEND_FAIL, 'client closed');
-                        } catch (\Exception $e) {
-                            Worker::stopAll(250, $e);
-                        } catch (\Error $e) {
+                            ($this->onError)($this, static::SEND_FAIL, 'client closed');
+                        } catch (\Throwable $e) {
                             Worker::stopAll(250, $e);
                         }
                     }
@@ -380,7 +384,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
                 }
                 $this->_sendBuffer = $send_buffer;
             }
-            Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
+            Worker::$globalEvent->onWritable($this->_socket, [$this, 'baseWrite']);
             // Check if the send buffer will be full.
             $this->checkBufferWillFull();
             return;
@@ -405,7 +409,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
     {
         $pos = \strrpos($this->_remoteAddress, ':');
         if ($pos) {
-            return (string) \substr($this->_remoteAddress, 0, $pos);
+            return (string)\substr($this->_remoteAddress, 0, $pos);
         }
         return '';
     }
@@ -418,7 +422,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
     public function getRemotePort()
     {
         if ($this->_remoteAddress) {
-            return (int) \substr(\strrchr($this->_remoteAddress, ':'), 1);
+            return (int)\substr(\strrchr($this->_remoteAddress, ':'), 1);
         }
         return 0;
     }
@@ -529,7 +533,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
      */
     public function pauseRecv()
     {
-        Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
+        Worker::$globalEvent->offReadable($this->_socket);
         $this->_isPaused = true;
     }
 
@@ -541,14 +545,13 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
     public function resumeRecv()
     {
         if ($this->_isPaused === true) {
-            Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
+            Worker::$globalEvent->onReadable($this->_socket, [$this, 'baseRead']);
             $this->_isPaused = false;
             $this->baseRead($this->_socket, false);
         }
     }
 
 
-
     /**
      * Base read handler.
      *
@@ -558,12 +561,13 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
      */
     public function baseRead($socket, $check_eof = true)
     {
+        static $requests = [];
         // SSL handshake.
         if ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true) {
             if ($this->doSslHandshake($socket)) {
                 $this->_sslHandshakeCompleted = true;
                 if ($this->_sendBuffer) {
-                    Worker::$globalEvent->add($socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
+                    Worker::$globalEvent->onWritable($socket, [$this, 'baseWrite']);
                 }
             } else {
                 return;
@@ -573,7 +577,8 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
         $buffer = '';
         try {
             $buffer = @\fread($socket, self::READ_BUFFER_SIZE);
-        } catch (\Exception $e) {} catch (\Error $e) {}
+        } catch (\Throwable $e) {
+        }
 
         // Check connection closed.
         if ($buffer === '' || $buffer === false) {
@@ -583,12 +588,32 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
             }
         } else {
             $this->bytesRead += \strlen($buffer);
-            $this->_recvBuffer .= $buffer;
+            if ($this->_recvBuffer === '') {
+                if (static::$_enableCache && !isset($requests[512]) && isset($requests[$buffer])) {
+                    ++self::$statistics['total_request'];
+                    $request = $requests[$buffer];
+                    if ($request instanceof Request) {
+                        $request = clone $request;
+                        $requests[$buffer] = $request;
+                        $request->connection = $this;
+                        $this->__request = $request;
+                        $request->properties = [];
+                    }
+                    try {
+                        ($this->onMessage)($this, $request);
+                    } catch (\Throwable $e) {
+                        Worker::stopAll(250, $e);
+                    }
+                    return;
+                }
+                $this->_recvBuffer = $buffer;
+            } else {
+                $this->_recvBuffer .= $buffer;
+            }
         }
 
         // If the application layer protocol has been set up.
         if ($this->protocol !== null) {
-            $parser = $this->protocol;
             while ($this->_recvBuffer !== '' && !$this->_isPaused) {
                 // The current packet length is known.
                 if ($this->_currentPackageLength) {
@@ -599,8 +624,9 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
                 } else {
                     // Get current package length.
                     try {
-                        $this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
-                    } catch (\Exception $e) {} catch (\Error $e) {}
+                        $this->_currentPackageLength = $this->protocol::input($this->_recvBuffer, $this);
+                    } catch (\Throwable $e) {
+                    }
                     // The packet length is unknown.
                     if ($this->_currentPackageLength === 0) {
                         break;
@@ -620,9 +646,9 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
                 // The data is enough for a packet.
                 ++self::$statistics['total_request'];
                 // The current packet length is equal to the length of the buffer.
-                if (\strlen($this->_recvBuffer) === $this->_currentPackageLength) {
+                if ($one = \strlen($this->_recvBuffer) === $this->_currentPackageLength) {
                     $one_request_buffer = $this->_recvBuffer;
-                    $this->_recvBuffer  = '';
+                    $this->_recvBuffer = '';
                 } else {
                     // Get a full package from the buffer.
                     $one_request_buffer = \substr($this->_recvBuffer, 0, $this->_currentPackageLength);
@@ -631,15 +657,17 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
                 }
                 // Reset the current packet length to 0.
                 $this->_currentPackageLength = 0;
-                if (!$this->onMessage) {
-                    continue;
-                }
                 try {
                     // Decode request buffer before Emitting onMessage callback.
-                    \call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
-                } catch (\Exception $e) {
-                    Worker::stopAll(250, $e);
-                } catch (\Error $e) {
+                    $request = $this->protocol::decode($one_request_buffer, $this);
+                    if (static::$_enableCache && (!\is_object($request) || $request instanceof Request) && $one && !isset($one_request_buffer[512])) {
+                        $requests[$one_request_buffer] = $request;
+                        if (\count($requests) > 512) {
+                            unset($requests[\key($requests)]);
+                        }
+                    }
+                    ($this->onMessage)($this, $request);
+                } catch (\Throwable $e) {
                     Worker::stopAll(250, $e);
                 }
             }
@@ -652,15 +680,9 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
 
         // Applications protocol is not set.
         ++self::$statistics['total_request'];
-        if (!$this->onMessage) {
-            $this->_recvBuffer = '';
-            return;
-        }
         try {
-            \call_user_func($this->onMessage, $this, $this->_recvBuffer);
-        } catch (\Exception $e) {
-            Worker::stopAll(250, $e);
-        } catch (\Error $e) {
+            ($this->onMessage)($this, $this->_recvBuffer);
+        } catch (\Throwable $e) {
             Worker::stopAll(250, $e);
         }
         // Clean receive buffer.
@@ -674,7 +696,8 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
      */
     public function baseWrite()
     {
-        \set_error_handler(function(){});
+        \set_error_handler(function () {
+        });
         if ($this->transport === 'ssl') {
             $len = @\fwrite($this->_socket, $this->_sendBuffer, 8192);
         } else {
@@ -683,15 +706,13 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
         \restore_error_handler();
         if ($len === \strlen($this->_sendBuffer)) {
             $this->bytesWritten += $len;
-            Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
+            Worker::$globalEvent->offWritable($this->_socket);
             $this->_sendBuffer = '';
             // Try to emit onBufferDrain callback when the send buffer becomes empty.
             if ($this->onBufferDrain) {
                 try {
-                    \call_user_func($this->onBufferDrain, $this);
-                } catch (\Exception $e) {
-                    Worker::stopAll(250, $e);
-                } catch (\Error $e) {
+                    ($this->onBufferDrain)($this);
+                } catch (\Throwable $e) {
                     Worker::stopAll(250, $e);
                 }
             }
@@ -715,31 +736,32 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
      * @param resource $socket
      * @return bool
      */
-    public function doSslHandshake($socket){
+    public function doSslHandshake($socket)
+    {
         if (\feof($socket)) {
             $this->destroy();
             return false;
         }
         $async = $this instanceof AsyncTcpConnection;
-        
+
         /**
-          *  We disabled ssl3 because https://blog.qualys.com/ssllabs/2014/10/15/ssl-3-is-dead-killed-by-the-poodle-attack.
-          *  You can enable ssl3 by the codes below.
-          */
+         *  We disabled ssl3 because https://blog.qualys.com/ssllabs/2014/10/15/ssl-3-is-dead-killed-by-the-poodle-attack.
+         *  You can enable ssl3 by the codes below.
+         */
         /*if($async){
             $type = STREAM_CRYPTO_METHOD_SSLv2_CLIENT | STREAM_CRYPTO_METHOD_SSLv23_CLIENT | STREAM_CRYPTO_METHOD_SSLv3_CLIENT;
         }else{
             $type = STREAM_CRYPTO_METHOD_SSLv2_SERVER | STREAM_CRYPTO_METHOD_SSLv23_SERVER | STREAM_CRYPTO_METHOD_SSLv3_SERVER;
         }*/
-        
-        if($async){
+
+        if ($async) {
             $type = \STREAM_CRYPTO_METHOD_SSLv2_CLIENT | \STREAM_CRYPTO_METHOD_SSLv23_CLIENT;
-        }else{
+        } else {
             $type = \STREAM_CRYPTO_METHOD_SSLv2_SERVER | \STREAM_CRYPTO_METHOD_SSLv23_SERVER;
         }
-        
+
         // Hidden error.
-        \set_error_handler(function($errno, $errstr, $file){
+        \set_error_handler(function ($errno, $errstr, $file) {
             if (!Worker::$daemonize) {
                 Worker::safeEcho("SSL handshake error: $errstr \n");
             }
@@ -756,10 +778,8 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
         }
         if (isset($this->onSslHandshake)) {
             try {
-                \call_user_func($this->onSslHandshake, $this);
-            } catch (\Exception $e) {
-                Worker::stopAll(250, $e);
-            } catch (\Error $e) {
+                ($this->onSslHandshake)($this);
+            } catch (\Throwable $e) {
                 Worker::stopAll(250, $e);
             }
         }
@@ -774,14 +794,14 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
      */
     public function pipe(self $dest)
     {
-        $source              = $this;
-        $this->onMessage     = function ($source, $data) use ($dest) {
+        $source = $this;
+        $this->onMessage = function ($source, $data) use ($dest) {
             $dest->send($data);
         };
-        $this->onClose       = function ($source) use ($dest) {
+        $this->onClose = function ($source) use ($dest) {
             $dest->close();
         };
-        $dest->onBufferFull  = function ($dest) use ($source) {
+        $dest->onBufferFull = function ($dest) use ($source) {
             $source->pauseRecv();
         };
         $dest->onBufferDrain = function ($dest) use ($source) {
@@ -809,7 +829,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
      */
     public function close($data = null, $raw = false)
     {
-        if($this->_status === self::STATUS_CONNECTING){
+        if ($this->_status === self::STATUS_CONNECTING) {
             $this->destroy();
             return;
         }
@@ -823,7 +843,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
         }
 
         $this->_status = self::STATUS_CLOSING;
-        
+
         if ($this->_sendBuffer === '') {
             $this->destroy();
         } else {
@@ -851,10 +871,8 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
         if ($this->maxSendBufferSize <= \strlen($this->_sendBuffer)) {
             if ($this->onBufferFull) {
                 try {
-                    \call_user_func($this->onBufferFull, $this);
-                } catch (\Exception $e) {
-                    Worker::stopAll(250, $e);
-                } catch (\Error $e) {
+                    ($this->onBufferFull)($this);
+                } catch (\Throwable $e) {
                     Worker::stopAll(250, $e);
                 }
             }
@@ -872,10 +890,8 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
         if ($this->maxSendBufferSize <= \strlen($this->_sendBuffer)) {
             if ($this->onError) {
                 try {
-                    \call_user_func($this->onError, $this, \WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
-                } catch (\Exception $e) {
-                    Worker::stopAll(250, $e);
-                } catch (\Error $e) {
+                    ($this->onError)($this, static::SEND_FAIL, 'send buffer full and drop package');
+                } catch (\Throwable $e) {
                     Worker::stopAll(250, $e);
                 }
             }
@@ -883,7 +899,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
         }
         return false;
     }
-    
+
     /**
      * Whether send buffer is Empty.
      *
@@ -891,7 +907,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
      */
     public function bufferIsEmpty()
     {
-    	return empty($this->_sendBuffer);
+        return empty($this->_sendBuffer);
     }
 
     /**
@@ -906,32 +922,29 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
             return;
         }
         // Remove event listener.
-        Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
-        Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
+        Worker::$globalEvent->offReadable($this->_socket);
+        Worker::$globalEvent->offWritable($this->_socket);
 
         // Close socket.
         try {
             @\fclose($this->_socket);
-        } catch (\Exception $e) {} catch (\Error $e) {}
+        } catch (\Throwable $e) {
+        }
 
         $this->_status = self::STATUS_CLOSED;
         // Try to emit onClose callback.
         if ($this->onClose) {
             try {
-                \call_user_func($this->onClose, $this);
-            } catch (\Exception $e) {
-                Worker::stopAll(250, $e);
-            } catch (\Error $e) {
+                ($this->onClose)($this);
+            } catch (\Throwable $e) {
                 Worker::stopAll(250, $e);
             }
         }
         // Try to emit protocol::onClose
         if ($this->protocol && \method_exists($this->protocol, 'onClose')) {
             try {
-                \call_user_func(array($this->protocol, 'onClose'), $this);
-            } catch (\Exception $e) {
-                Worker::stopAll(250, $e);
-            } catch (\Error $e) {
+                ([$this->protocol, 'onClose'])($this);
+            } catch (\Throwable $e) {
                 Worker::stopAll(250, $e);
             }
         }
@@ -950,6 +963,16 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
     }
 
     /**
+     * Enable or disable Cache.
+     *
+     * @param mixed $value
+     */
+    public static function enableCache($value)
+    {
+        static::$_enableCache = (bool)$value;
+    }
+
+    /**
      * Destruct.
      *
      * @return void
@@ -967,7 +990,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable, \S
                 Worker::log('worker[' . \posix_getpid() . '] remains ' . self::$statistics['connection_count'] . ' connection(s)');
             }
 
-            if(0 === self::$statistics['connection_count']) {
+            if (0 === self::$statistics['connection_count']) {
                 Worker::stopAll();
             }
         }