walkor 9 년 전
부모
커밋
9afdc412f7
2개의 변경된 파일213개의 추가작업 그리고 129개의 파일을 삭제
  1. 13 7
      Protocols/Websocket.php
  2. 200 122
      Protocols/Ws.php

+ 13 - 7
Protocols/Websocket.php

@@ -149,7 +149,7 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
                 if ($head_len > $recv_len) {
                     return 0;
                 }
-                $pack     = unpack('ntotal_len', substr($buffer, 2, 2));
+                $pack     = unpack('nn/ntotal_len', $buffer);
                 $data_len = $pack['total_len'];
             } else {
                 if ($data_len === 127) {
@@ -157,8 +157,8 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
                     if ($head_len > $recv_len) {
                         return 0;
                     }
-                    $arr      = unpack('N2', substr($buffer, 2, 8));
-                    $data_len = $arr[1] * 4294967296 + $arr[2];
+                    $arr      = unpack('n/N2c', $buffer);
+                    $data_len = $arr['c1']*4294967296 + $arr['c2'];
                 }
             }
             $current_frame_length = $head_len + $data_len;
@@ -170,7 +170,7 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
         }
 
         // Received just a frame length data.
-        if ($connection->websocketCurrentFrameLength == $recv_len) {
+        if ($connection->websocketCurrentFrameLength === $recv_len) {
             self::decode($buffer, $connection);
             $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
             $connection->websocketCurrentFrameLength = 0;
@@ -258,8 +258,10 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
             $connection->websocketDataBuffer .= $decoded;
             return $connection->websocketDataBuffer;
         } else {
-            $decoded                         = $connection->websocketDataBuffer . $decoded;
-            $connection->websocketDataBuffer = '';
+            if ($connection->websocketDataBuffer !== '') {
+                $decoded                         = $connection->websocketDataBuffer . $decoded;
+                $connection->websocketDataBuffer = '';
+            }
             return $decoded;
         }
     }
@@ -280,6 +282,7 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
             if (!$heder_end_pos) {
                 return 0;
             }
+            $header_length = $heder_end_pos + 4;
 
             // Get Sec-WebSocket-Key.
             $Sec_WebSocket_Key = '';
@@ -308,7 +311,7 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
             // Current websocket frame data.
             $connection->websocketCurrentFrameBuffer = '';
             // Consume handshake data.
-            $connection->consumeRecvBuffer(strlen($buffer));
+            $connection->consumeRecvBuffer($header_length);
             // Send handshake response.
             $connection->send($handshake_message, true);
 
@@ -332,6 +335,9 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
                 }
                 $_GET = $_COOKIE = $_SERVER = array();
             }
