浏览代码

Websocket protocol support ping pong arraybuffer

walkor 10 年之前
父节点
当前提交
ed6692ac6f
共有 1 个文件被更改,包括 97 次插入11 次删除
  1. 97 11
      Workerman/Protocols/Websocket.php

+ 97 - 11
Workerman/Protocols/Websocket.php

@@ -10,6 +10,24 @@ use Workerman\Connection\ConnectionInterface;
 class Websocket implements \Workerman\Protocols\ProtocolInterface
 class Websocket implements \Workerman\Protocols\ProtocolInterface
 {
 {
     /**
     /**
+     * websocket头部最小长度
+     * @var int
+     */
+    const MIN_HEAD_LEN = 6;
+    
+    /**
+     * websocket blob类型
+     * @var char
+     */
+    const BINARY_TYPE_BLOB = 0x81;
+
+    /**
+     * websocket arraybuffer类型
+     * @var char
+     */
+    const BINARY_TYPE_ARRAYBUFFER = 0x82;
+    
+    /**
      * 检查包的完整性
      * 检查包的完整性
      * @param string $buffer
      * @param string $buffer
      */
      */
@@ -18,7 +36,7 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
         // 数据长度
         // 数据长度
         $recv_len = strlen($buffer);
         $recv_len = strlen($buffer);
         // 长度不够
         // 长度不够
-        if($recv_len < 6)
+        if($recv_len < self::MIN_HEAD_LEN)
         {
         {
             return 0;
             return 0;
         }
         }
@@ -51,6 +69,14 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
                 $connection->handshake = true;
                 $connection->handshake = true;
                 $connection->consumeRecvBuffer(strlen($buffer));
                 $connection->consumeRecvBuffer(strlen($buffer));
                 $connection->send($new_message, true);
                 $connection->send($new_message, true);
+                $connection->protocolData = array(
+                    'binaryType' => self::BINARY_TYPE_BLOB, // blob or arraybuffer
+                );
+                // 如果有设置onWebSocketConnect回调,尝试执行
+                if(isset($connection->onWebSocketConnect))
+                {
+                    call_user_func(array($connection, 'onWebSocketConnect'), $connection);
+                }
                 return 0;
                 return 0;
             }
             }
             // 如果是flash的policy-file-request
             // 如果是flash的policy-file-request
@@ -65,21 +91,79 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
                 $connection->consumeRecvBuffer(strlen($buffer));
                 $connection->consumeRecvBuffer(strlen($buffer));
                 return 0;
                 return 0;
             }
             }
-            // error
+            // 出错
             $connection->close();
             $connection->close();
             return 0;
             return 0;
         }
         }
         
         
-        // close package
-        if((ord($buffer[0]) & 0xf) == 8)
+        $data_len = ord($buffer[1]) & 127;
+        
+        $opcode = ord($buffer[0]) & 0xf;
+        switch($opcode)
         {
         {
-            $connection->close();
-            return 0;
+            // 附加数据帧 @todo 实现附加数据帧
+            case 0x0:
+                break;
+            // 文本数据帧
+            case 0x1:
+                break;
+            // 二进制数据帧
+            case 0x2:
+                break;
+            // 关闭的包
+            case 0x8:
+                // 如果有设置onWebSocketClose回调,尝试执行
+                if(isset($connection->onWebSocketClose))
+                {
+                    call_user_func(array($connection, 'onWebSocketClose'), $connection);
+                }
+                // 默认行为是关闭连接
+                else
+                {
+                    $connection->close();
+                }
+                return 0;
+            // ping的包
+            case 0x9:
+                // 如果有设置onWebSocketPing回调,尝试执行
+                if(isset($connection->onWebSocketPing))
+                {
+                    call_user_func(array($connection, 'onWebSocketPing'), $connection);
+                }
+                // 默认发送pong
+                else 
+                {
+                    $connection->send(pack('H*', '8a00'));
+                }
+                // 从接受缓冲区中消费掉该数据包
+                if(!$data_len)
+                {
+                    $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
+                    return 0;
+                }
+                break;
+            // pong的包
+            case 0xa:
+                // 如果有设置onWebSocketPong回调,尝试执行
+                if(isset($connection->onWebSocketPong))
+                {
+                    call_user_func(array($connection, 'onWebSocketPong'), $connection);
+                }
+                // 从接受缓冲区中消费掉该数据包
+                if(!$data_len)
+                {
+                    $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
+                    return 0;
+                }
+                break;
+            // 错误的opcode 
+            default :
+                $connection->close();
+                return 0;
         }
         }
         
         
         // websocket二进制数据
         // websocket二进制数据
-        $data_len = ord($buffer[1]) & 127;
-        $head_len = 6;
+        $head_len = self::MIN_HEAD_LEN;
         if ($data_len === 126) {
         if ($data_len === 126) {
             $pack = unpack('ntotal_len', substr($buffer, 2, 2));
             $pack = unpack('ntotal_len', substr($buffer, 2, 2));
             $data_len = $pack['total_len'];
             $data_len = $pack['total_len'];
@@ -95,21 +179,23 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
     /**
     /**
      * 打包
      * 打包
      * @param string $buffer
      * @param string $buffer
+     * @return string
      */
      */
     public static function encode($buffer, ConnectionInterface $connection)
     public static function encode($buffer, ConnectionInterface $connection)
     {
     {
         $len = strlen($buffer);
         $len = strlen($buffer);
+        $first_byte = $connection->protocolData['binaryType'];
         if($len<=125)
         if($len<=125)
         {
         {
-            return "\x81".chr($len).$buffer;
+            return $first_byte.chr($len).$buffer;
         }
         }
         else if($len<=65535)
         else if($len<=65535)
         {
         {
-            return "\x81".chr(126).pack("n", $len).$buffer;
+            return $first_byte.chr(126).pack("n", $len).$buffer;
         }
         }
         else
         else
         {
         {
-            return "\x81".chr(127).pack("xxxxN", $len).$buffer;
+            return $first_byte.chr(127).pack("xxxxN", $len).$buffer;
         }
         }
     }
     }