Browse Source

optimizations

walkor 2 năm trước cách đây
mục cha
commit
d51038e2b6

+ 1 - 0
src/Connection/AsyncTcpConnection.php

@@ -180,6 +180,7 @@ class AsyncTcpConnection extends TcpConnection
         $this->maxPackageSize = self::$defaultMaxPackageSize;
         $this->maxPackageSize = self::$defaultMaxPackageSize;
         $this->contextOption = $contextOption;
         $this->contextOption = $contextOption;
         static::$connections[$this->realId] = $this;
         static::$connections[$this->realId] = $this;
+        $this->context = new \stdClass;
     }
     }
 
 
     /**
     /**

+ 4 - 4
src/Protocols/Http.php

@@ -248,7 +248,7 @@ class Http
      */
      */
     protected static function sendStream(TcpConnection $connection, $handler, $offset = 0, $length = 0)
     protected static function sendStream(TcpConnection $connection, $handler, $offset = 0, $length = 0)
     {
     {
-        $connection->bufferFull = false;
+        $connection->context->bufferFull = false;
         $connection->context->streamSending = true;
         $connection->context->streamSending = true;
         if ($offset !== 0) {
         if ($offset !== 0) {
             \fseek($handler, $offset);
             \fseek($handler, $offset);
@@ -257,7 +257,7 @@ class Http
         // Read file content from disk piece by piece and send to client.
         // Read file content from disk piece by piece and send to client.
         $doWrite = function () use ($connection, $handler, $length, $offsetEnd) {
         $doWrite = function () use ($connection, $handler, $length, $offsetEnd) {
             // Send buffer not full.
             // Send buffer not full.
-            while ($connection->bufferFull === false) {
+            while ($connection->context->bufferFull === false) {
                 // Read from disk.
                 // Read from disk.
                 $size = 1024 * 1024;
                 $size = 1024 * 1024;
                 if ($length !== 0) {
                 if ($length !== 0) {
@@ -284,11 +284,11 @@ class Http
         };
         };
         // Send buffer full.
         // Send buffer full.
         $connection->onBufferFull = function ($connection) {
         $connection->onBufferFull = function ($connection) {
-            $connection->bufferFull = true;
+            $connection->context->bufferFull = true;
         };
         };
         // Send buffer drain.
         // Send buffer drain.
         $connection->onBufferDrain = function ($connection) use ($doWrite) {
         $connection->onBufferDrain = function ($connection) use ($doWrite) {
-            $connection->bufferFull = false;
+            $connection->context->bufferFull = false;
             $doWrite();
             $doWrite();
         };
         };
         $doWrite();
         $doWrite();

+ 5 - 9
src/Protocols/Websocket.php

@@ -82,12 +82,14 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
             $opcode = $firstbyte & 0xf;
             $opcode = $firstbyte & 0xf;
             switch ($opcode) {
             switch ($opcode) {
                 case 0x0:
                 case 0x0:
-                    break;
                 // Blob type.
                 // Blob type.
                 case 0x1:
                 case 0x1:
-                    break;
                 // Arraybuffer type.
                 // Arraybuffer type.
                 case 0x2:
                 case 0x2:
+                // Ping package.
+                case 0x9:
+                // Pong package.
+                case 0xa:
                     break;
                     break;
                 // Close package.
                 // Close package.
                 case 0x8:
                 case 0x8:
@@ -104,12 +106,6 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
                         $connection->close("\x88\x02\x03\xe8", true);
                         $connection->close("\x88\x02\x03\xe8", true);
                     }
                     }
                     return 0;
                     return 0;
-                // Ping package.
-                case 0x9:
-                    break;
-                // Pong package.
-                case 0xa:
-                    break;
                 // Wrong opcode.
                 // Wrong opcode.
                 default :
                 default :
                     Worker::safeEcho("error opcode $opcode and close websocket connection. Buffer:" . bin2hex($buffer) . "\n");
                     Worker::safeEcho("error opcode $opcode and close websocket connection. Buffer:" . bin2hex($buffer) . "\n");
@@ -326,7 +322,7 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
      * @param TcpConnection $connection
      * @param TcpConnection $connection
      * @return int
      * @return int
      */
      */
-    public static function dealHandshake($buffer, TcpConnection $connection)
+    public static function dealHandshake($buffer, ConnectionInterface $connection)
     {
     {
         // HTTP protocol.
         // HTTP protocol.
         if (0 === \strpos($buffer, 'GET')) {
         if (0 === \strpos($buffer, 'GET')) {

+ 46 - 60
src/Protocols/Ws.php

@@ -47,12 +47,12 @@ class Ws
      */
      */
     public static function input($buffer, ConnectionInterface $connection)
     public static function input($buffer, ConnectionInterface $connection)
     {
     {
-        if (empty($connection->handshakeStep)) {
+        if (empty($connection->context->handshakeStep)) {
             Worker::safeEcho("recv data before handshake. Buffer:" . \bin2hex($buffer) . "\n");
             Worker::safeEcho("recv data before handshake. Buffer:" . \bin2hex($buffer) . "\n");
             return false;
             return false;
         }
         }
         // Recv handshake response
         // Recv handshake response
-        if ($connection->handshakeStep === 1) {
+        if ($connection->context->handshakeStep === 1) {
             return self::dealHandshake($buffer, $connection);
             return self::dealHandshake($buffer, $connection);
         }
         }
         $recvLen = \strlen($buffer);
         $recvLen = \strlen($buffer);
@@ -60,9 +60,9 @@ class Ws
             return 0;
             return 0;
         }
         }
         // Buffer websocket frame data.
         // Buffer websocket frame data.
-        if ($connection->websocketCurrentFrameLength) {
+        if ($connection->context->websocketCurrentFrameLength) {
             // We need more frame data.
             // We need more frame data.
-            if ($connection->websocketCurrentFrameLength > $recvLen) {
+            if ($connection->context->websocketCurrentFrameLength > $recvLen) {
                 // Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
                 // Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
                 return 0;
                 return 0;
             }
             }
@@ -84,12 +84,14 @@ class Ws
 
 
             switch ($opcode) {
             switch ($opcode) {
                 case 0x0:
                 case 0x0:
-                    break;
                 // Blob type.
                 // Blob type.
                 case 0x1:
                 case 0x1:
-                    break;
                 // Arraybuffer type.
                 // Arraybuffer type.
                 case 0x2:
                 case 0x2:
+                // Ping package.
+                case 0x9:
+                    // Pong package.
+                case 0xa:
                     break;
                     break;
                 // Close package.
                 // Close package.
                 case 0x8:
                 case 0x8:
@@ -105,12 +107,6 @@ class Ws
                         $connection->close();
                         $connection->close();
                     }
                     }
                     return 0;
                     return 0;
-                // Ping package.
-                case 0x9:
-                    break;
-                // Pong package.
-                case 0xa:
-                    break;
                 // Wrong opcode.
                 // Wrong opcode.
                 default :
                 default :
                     Worker::safeEcho("error opcode $opcode and close websocket connection. Buffer:" . $buffer . "\n");
                     Worker::safeEcho("error opcode $opcode and close websocket connection. Buffer:" . $buffer . "\n");
@@ -134,7 +130,7 @@ class Ws
                 $currentFrameLength = $dataLen + 2;
                 $currentFrameLength = $dataLen + 2;
             }
             }
 
 
-            $totalPackageSize = \strlen($connection->websocketDataBuffer) + $currentFrameLength;
+            $totalPackageSize = \strlen($connection->context->websocketDataBuffer) + $currentFrameLength;
             if ($totalPackageSize > $connection->maxPackageSize) {
             if ($totalPackageSize > $connection->maxPackageSize) {
                 Worker::safeEcho("error package. package_length=$totalPackageSize\n");
                 Worker::safeEcho("error package. package_length=$totalPackageSize\n");
                 $connection->close();
                 $connection->close();
@@ -187,21 +183,21 @@ class Ws
                 }
                 }
                 return $currentFrameLength;
                 return $currentFrameLength;
             } else {
             } else {
-                $connection->websocketCurrentFrameLength = $currentFrameLength;
+                $connection->context->websocketCurrentFrameLength = $currentFrameLength;
             }
             }
         }
         }
         // Received just a frame length data.
         // Received just a frame length data.
-        if ($connection->websocketCurrentFrameLength === $recvLen) {
+        if ($connection->context->websocketCurrentFrameLength === $recvLen) {
             self::decode($buffer, $connection);
             self::decode($buffer, $connection);
-            $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
-            $connection->websocketCurrentFrameLength = 0;
+            $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength);
+            $connection->context->websocketCurrentFrameLength = 0;
             return 0;
             return 0;
         } // The length of the received data is greater than the length of a frame.
         } // The length of the received data is greater than the length of a frame.
-        elseif ($connection->websocketCurrentFrameLength < $recvLen) {
-            self::decode(\substr($buffer, 0, $connection->websocketCurrentFrameLength), $connection);
-            $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
-            $currentFrameLength = $connection->websocketCurrentFrameLength;
-            $connection->websocketCurrentFrameLength = 0;
+        elseif ($connection->context->websocketCurrentFrameLength < $recvLen) {
+            self::decode(\substr($buffer, 0, $connection->context->websocketCurrentFrameLength), $connection);
+            $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength);
+            $currentFrameLength = $connection->context->websocketCurrentFrameLength;
+            $connection->context->websocketCurrentFrameLength = 0;
             // Continue to read next frame.
             // Continue to read next frame.
             return self::input(\substr($buffer, $currentFrameLength), $connection);
             return self::input(\substr($buffer, $currentFrameLength), $connection);
         } // The length of the received data is less than the length of a frame.
         } // The length of the received data is less than the length of a frame.
@@ -223,7 +219,7 @@ class Ws
             $connection->websocketType = self::BINARY_TYPE_BLOB;
             $connection->websocketType = self::BINARY_TYPE_BLOB;
         }
         }
         $payload = (string)$payload;
         $payload = (string)$payload;
-        if (empty($connection->handshakeStep)) {
+        if (empty($connection->context->handshakeStep)) {
             static::sendHandshake($connection);
             static::sendHandshake($connection);
         }
         }
         $mask = 1;
         $mask = 1;
@@ -246,9 +242,9 @@ class Ws
         // append payload to frame:
         // append payload to frame:
         $maskKey = \str_repeat($maskKey, \floor($length / 4)) . \substr($maskKey, 0, $length % 4);
         $maskKey = \str_repeat($maskKey, \floor($length / 4)) . \substr($maskKey, 0, $length % 4);
         $frame .= $payload ^ $maskKey;
         $frame .= $payload ^ $maskKey;
-        if ($connection->handshakeStep === 1) {
+        if ($connection->context->handshakeStep === 1) {
             // If buffer has already full then discard the current package.
             // If buffer has already full then discard the current package.
-            if (\strlen($connection->tmpWebsocketData) > $connection->maxSendBufferSize) {
+            if (\strlen($connection->context->tmpWebsocketData) > $connection->maxSendBufferSize) {
                 if ($connection->onError) {
                 if ($connection->onError) {
                     try {
                     try {
                         ($connection->onError)($connection, ConnectionInterface::SEND_FAIL, 'send buffer full and drop package');
                         ($connection->onError)($connection, ConnectionInterface::SEND_FAIL, 'send buffer full and drop package');
@@ -258,9 +254,9 @@ class Ws
                 }
                 }
                 return '';
                 return '';
             }
             }
-            $connection->tmpWebsocketData = $connection->tmpWebsocketData . $frame;
+            $connection->context->tmpWebsocketData = $connection->context->tmpWebsocketData . $frame;
             // Check buffer is full.
             // Check buffer is full.
-            if ($connection->maxSendBufferSize <= \strlen($connection->tmpWebsocketData)) {
+            if ($connection->maxSendBufferSize <= \strlen($connection->context->tmpWebsocketData)) {
                 if ($connection->onBufferFull) {
                 if ($connection->onBufferFull) {
                     try {
                     try {
                         ($connection->onBufferFull)($connection);
                         ($connection->onBufferFull)($connection);
@@ -292,13 +288,13 @@ class Ws
         } else {
         } else {
             $decodedData = \substr($bytes, 2);
             $decodedData = \substr($bytes, 2);
         }
         }
-        if ($connection->websocketCurrentFrameLength) {
-            $connection->websocketDataBuffer .= $decodedData;
-            return $connection->websocketDataBuffer;
+        if ($connection->context->websocketCurrentFrameLength) {
+            $connection->context->websocketDataBuffer .= $decodedData;
+            return $connection->context->websocketDataBuffer;
         } else {
         } else {
-            if ($connection->websocketDataBuffer !== '') {
-                $decodedData = $connection->websocketDataBuffer . $decodedData;
-                $connection->websocketDataBuffer = '';
+            if ($connection->context->websocketDataBuffer !== '') {
+                $decodedData = $connection->context->websocketDataBuffer . $decodedData;
+                $connection->context->websocketDataBuffer = '';
             }
             }
             return $decodedData;
             return $decodedData;
         }
         }
@@ -321,10 +317,10 @@ class Ws
      */
      */
     public static function onClose($connection)
     public static function onClose($connection)
     {
     {
-        $connection->handshakeStep = null;
-        $connection->websocketCurrentFrameLength = 0;
-        $connection->tmpWebsocketData = '';
-        $connection->websocketDataBuffer = '';
+        $connection->context->handshakeStep = null;
+        $connection->context->websocketCurrentFrameLength = 0;
+        $connection->context->tmpWebsocketData = '';
+        $connection->context->websocketDataBuffer = '';
         if (!empty($connection->websocketPingTimer)) {
         if (!empty($connection->websocketPingTimer)) {
             Timer::del($connection->websocketPingTimer);
             Timer::del($connection->websocketPingTimer);
             $connection->websocketPingTimer = null;
             $connection->websocketPingTimer = null;
@@ -337,9 +333,9 @@ class Ws
      * @param TcpConnection $connection
      * @param TcpConnection $connection
      * @return void
      * @return void
      */
      */
-    public static function sendHandshake(TcpConnection $connection)
+    public static function sendHandshake(ConnectionInterface $connection)
     {
     {
-        if (!empty($connection->handshakeStep)) {
+        if (!empty($connection->context->handshakeStep)) {
             return;
             return;
         }
         }
         // Get Host.
         // Get Host.
@@ -364,14 +360,14 @@ class Ws
             "Connection: Upgrade\r\n" .
             "Connection: Upgrade\r\n" .
             "Upgrade: websocket\r\n" .
             "Upgrade: websocket\r\n" .
             (isset($connection->websocketOrigin) ? "Origin: " . $connection->websocketOrigin . "\r\n" : '') .
             (isset($connection->websocketOrigin) ? "Origin: " . $connection->websocketOrigin . "\r\n" : '') .
-            (isset($connection->WSClientProtocol) ? "Sec-WebSocket-Protocol: " . $connection->WSClientProtocol . "\r\n" : '') .
+            (isset($connection->websocketClientProtocol) ? "Sec-WebSocket-Protocol: " . $connection->websocketClientProtocol . "\r\n" : '') .
             "Sec-WebSocket-Version: 13\r\n" .
             "Sec-WebSocket-Version: 13\r\n" .
             "Sec-WebSocket-Key: " . $connection->websocketSecKey . $userHeaderStr . "\r\n\r\n";
             "Sec-WebSocket-Key: " . $connection->websocketSecKey . $userHeaderStr . "\r\n\r\n";
         $connection->send($header, true);
         $connection->send($header, true);
-        $connection->handshakeStep = 1;
-        $connection->websocketCurrentFrameLength = 0;
-        $connection->websocketDataBuffer = '';
-        $connection->tmpWebsocketData = '';
+        $connection->context->handshakeStep = 1;
+        $connection->context->websocketCurrentFrameLength = 0;
+        $connection->context->websocketDataBuffer = '';
+        $connection->context->tmpWebsocketData = '';
     }
     }
 
 
     /**
     /**
@@ -381,7 +377,7 @@ class Ws
      * @param TcpConnection $connection
      * @param TcpConnection $connection
      * @return int
      * @return int
      */
      */
-    public static function dealHandshake($buffer, TcpConnection $connection)
+    public static function dealHandshake($buffer, ConnectionInterface $connection)
     {
     {
         $pos = \strpos($buffer, "\r\n\r\n");
         $pos = \strpos($buffer, "\r\n\r\n");
         if ($pos) {
         if ($pos) {
@@ -402,10 +398,10 @@ class Ws
 
 
             // Get WebSocket subprotocol (if specified by server)
             // Get WebSocket subprotocol (if specified by server)
             if (\preg_match("/Sec-WebSocket-Protocol: *(.*?)\r\n/i", $buffer, $match)) {
             if (\preg_match("/Sec-WebSocket-Protocol: *(.*?)\r\n/i", $buffer, $match)) {
-                $connection->WSServerProtocol = \trim($match[1]);
+                $connection->websocketServerProtocol = \trim($match[1]);
             }
             }
 
 
-            $connection->handshakeStep = 2;
+            $connection->context->handshakeStep = 2;
             $handshakeResponseLength = $pos + 4;
             $handshakeResponseLength = $pos + 4;
             // Try to emit onWebSocketConnect callback.
             // Try to emit onWebSocketConnect callback.
             if (isset($connection->onWebSocketConnect)) {
             if (isset($connection->onWebSocketConnect)) {
@@ -426,9 +422,9 @@ class Ws
             }
             }
 
 
             $connection->consumeRecvBuffer($handshakeResponseLength);
             $connection->consumeRecvBuffer($handshakeResponseLength);
-            if (!empty($connection->tmpWebsocketData)) {
-                $connection->send($connection->tmpWebsocketData, true);
-                $connection->tmpWebsocketData = '';
+            if (!empty($connection->context->tmpWebsocketData)) {
+                $connection->send($connection->context->tmpWebsocketData, true);
+                $connection->context->tmpWebsocketData = '';
             }
             }
             if (\strlen($buffer) > $handshakeResponseLength) {
             if (\strlen($buffer) > $handshakeResponseLength) {
                 return self::input(\substr($buffer, $handshakeResponseLength), $connection);
                 return self::input(\substr($buffer, $handshakeResponseLength), $connection);
@@ -437,14 +433,4 @@ class Ws
         return 0;
         return 0;
     }
     }
 
 
-    public static function WSSetProtocol($connection, $params)
-    {
-        $connection->WSClientProtocol = $params[0];
-    }
-
-    public static function WSGetServerProtocol($connection)
-    {
-        return (\property_exists($connection, 'WSServerProtocol') ? $connection->WSServerProtocol : null);
-    }
-
 }
 }

+ 12 - 16
src/Worker.php

@@ -342,7 +342,7 @@ class Worker
     protected $socketName = '';
     protected $socketName = '';
 
 
     /**
     /**
-     * parse from _socketName avoid parse again in master or worker
+     * parse from socketName avoid parse again in master or worker
      * LocalSocket The format is like tcp://0.0.0.0:8080
      * LocalSocket The format is like tcp://0.0.0.0:8080
      * @var string
      * @var string
      */
      */
@@ -545,6 +545,7 @@ class Worker
      * Run all worker instances.
      * Run all worker instances.
      *
      *
      * @return void
      * @return void
+     * @throws Exception
      */
      */
     public static function runAll()
     public static function runAll()
     {
     {
@@ -843,12 +844,12 @@ class Worker
     public static function getUiColumns()
     public static function getUiColumns()
     {
     {
         return [
         return [
-            'proto'     =>  'transport',
-            'user'      =>  'user',
-            'worker'    =>  'name',
-            'socket'    =>  'socket',
-            'processes' =>  'count',
-            'state'    =>  'state',
+            'proto'     => 'transport',
+            'user'      => 'user',
+            'worker'    => 'name',
+            'socket'    => 'socket',
+            'processes' => 'count',
+            'state'     => 'state',
         ];
         ];
     }
     }
 
 
@@ -866,7 +867,7 @@ class Worker
             $totalLength += static::$$key + static::UI_SAFE_LENGTH;
             $totalLength += static::$$key + static::UI_SAFE_LENGTH;
         }
         }
 
 
-        //keep beauty when show less colums
+        //Keep beauty when show less columns
         !\defined('LINE_VERSIOIN_LENGTH') && \define('LINE_VERSIOIN_LENGTH', 0);
         !\defined('LINE_VERSIOIN_LENGTH') && \define('LINE_VERSIOIN_LENGTH', 0);
         $totalLength <= LINE_VERSIOIN_LENGTH && $totalLength = LINE_VERSIOIN_LENGTH;
         $totalLength <= LINE_VERSIOIN_LENGTH && $totalLength = LINE_VERSIOIN_LENGTH;
 
 
@@ -965,7 +966,6 @@ class Worker
                     }
                     }
                     static::safeEcho("\nPress Ctrl+C to quit.\n\n");
                     static::safeEcho("\nPress Ctrl+C to quit.\n\n");
                 }
                 }
-                exit(0);
             case 'connections':
             case 'connections':
                 if (\is_file($statisticsFile) && \is_writable($statisticsFile)) {
                 if (\is_file($statisticsFile) && \is_writable($statisticsFile)) {
                     \unlink($statisticsFile);
                     \unlink($statisticsFile);
@@ -1302,10 +1302,6 @@ class Worker
             return static::$eventLoopClass;
             return static::$eventLoopClass;
         }
         }
 
 
-        if (!class_exists(\Swoole\Event::class, false)) {
-            unset(static::$availableEventLoops['swoole']);
-        }
-
         $loopName = '';
         $loopName = '';
         foreach (static::$availableEventLoops as $name => $class) {
         foreach (static::$availableEventLoops as $name => $class) {
             if (\extension_loaded($name)) {
             if (\extension_loaded($name)) {
@@ -1658,7 +1654,7 @@ class Worker
                         unset(static::$pidMap[$workerId][$pid]);
                         unset(static::$pidMap[$workerId][$pid]);
 
 
                         // Mark id is available.
                         // Mark id is available.
-                        $id                              = static::getId($workerId, $pid);
+                        $id = static::getId($workerId, $pid);
                         static::$idMap[$workerId][$id] = 0;
                         static::$idMap[$workerId][$id] = 0;
 
 
                         break;
                         break;
@@ -1721,6 +1717,7 @@ class Worker
      * Execute reload.
      * Execute reload.
      *
      *
      * @return void
      * @return void
+     * @throws Exception
      */
      */
     protected static function reload()
     protected static function reload()
     {
     {
@@ -2082,7 +2079,6 @@ class Worker
      */
      */
     protected static function getErrorType($type)
     protected static function getErrorType($type)
     {
     {
-
         return self::ERROR_TYPE[$type] ?? '';
         return self::ERROR_TYPE[$type] ?? '';
     }
     }
 
 
@@ -2289,7 +2285,7 @@ class Worker
      */
      */
     protected function parseSocketAddress() {
     protected function parseSocketAddress() {
         if (!$this->socketName) {
         if (!$this->socketName) {
-            return;
+            return null;
         }
         }
         // Get the application layer communication protocol and listening address.
         // Get the application layer communication protocol and listening address.
         list($scheme, $address) = \explode(':', $this->socketName, 2);
         list($scheme, $address) = \explode(':', $this->socketName, 2);