+            if (strlen($buffer) > $header_length) {
+                return self::input(substr($buffer, $header_length), $connection);
+            } 
             return 0;
         } // Is flash policy-file-request.
         elseif (0 === strpos($buffer, '<polic')) {

+ 200 - 122
Protocols/Ws.php

@@ -7,6 +7,13 @@ namespace Workerman\Protocols;
 class Ws
 {
     /**
+     * Minimum head length of websocket protocol.
+     *
+     * @var int
+     */
+    const MIN_HEAD_LEN = 2;
+
+    /**
      * Websocket blob type.
      *
      * @var string
@@ -35,124 +42,131 @@ class Ws
         }
         // Recv handshake response
         if ($connection->handshakeStep === 1) {
-            $pos = strpos($buffer, "\r\n\r\n");
-            if ($pos) {
-                // handshake complete
-                $connection->handshakeStep = 2;
-                $handshake_respnse_length = $pos + 4;
-                // Try to emit onWebSocketConnect callback.
-                if (isset($connection->onWebSocketConnect)) {
-                    try {
-                        call_user_func($connection->onWebSocketConnect, $connection, substr($buffer, 0, $handshake_respnse_length));
-                    } catch (\Exception $e) {
-                        echo $e;
-                        exit(250);
-                    }
-                }
-                // Headbeat.
-                if (!empty($connection->websocketPingInterval)) {
-                    $connection->websocketPingTimer = \Workerman\Lib\Timer::add($connection->websocketPingInterval, function() use ($connection){
-                        if (false === $connection->send(pack('H*', '8900'), true)) {
-                            \Workerman\Lib\Timer::del($connection->websocketPingTimer);
-                        }
-                    });
-                }
-
-                $connection->consumeRecvBuffer($handshake_respnse_length);
-                if (!empty($connection->tmpWebsocketData)) {
-                    $connection->send($connection->tmpWebsocketData, true);
-                }
-                if (strlen($buffer > $handshake_respnse_length)) {
-                    return self::input(substr($buffer, $handshake_respnse_length));
-                }
-            }
-            return 0;
+            return self::dealHandshake($buffer, $connection);
         }
-        if (strlen($buffer) < 2) {
+        $recv_len = strlen($buffer);
+        if ($recv_len < self::MIN_HEAD_LEN) {
             return 0;
         }
-        $opcode = ord($buffer[0]) & 0xf;
-        $data_len = ord($buffer[1]) & 127;
-        switch ($opcode) {
-            // Continue.
-            case 0x0:
-                break;
-            // Blob type.
-            case 0x1:
-                break;
-            // Arraybuffer type.
-            case 0x2:
-                break;
-            // Close package.
-            case 0x8:
-                // Try to emit onWebSocketClose callback.
-                if (isset($connection->onWebSocketClose)) {
-                    try {
-                        call_user_func($connection->onWebSocketClose, $connection);
-                    } catch (\Exception $e) {
-                        echo $e;
-                        exit(250);
-                    }
-                } else {
-                    // Close connection.
-                    $connection->close();
-                }
+        // Buffer websocket frame data.
+        if ($connection->websocketCurrentFrameLength) {
+            // We need more frame data.
+            if ($connection->websocketCurrentFrameLength > $recv_len) {
+                // Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
                 return 0;
-            // Ping package.
-            case 0x9:
-                // Try to emit onWebSocketPing callback.
-                if (isset($connection->onWebSocketPing)) {
-                    try {
-                        call_user_func($connection->onWebSocketPing, $connection);
-                    } catch (\Exception $e) {
-                        echo $e;
-                        exit(250);
+            }
+        } else {
+            $data_len     = ord($buffer[1]) & 127;
+            $firstbyte    = ord($buffer[0]);
+            $is_fin_frame = $firstbyte >> 7;
+            $opcode       = $firstbyte & 0xf;
+            switch ($opcode) {
+                case 0x0:
+                    break;
+                // Blob type.
+                case 0x1:
+                    break;
+                // Arraybuffer type.
+                case 0x2:
+                    break;
+                // Close package.
+                case 0x8:
+                    // Try to emit onWebSocketClose callback.
+                    if (isset($connection->onWebSocketClose)) {
+                        try {
+                            call_user_func($connection->onWebSocketClose, $connection);
+                        } catch (\Exception $e) {
+                            echo $e;
+                            exit(250);
+                        }
+                    } // Close connection.
+                    else {
+                        $connection->close();
                     }
-                } else {
-                    // Send pong package to remote.
-                    $connection->send(pack('H*', '8a00'), true);
-                }
-                // Consume data from receive buffer.
-                if (!$data_len) {
-                    $connection->consumeRecvBuffer(2);
                     return 0;
-                }
-                break;
-            // Pong package.
-            case 0xa:
-                // Try to emit onWebSocketPong callback.
-                if (isset($connection->onWebSocketPong)) {
-                    try {
-                        call_user_func($connection->onWebSocketPong, $connection);
-                    } catch (\Exception $e) {
-                        echo $e;
-                        exit(250);
+                // Ping package.
+                case 0x9:
+                    // Try to emit onWebSocketPing callback.
+                    if (isset($connection->onWebSocketPing)) {
+                        try {
+                            call_user_func($connection->onWebSocketPing, $connection);
+                        } catch (\Exception $e) {
+                            echo $e;
+                            exit(250);
+                        }
+                    } // Send pong package to client.
+                    else {
+                        $connection->send(pack('H*', '8a00'), true);
+                    }
+                    // Consume data from receive buffer.
+                    if (!$data_len) {
+                        $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
+                        return 0;
+                    }
+                    break;
+                // Pong package.
+                case 0xa:
+                    // Try to emit onWebSocketPong callback.
+                    if (isset($connection->onWebSocketPong)) {
+                        try {
+                            call_user_func($connection->onWebSocketPong, $connection);
+                        } catch (\Exception $e) {
+                            echo $e;
+                            exit(250);
+                        }
                     }
+                    //  Consume data from receive buffer.
+                    if (!$data_len) {
+                        $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
+                        return 0;
+                    }
+                    break;
+                // Wrong opcode. 
+                default :
+                    echo "error opcode $opcode and close websocket connection\n";
+                    $connection->close();
+                    return 0;
+            }
+            // Calculate packet length.
+            if ($data_len === 126) {
+                if (strlen($buffer) < 6) {
+                    return 0;
                 }
-                //  Consume data from receive buffer.
-                if (!$data_len) {
-                    $connection->consumeRecvBuffer(2);
+                $pack = unpack('nn/ntotal_len', $buffer);
+                $current_frame_length = $pack['total_len'] + 4;
+            } else if ($data_len === 127) {
+                if (strlen($buffer) < 10) {
                     return 0;
                 }
-                break;
-        }
-
-        if ($data_len === 126) {
-            if (strlen($buffer) < 6) {
-                return 0;
+                $arr = unpack('n/N2c', $buffer);
+                $current_frame_length = $arr['c1']*4294967296 + $arr['c2'] + 10;
+            } else {
+                $current_frame_length = $data_len + 2;
             }
-            $pack = unpack('nn/ntotal_len', $buffer);
-            $data_len = $pack['total_len'] + 4;
-        } else if ($data_len === 127) {
-            if (strlen($buffer) < 10) {
-                return 0;
+            if ($is_fin_frame) {
+                return $current_frame_length;
+            } else {
+                $connection->websocketCurrentFrameLength = $current_frame_length;
             }
-            $arr = unpack('n/N2c', $buffer);
-            $data_len = $arr['c1']*4294967296 + $arr['c2'] + 10;
-        } else {
-            $data_len += 2;
         }
-        return $data_len;
+        // Received just a frame length data.
+        if ($connection->websocketCurrentFrameLength === $recv_len) {
+            self::decode($buffer, $connection);
+            $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
+            $connection->websocketCurrentFrameLength = 0;
+            return 0;
+        } // The length of the received data is greater than the length of a frame.
+        elseif ($connection->websocketCurrentFrameLength < $recv_len) {
+            self::decode(substr($buffer, 0, $connection->websocketCurrentFrameLength), $connection);
+            $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
+            $current_frame_length                    = $connection->websocketCurrentFrameLength;
+            $connection->websocketCurrentFrameLength = 0;
+            // Continue to read next frame.
+            return self::input(substr($buffer, $current_frame_length), $connection);
+        } // The length of the received data is less than the length of a frame.
+        else {
+            return 0;
+        }
     }
 
     /**
@@ -165,22 +179,7 @@ class Ws
     public static function encode($payload, $connection)
     {
         if (empty($connection->handshakeStep)) {
-            // Get Host.
-            $port = $connection->getRemotePort();
-            $host = $port === 80 ? $connection->getRemoteHost() : $connection->getRemoteHost() . ':' . $port;
-            // Handshake header.
-            $header = "GET / HTTP/1.1\r\n".
-            "Host: $host\r\n".
-            "Connection: Upgrade\r\n".
-            "Upgrade: websocket\r\n".
-            "Origin: ". (isset($connection->websocketOrigin) ? $connection->websocketOrigin : '*') ."\r\n".
-            "Sec-WebSocket-Version: 13\r\n".
-            "Sec-WebSocket-Key: ".base64_encode(sha1(uniqid(mt_rand(), true), true))."\r\n\r\n";
-            $connection->send($header, true);
-            $connection->handshakeStep = 1;
-            if (empty($connection->websocketType)) {
-                $connection->websocketType = self::BINARY_TYPE_BLOB;
-            }
+            self::sendHandshake($connection);
         }
         $mask = 1;
         $mask_key = "\x00\x00\x00\x00";
@@ -245,7 +244,86 @@ class Ws
                 $decoded_data = substr($bytes, 2);
             }
         }
-        return $decoded_data;
+        if ($connection->websocketCurrentFrameLength) {
+            $connection->websocketDataBuffer .= $decoded_data;
+            return $connection->websocketDataBuffer;
+        } else {
+            if ($connection->websocketDataBuffer !== '') {
+                $decoded_data                    = $connection->websocketDataBuffer . $decoded_data;
+                $connection->websocketDataBuffer = '';
+            }
+            return $decoded_data;
+        }
+    }
+
+    /**
+     * Send websocket handshake.
+     *
+     * @param \Workerman\Connection\TcpConnection $connection
+     * @return void 
+     */
+    public static function sendHandshake($connection)
+    {
+        // Get Host.
+        $port = $connection->getRemotePort();
+        $host = $port === 80 ? $connection->getRemoteHost() : $connection->getRemoteHost() . ':' . $port;
+        // Handshake header.
+        $header = "GET / HTTP/1.1\r\n".
+        "Host: $host\r\n".
+        "Connection: Upgrade\r\n".
+        "Upgrade: websocket\r\n".
+        "Origin: ". (isset($connection->websocketOrigin) ? $connection->websocketOrigin : '*') ."\r\n".
+        "Sec-WebSocket-Version: 13\r\n".
+        "Sec-WebSocket-Key: ".base64_encode(sha1(uniqid(mt_rand(), true), true))."\r\n\r\n";
+        $connection->send($header, true);
+        $connection->handshakeStep               = 1;
+        $connection->websocketCurrentFrameLength = 0;
+        $connection->websocketDataBuffer  = '';
+        if (empty($connection->websocketType)) {
+            $connection->websocketType = self::BINARY_TYPE_BLOB;
+        }
     }
 
+    /**
+     * Websocket handshake.
+     *
+     * @param string                              $buffer
+     * @param \Workerman\Connection\TcpConnection $connection
+     * @return int
+     */
+    public static function dealHandshake($buffer, $connection)
+    {
+        $pos = strpos($buffer, "\r\n\r\n");
+        if ($pos) {
+            // handshake complete
+            $connection->handshakeStep = 2;
+            $handshake_respnse_length = $pos + 4;
+            // Try to emit onWebSocketConnect callback.
+            if (isset($connection->onWebSocketConnect)) {
+                try {
+                    call_user_func($connection->onWebSocketConnect, $connection, substr($buffer, 0, $handshake_respnse_length));
+                } catch (\Exception $e) {
+                    echo $e;
+                    exit(250);
+                }
+            }
+            // Headbeat.
+            if (!empty($connection->websocketPingInterval)) {
+                $connection->websocketPingTimer = \Workerman\Lib\Timer::add($connection->websocketPingInterval, function() use ($connection){
+                    if (false === $connection->send(pack('H*', '8900'), true)) {
+                        \Workerman\Lib\Timer::del($connection->websocketPingTimer);
+                    }
+                });
+            }
+
+            $connection->consumeRecvBuffer($handshake_respnse_length);
+            if (!empty($connection->tmpWebsocketData)) {
+                $connection->send($connection->tmpWebsocketData, true);
+            }
+            if (strlen($buffer > $handshake_respnse_length)) {
+                return self::input(substr($buffer, $handshake_respnse_length));
+            }
+        }
+        return 0;
+    }
 }