Ver código fonte

websocket frame fin=0 support

walkor 10 anos atrás
pai
commit
dc4ca1872d
1 arquivos alterados com 140 adições e 72 exclusões
  1. 140 72
      Workerman/Protocols/Websocket.php

+ 140 - 72
Workerman/Protocols/Websocket.php

@@ -42,89 +42,144 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
         }
         
         // 还没有握手
-        if(empty($connection->handshake))
+        if(empty($connection->websocketHandshake))
         {
             return self::dealHandshake($buffer, $connection);
         }
         
-        $data_len = ord($buffer[1]) & 127;
-        
-        $opcode = ord($buffer[0]) & 0xf;
-        switch($opcode)
+        // $connection->websocketCurrentFrameLength有值说明当前fin为0,则缓冲websocket帧数据
+        if($connection->websocketCurrentFrameLength)
         {
-            // 附加数据帧 @todo 实现附加数据帧
-            case 0x0:
-                break;
-            // 文本数据帧
-            case 0x1:
-                break;
-            // 二进制数据帧
-            case 0x2:
-                break;
-            // 关闭的包
-            case 0x8:
-                // 如果有设置onWebSocketClose回调,尝试执行
-                if(isset($connection->onWebSocketClose))
-                {
-                    call_user_func($connection->onWebSocketClose, $connection);
-                }
-                // 默认行为是关闭连接
-                else
-                {
-                    $connection->close();
-                }
+            // 如果当前帧数据未收全,则继续收
+            if($connection->websocketCurrentFrameLength > $recv_len)
+            {
+                // 返回0,因为不清楚完整的数据包长度,需要等待fin=1的帧
                 return 0;
-            // ping的包
-            case 0x9:
-                // 如果有设置onWebSocketPing回调,尝试执行
-                if(isset($connection->onWebSocketPing))
-                {
-                    call_user_func($connection->onWebSocketPing, $connection);
-                }
-                // 默认发送pong
-                else 
-                {
-                    $connection->send(pack('H*', '8a00'), true);
-                }
-                // 从接受缓冲区中消费掉该数据包
-                if(!$data_len)
-                {
-                    $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
+            }
+        }
+        else 
+        {
+            $data_len = ord($buffer[1]) & 127;
+            $firstbyte = ord($buffer[0]);
+            $is_fin_frame = $firstbyte>>7;
+            $opcode = $firstbyte & 0xf;
+            switch($opcode)
+            {
+                // 附加数据帧 @todo 实现附加数据帧
+                case 0x0:
+                    break;
+                // 文本数据帧
+                case 0x1:
+                    break;
+                // 二进制数据帧
+                case 0x2:
+                    break;
+                // 关闭的包
+                case 0x8:
+                    // 如果有设置onWebSocketClose回调,尝试执行
+                    if(isset($connection->onWebSocketClose))
+                    {
+                        call_user_func($connection->onWebSocketClose, $connection);
+                    }
+                    // 默认行为是关闭连接
+                    else
+                    {
+                        $connection->close();
+                    }
                     return 0;
-                }
-                break;
-            // pong的包
-            case 0xa:
-                // 如果有设置onWebSocketPong回调,尝试执行
-                if(isset($connection->onWebSocketPong))
+                // ping的包
+                case 0x9:
+                    // 如果有设置onWebSocketPing回调,尝试执行
+                    if(isset($connection->onWebSocketPing))
+                    {
+                        call_user_func($connection->onWebSocketPing, $connection);
+                    }
+                    // 默认发送pong
+                    else 
+                    {
+                        $connection->send(pack('H*', '8a00'), true);
+                    }
+                    // 从接受缓冲区中消费掉该数据包
+                    if(!$data_len)
+                    {
+                        $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
+                        return 0;
+                    }
+                    break;
+                // pong的包
+                case 0xa:
+                    // 如果有设置onWebSocketPong回调,尝试执行
+                    if(isset($connection->onWebSocketPong))
+                    {
+                        call_user_func($connection->onWebSocketPong, $connection);
+                    }
+                    // 从接受缓冲区中消费掉该数据包
+                    if(!$data_len)
+                    {
+                        $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
+                        return 0;
+                    }
+                    break;
+                // 错误的opcode 
+                default :
+                    echo "error opcode $opcode and close websocket connection\n";
+                    $connection->close();
+                    return 0;
+            }
+            
+            // websocket二进制数据
+            $head_len = self::MIN_HEAD_LEN;
+            if ($data_len === 126) {
+                $head_len = 8;
+                if($head_len > $recv_len)
                 {
-                    call_user_func($connection->onWebSocketPong, $connection);
+                    return 0;
                 }
-                // 从接受缓冲区中消费掉该数据包
-                if(!$data_len)
+                $pack = unpack('ntotal_len', substr($buffer, 2, 2));
+                $data_len = $pack['total_len'];
+            } else if ($data_len === 127) {
+                $head_len = 14;
+                if($head_len > $recv_len)
                 {
-                    $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
                     return 0;
                 }
-                break;
-            // 错误的opcode 
-            default :
-                $connection->close();
-                return 0;
+                $arr = unpack('N2', substr($buffer, 2, 8));
+                $data_len = $arr[1]*4294967296 + $arr[2];
+            }
+            $current_frame_length = $head_len + $data_len;
+            if($is_fin_frame)
+            {
+                return $current_frame_length;
+            }
+            else
+            {
+                $connection->websocketCurrentFrameLength = $current_frame_length;
+            }
         }
         
