Pārlūkot izejas kodu

Update TcpConnection.php

mouyong 3 gadi atpakaļ
vecāks
revīzija
ab813dbc1e
1 mainītis faili ar 137 papildinājumiem un 127 dzēšanām
  1. 137 127
      src/Connection/TcpConnection.php

+ 137 - 127
src/Connection/TcpConnection.php

@@ -1,4 +1,5 @@
 <?php
+
 /**
  * This file is part of workerman.
  *
@@ -11,24 +12,25 @@
  * @link      http://www.workerman.net/
  * @license   http://www.opensource.org/licenses/mit-license.php MIT License
  */
-
 namespace Workerman\Connection;
 
 use Workerman\Events\EventInterface;
-use Workerman\Protocols\Http\Request;
 use Workerman\Worker;
+use \Exception;
+use JsonSerializable;
+use Serializable;
 
 /**
  * TcpConnection.
  */
-class TcpConnection extends ConnectionInterface
+class TcpConnection extends ConnectionInterface implements JsonSerializable, Serializable
 {
     /**
      * Read buffer size.
      *
      * @var int
      */
-    const READ_BUFFER_SIZE = 87380;
+    const READ_BUFFER_SIZE = 65535;
 
     /**
      * Status initial.
@@ -171,7 +173,7 @@ class TcpConnection extends ConnectionInterface
      * @var int
      */
     public $maxPackageSize = 1048576;
-
+    
     /**
      * Default maximum acceptable packet size.
      *
@@ -187,13 +189,6 @@ class TcpConnection extends ConnectionInterface
     protected static $_idRecorder = 1;
 
     /**
-     * Cache.
-     *
-     * @var bool.
-     */
-    protected static $_enableCache = true;
-
-    /**
      * Socket
      *
      * @var resource
@@ -254,32 +249,32 @@ class TcpConnection extends ConnectionInterface
      *
      * @var array
      */
-    public static $connections = [];
+    public static $connections = array();
 
     /**
      * Status to string.
      *
      * @var array
      */
-    public static $_statusToString = [
-        self::STATUS_INITIAL => 'INITIAL',
-        self::STATUS_CONNECTING => 'CONNECTING',
+    public static $_statusToString = array(
+        self::STATUS_INITIAL     => 'INITIAL',
+        self::STATUS_CONNECTING  => 'CONNECTING',
         self::STATUS_ESTABLISHED => 'ESTABLISHED',
-        self::STATUS_CLOSING => 'CLOSING',
-        self::STATUS_CLOSED => 'CLOSED',
-    ];
+        self::STATUS_CLOSING     => 'CLOSING',
+        self::STATUS_CLOSED      => 'CLOSED',
+    );
 
     /**
      * Construct.
      *
      * @param resource $socket
-     * @param string $remote_address
+     * @param string   $remote_address
      */
     public function __construct($socket, $remote_address = '')
     {
         ++self::$statistics['connection_count'];
         $this->id = $this->_id = self::$_idRecorder++;
-        if (self::$_idRecorder === \PHP_INT_MAX) {
+        if(self::$_idRecorder === \PHP_INT_MAX){
             self::$_idRecorder = 0;
         }
         $this->_socket = $socket;
@@ -288,10 +283,10 @@ class TcpConnection extends ConnectionInterface
         if (\function_exists('stream_set_read_buffer')) {
             \stream_set_read_buffer($this->_socket, 0);
         }
-        Worker::$globalEvent->onReadable($this->_socket, [$this, 'baseRead']);
-        $this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
-        $this->maxPackageSize = self::$defaultMaxPackageSize;
-        $this->_remoteAddress = $remote_address;
+        Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
+        $this->maxSendBufferSize        = self::$defaultMaxSendBufferSize;
+        $this->maxPackageSize           = self::$defaultMaxPackageSize;
+        $this->_remoteAddress           = $remote_address;
         static::$connections[$this->id] = $this;
     }
 
@@ -314,7 +309,7 @@ class TcpConnection extends ConnectionInterface
      * Sends data on the connection.
      *
      * @param mixed $send_buffer
-     * @param bool $raw
+     * @param bool  $raw
      * @return bool|null
      */
     public function send($send_buffer, $raw = false)
@@ -325,7 +320,8 @@ class TcpConnection extends ConnectionInterface
 
         // Try to call protocol::encode($send_buffer) before sending.
         if (false === $raw && $this->protocol !== null) {
-            $send_buffer = $this->protocol::encode($send_buffer, $this);
+            $parser      = $this->protocol;
+            $send_buffer = $parser::encode($send_buffer, $this);
             if ($send_buffer === '') {
                 return;
             }
@@ -346,7 +342,7 @@ class TcpConnection extends ConnectionInterface
         // Attempt to send data directly.
         if ($this->_sendBuffer === '') {
             if ($this->transport === 'ssl') {
-                Worker::$globalEvent->onWritable($this->_socket, [$this, 'baseWrite']);
+                Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
                 $this->_sendBuffer = $send_buffer;
                 $this->checkBufferWillFull();
                 return;
@@ -354,7 +350,9 @@ class TcpConnection extends ConnectionInterface
             $len = 0;
             try {
                 $len = @\fwrite($this->_socket, $send_buffer);
-            } catch (\Throwable $e) {
+            } catch (\Exception $e) {
+                Worker::log($e);
+            } catch (\Error $e) {
                 Worker::log($e);
             }
             // send successful.
@@ -372,8 +370,10 @@ class TcpConnection extends ConnectionInterface
                     ++self::$statistics['send_fail'];
                     if ($this->onError) {
                         try {
-                            ($this->onError)($this, static::SEND_FAIL, 'client closed');
-                        } catch (\Throwable $e) {
+                            \call_user_func($this->onError, $this, \WORKERMAN_SEND_FAIL, 'client closed');
+                        } catch (\Exception $e) {
+                            Worker::stopAll(250, $e);
+                        } catch (\Error $e) {
                             Worker::stopAll(250, $e);
                         }
                     }
@@ -382,7 +382,7 @@ class TcpConnection extends ConnectionInterface
                 }
                 $this->_sendBuffer = $send_buffer;
             }
-            Worker::$globalEvent->onWritable($this->_socket, [$this, 'baseWrite']);
+            Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
             // Check if the send buffer will be full.
             $this->checkBufferWillFull();
             return;
@@ -407,7 +407,7 @@ class TcpConnection extends ConnectionInterface
     {
         $pos = \strrpos($this->_remoteAddress, ':');
         if ($pos) {
-            return (string)\substr($this->_remoteAddress, 0, $pos);
+            return (string) \substr($this->_remoteAddress, 0, $pos);
         }
         return '';
     }
@@ -420,7 +420,7 @@ class TcpConnection extends ConnectionInterface
     public function getRemotePort()
     {
         if ($this->_remoteAddress) {
-            return (int)\substr(\strrchr($this->_remoteAddress, ':'), 1);
+            return (int) \substr(\strrchr($this->_remoteAddress, ':'), 1);
         }
         return 0;
     }
@@ -531,7 +531,7 @@ class TcpConnection extends ConnectionInterface
      */
     public function pauseRecv()
     {
-        Worker::$globalEvent->offReadable($this->_socket);
+        Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
         $this->_isPaused = true;
     }
 
@@ -543,13 +543,14 @@ class TcpConnection extends ConnectionInterface
     public function resumeRecv()
     {
         if ($this->_isPaused === true) {
-            Worker::$globalEvent->onReadable($this->_socket, [$this, 'baseRead']);
+            Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
             $this->_isPaused = false;
             $this->baseRead($this->_socket, false);
         }
     }
 
 
+
     /**
      * Base read handler.
      *
@@ -559,13 +560,12 @@ class TcpConnection extends ConnectionInterface
      */
     public function baseRead($socket, $check_eof = true)
     {
-        static $requests = [];
         // SSL handshake.
         if ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true) {
             if ($this->doSslHandshake($socket)) {
                 $this->_sslHandshakeCompleted = true;
                 if ($this->_sendBuffer) {
-                    Worker::$globalEvent->onWritable($socket, [$this, 'baseWrite']);
+                    Worker::$globalEvent->add($socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
                 }
             } else {
                 return;
@@ -575,8 +575,7 @@ class TcpConnection extends ConnectionInterface
         $buffer = '';
         try {
             $buffer = @\fread($socket, self::READ_BUFFER_SIZE);
-        } catch (\Throwable $e) {
-        }
+        } catch (\Exception $e) {} catch (\Error $e) {}
 
         // Check connection closed.
         if ($buffer === '' || $buffer === false) {
@@ -586,32 +585,12 @@ class TcpConnection extends ConnectionInterface
             }
         } else {
             $this->bytesRead += \strlen($buffer);
-            if ($this->_recvBuffer === '') {
-                if (static::$_enableCache && !isset($requests[512]) && isset($requests[$buffer])) {
-                    ++self::$statistics['total_request'];
-                    $request = $requests[$buffer];
-                    if ($request instanceof Request) {
-                        $request = clone $request;
-                        $requests[$buffer] = $request;
-                        $request->connection = $this;
-                        $this->__request = $request;
-                        $request->properties = [];
-                    }
-                    try {
-                        ($this->onMessage)($this, $request);
-                    } catch (\Throwable $e) {
-                        Worker::stopAll(250, $e);
-                    }
-                    return;
-                }
-                $this->_recvBuffer = $buffer;
-            } else {
-                $this->_recvBuffer .= $buffer;
-            }
+            $this->_recvBuffer .= $buffer;
         }
 
         // If the application layer protocol has been set up.
         if ($this->protocol !== null) {
+            $parser = $this->protocol;
             while ($this->_recvBuffer !== '' && !$this->_isPaused) {
                 // The current packet length is known.
                 if ($this->_currentPackageLength) {
@@ -622,9 +601,8 @@ class TcpConnection extends ConnectionInterface
                 } else {
                     // Get current package length.
                     try {
-                        $this->_currentPackageLength = $this->protocol::input($this->_recvBuffer, $this);
-                    } catch (\Throwable $e) {
-                    }
+                        $this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
+                    } catch (\Exception $e) {} catch (\Error $e) {}
                     // The packet length is unknown.
                     if ($this->_currentPackageLength === 0) {
                         break;
@@ -644,9 +622,9 @@ class TcpConnection extends ConnectionInterface
                 // The data is enough for a packet.
                 ++self::$statistics['total_request'];
                 // The current packet length is equal to the length of the buffer.
-                if ($one = \strlen($this->_recvBuffer) === $this->_currentPackageLength) {
+                if (\strlen($this->_recvBuffer) === $this->_currentPackageLength) {
                     $one_request_buffer = $this->_recvBuffer;
-                    $this->_recvBuffer = '';
+                    $this->_recvBuffer  = '';
                 } else {
                     // Get a full package from the buffer.
                     $one_request_buffer = \substr($this->_recvBuffer, 0, $this->_currentPackageLength);
@@ -655,17 +633,15 @@ class TcpConnection extends ConnectionInterface
                 }
                 // Reset the current packet length to 0.
                 $this->_currentPackageLength = 0;
+                if (!$this->onMessage) {
+                    continue;
+                }
                 try {
                     // Decode request buffer before Emitting onMessage callback.
-                    $request = $this->protocol::decode($one_request_buffer, $this);
-                    if (static::$_enableCache && (!\is_object($request) || $request instanceof Request) && $one && !isset($one_request_buffer[512])) {
-                        $requests[$one_request_buffer] = $request;
-                        if (\count($requests) > 512) {
-                            unset($requests[\key($requests)]);
-                        }
-                    }
-                    ($this->onMessage)($this, $request);
-                } catch (\Throwable $e) {
+                    \call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
+                } catch (\Exception $e) {
+                    Worker::stopAll(250, $e);
+                } catch (\Error $e) {
                     Worker::stopAll(250, $e);
                 }
             }
@@ -678,9 +654,15 @@ class TcpConnection extends ConnectionInterface
 
         // Applications protocol is not set.
         ++self::$statistics['total_request'];
+        if (!$this->onMessage) {
+            $this->_recvBuffer = '';
+            return;
+        }
         try {
-            ($this->onMessage)($this, $this->_recvBuffer);
-        } catch (\Throwable $e) {
+            \call_user_func($this->onMessage, $this, $this->_recvBuffer);
+        } catch (\Exception $e) {
+            Worker::stopAll(250, $e);
+        } catch (\Error $e) {
             Worker::stopAll(250, $e);
         }
         // Clean receive buffer.
@@ -694,8 +676,7 @@ class TcpConnection extends ConnectionInterface
      */
     public function baseWrite()
     {
-        \set_error_handler(function () {
-        });
+        \set_error_handler(function(){});
         if ($this->transport === 'ssl') {
             $len = @\fwrite($this->_socket, $this->_sendBuffer, 8192);
         } else {
@@ -704,13 +685,15 @@ class TcpConnection extends ConnectionInterface
         \restore_error_handler();
         if ($len === \strlen($this->_sendBuffer)) {
             $this->bytesWritten += $len;
-            Worker::$globalEvent->offWritable($this->_socket);
+            Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
             $this->_sendBuffer = '';
             // Try to emit onBufferDrain callback when the send buffer becomes empty.
             if ($this->onBufferDrain) {
                 try {
-                    ($this->onBufferDrain)($this);
-                } catch (\Throwable $e) {
+                    \call_user_func($this->onBufferDrain, $this);
+                } catch (\Exception $e) {
+                    Worker::stopAll(250, $e);
+                } catch (\Error $e) {
                     Worker::stopAll(250, $e);
                 }
             }
@@ -734,32 +717,31 @@ class TcpConnection extends ConnectionInterface
      * @param resource $socket
      * @return bool
      */
-    public function doSslHandshake($socket)
-    {
+    public function doSslHandshake($socket){
         if (\feof($socket)) {
             $this->destroy();
             return false;
         }
         $async = $this instanceof AsyncTcpConnection;
-
+        
         /**
-         *  We disabled ssl3 because https://blog.qualys.com/ssllabs/2014/10/15/ssl-3-is-dead-killed-by-the-poodle-attack.
-         *  You can enable ssl3 by the codes below.
-         */
+          *  We disabled ssl3 because https://blog.qualys.com/ssllabs/2014/10/15/ssl-3-is-dead-killed-by-the-poodle-attack.
+          *  You can enable ssl3 by the codes below.
+          */
         /*if($async){
             $type = STREAM_CRYPTO_METHOD_SSLv2_CLIENT | STREAM_CRYPTO_METHOD_SSLv23_CLIENT | STREAM_CRYPTO_METHOD_SSLv3_CLIENT;
         }else{
             $type = STREAM_CRYPTO_METHOD_SSLv2_SERVER | STREAM_CRYPTO_METHOD_SSLv23_SERVER | STREAM_CRYPTO_METHOD_SSLv3_SERVER;
         }*/
-
-        if ($async) {
+        
+        if($async){
             $type = \STREAM_CRYPTO_METHOD_SSLv2_CLIENT | \STREAM_CRYPTO_METHOD_SSLv23_CLIENT;
-        } else {
+        }else{
             $type = \STREAM_CRYPTO_METHOD_SSLv2_SERVER | \STREAM_CRYPTO_METHOD_SSLv23_SERVER;
         }
-
+        
         // Hidden error.
-        \set_error_handler(function ($errno, $errstr, $file) {
+        \set_error_handler(function($errno, $errstr, $file){
             if (!Worker::$daemonize) {
                 Worker::safeEcho("SSL handshake error: $errstr \n");
             }
@@ -776,8 +758,10 @@ class TcpConnection extends ConnectionInterface
         }
         if (isset($this->onSslHandshake)) {
             try {
-                ($this->onSslHandshake)($this);
-            } catch (\Throwable $e) {
+                \call_user_func($this->onSslHandshake, $this);
+            } catch (\Exception $e) {
+                Worker::stopAll(250, $e);
+            } catch (\Error $e) {
                 Worker::stopAll(250, $e);
             }
         }
@@ -792,14 +776,14 @@ class TcpConnection extends ConnectionInterface
      */
     public function pipe(self $dest)
     {
-        $source = $this;
-        $this->onMessage = function ($source, $data) use ($dest) {
+        $source              = $this;
+        $this->onMessage     = function ($source, $data) use ($dest) {
             $dest->send($data);
         };
-        $this->onClose = function ($source) use ($dest) {
+        $this->onClose       = function ($source) use ($dest) {
             $dest->close();
         };
-        $dest->onBufferFull = function ($dest) use ($source) {
+        $dest->onBufferFull  = function ($dest) use ($source) {
             $source->pauseRecv();
         };
         $dest->onBufferDrain = function ($dest) use ($source) {
@@ -827,7 +811,7 @@ class TcpConnection extends ConnectionInterface
      */
     public function close($data = null, $raw = false)
     {
-        if ($this->_status === self::STATUS_CONNECTING) {
+        if($this->_status === self::STATUS_CONNECTING){
             $this->destroy();
             return;
         }
@@ -841,7 +825,7 @@ class TcpConnection extends ConnectionInterface
         }
 
         $this->_status = self::STATUS_CLOSING;
-
+        
         if ($this->_sendBuffer === '') {
             $this->destroy();
         } else {
@@ -869,8 +853,10 @@ class TcpConnection extends ConnectionInterface
         if ($this->maxSendBufferSize <= \strlen($this->_sendBuffer)) {
             if ($this->onBufferFull) {
                 try {
-                    ($this->onBufferFull)($this);
-                } catch (\Throwable $e) {
+                    \call_user_func($this->onBufferFull, $this);
+                } catch (\Exception $e) {
+                    Worker::stopAll(250, $e);
+                } catch (\Error $e) {
                     Worker::stopAll(250, $e);
                 }
             }
@@ -888,8 +874,10 @@ class TcpConnection extends ConnectionInterface
         if ($this->maxSendBufferSize <= \strlen($this->_sendBuffer)) {
             if ($this->onError) {
                 try {
-                    ($this->onError)($this, static::SEND_FAIL, 'send buffer full and drop package');
-                } catch (\Throwable $e) {
+                    \call_user_func($this->onError, $this, \WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
+                } catch (\Exception $e) {
+                    Worker::stopAll(250, $e);
+                } catch (\Error $e) {
                     Worker::stopAll(250, $e);
                 }
             }
@@ -897,7 +885,7 @@ class TcpConnection extends ConnectionInterface
         }
         return false;
     }
-
+    
     /**
      * Whether send buffer is Empty.
      *
@@ -905,7 +893,7 @@ class TcpConnection extends ConnectionInterface
      */
     public function bufferIsEmpty()
     {
-        return empty($this->_sendBuffer);
+    	return empty($this->_sendBuffer);
     }
 
     /**
@@ -920,29 +908,32 @@ class TcpConnection extends ConnectionInterface
             return;
         }
         // Remove event listener.
-        Worker::$globalEvent->offReadable($this->_socket);
-        Worker::$globalEvent->offWritable($this->_socket);
+        Worker::$globalEvent->del($this->_socket, EventInterface::EV_READ);
+        Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
 
         // Close socket.
         try {
             @\fclose($this->_socket);
-        } catch (\Throwable $e) {
-        }
+        } catch (\Exception $e) {} catch (\Error $e) {}
 
         $this->_status = self::STATUS_CLOSED;
         // Try to emit onClose callback.
         if ($this->onClose) {
             try {
-                ($this->onClose)($this);
-            } catch (\Throwable $e) {
+                \call_user_func($this->onClose, $this);
+            } catch (\Exception $e) {
+                Worker::stopAll(250, $e);
+            } catch (\Error $e) {
                 Worker::stopAll(250, $e);
             }
         }
         // Try to emit protocol::onClose
         if ($this->protocol && \method_exists($this->protocol, 'onClose')) {
             try {
-                ([$this->protocol, 'onClose'])($this);
-            } catch (\Throwable $e) {
+                \call_user_func(array($this->protocol, 'onClose'), $this);
+            } catch (\Exception $e) {
+                Worker::stopAll(250, $e);
+            } catch (\Error $e) {
                 Worker::stopAll(250, $e);
             }
         }
@@ -961,16 +952,6 @@ class TcpConnection extends ConnectionInterface
     }
 
     /**
-     * Enable or disable Cache.
-     *
-     * @param mixed $value
-     */
-    public static function enableCache($value)
-    {
-        static::$_enableCache = (bool)$value;
-    }
-
-    /**
      * Destruct.
      *
      * @return void
@@ -988,9 +969,38 @@ class TcpConnection extends ConnectionInterface
                 Worker::log('worker[' . \posix_getpid() . '] remains ' . self::$statistics['connection_count'] . ' connection(s)');
             }
 
-            if (0 === self::$statistics['connection_count']) {
+            if(0 === self::$statistics['connection_count']) {
                 Worker::stopAll();
             }
         }
     }
+
+    public function jsonSerialize()
+    {
+        return [
+            'transport' => $this->transport,
+            'id' => $this->id,
+            'status' => $this->getStatus(),
+            'getRemoteIp' => $this->getRemoteIp(),
+            'remotePort' => $this->getRemotePort(),
+            'getRemoteAddress' => $this->getRemoteAddress(),
+            'getLocalIp' => $this->getLocalIp(),
+            'getLocalPort' => $this->getLocalPort(),
+            'getLocalAddress' => $this->getLocalAddress(),
+            'isIpV4' => $this->isIpV4(),
+            'isIpV6' => $this->isIpV6(),
+        ];
+    }
+
+    public function serialize()
+    {
+        return serialize($this->jsonSerialize());
+    }
+
+    public function unserialize(string $data)
+    {
+        // 仅仅打印信息,不做操作,进程数据不可进行改变
+        var_export(sprintf("unserialize %s \n", get_class($this)));
+        var_export(unserialize($data));
+    }
 }