-        // websocket二进制数据
-        $head_len = self::MIN_HEAD_LEN;
-        if ($data_len === 126) {
-            $pack = unpack('ntotal_len', substr($buffer, 2, 2));
-            $data_len = $pack['total_len'];
-            $head_len = 8;
-        } else if ($data_len === 127) {
-            $arr = unpack('N2', substr($buffer, 2, 8));
-            $data_len = $arr[1]*4294967296 + $arr[2];
-            $head_len = 14;
+        // 收到的数据刚好是一个frame
+        if($connection->websocketCurrentFrameLength == $recv_len)
+        {
+            self::decode($buffer, $connection);
+            $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
+            $connection->websocketCurrentFrameLength = 0;
+            return 0;
+        }
+        // 收到的数据大于一个frame
+        elseif($connection->websocketCurrentFrameLength < $recv_len)
+        {
+            self::decode(substr($buffer, 0, $connection->websocketCurrentFrameLength), $connection);
+            $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
+            $current_frame_length = $connection->websocketCurrentFrameLength;
+            $connection->websocketCurrentFrameLength = 0;
+            // 继续读取下一个frame
+            return self::input(substr($buffer, $current_frame_length), $connection);
+        }
+        // 收到的数据不足一个frame
+        else
+        {
+            return 0;
         }
-        return $head_len + $data_len;
     }
     
     /**
@@ -136,13 +191,13 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
     {
         $len = strlen($buffer);
         // 还没握手不能发数据
-        if(empty($connection->handshake))
+        if(empty($connection->websocketHandshake))
         {
             $connection->send("HTTP/1.1 400 Bad Request\r\n\r\n<b>400 Bad Request</b><br>Send data before handshake. ", true);
             $connection->close();
             return false;
         }
-       $first_byte = $connection->websocketType;
+        $first_byte = $connection->websocketType;
         
         if($len<=125)
         {
@@ -180,7 +235,17 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
         for ($index = 0; $index < strlen($data); $index++) {
             $decoded .= $data[$index] ^ $masks[$index % 4];
         }
-        return $decoded;
+        if($connection->websocketCurrentFrameLength)
+        {
+            $connection->websocketDataBuffer .= $decoded;
+            return $connection->websocketDataBuffer;
+        }
+        else
+        {
+            $decoded = $connection->websocketDataBuffer . $decoded;
+            $connection->websocketDataBuffer = '';
+            return $decoded;
+        }
     }
     
     /**
@@ -220,7 +285,10 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
             $new_message .= "Sec-WebSocket-Version: 13\r\n";
             $new_message .= "Connection: Upgrade\r\n";
             $new_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n\r\n";
-            $connection->handshake = true;
+            $connection->websocketHandshake = true;
+            $connection->websocketDataBuffer = '';
+            $connection->websocketCurrentFrameLength = 0;
+            $connection->websocketCurrentFrameBuffer = '';
             $connection->consumeRecvBuffer(strlen($buffer));
             $connection->send($new_message, true);
             // blob or arraybuffer