Ws.php 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. <?php
  2. namespace Workerman\Protocols;
  3. /**
  4. * Websocket protocol for client.
  5. */
  6. class Ws
  7. {
  8. /**
  9. * Websocket blob type.
  10. *
  11. * @var string
  12. */
  13. const BINARY_TYPE_BLOB = "\x81";
  14. /**
  15. * Websocket arraybuffer type.
  16. *
  17. * @var string
  18. */
  19. const BINARY_TYPE_ARRAYBUFFER = "\x82";
  20. /**
  21. * Check the integrity of the package.
  22. *
  23. * @param string $buffer
  24. * @param ConnectionInterface $connection
  25. * @return int
  26. */
  27. public static function input($buffer, $connection)
  28. {
  29. if (empty($connection->handshakeStep)) {
  30. echo "recv data before handshake\n";
  31. return false;
  32. }
  33. // Recv handshake response
  34. if ($connection->handshakeStep === 1) {
  35. $pos = strpos($buffer, "\r\n\r\n");
  36. if ($pos) {
  37. // handshake complete
  38. $connection->handshakeStep = 2;
  39. $handshake_respnse_length = $pos + 4;
  40. // Try to emit onWebSocketConnect callback.
  41. if (isset($connection->onWebSocketConnect)) {
  42. try {
  43. call_user_func($connection->onWebSocketConnect, $connection, substr($buffer, 0, $handshake_respnse_length));
  44. } catch (\Exception $e) {
  45. echo $e;
  46. exit(250);
  47. }
  48. }
  49. // Headbeat.
  50. if (!empty($connection->websocketPingInterval)) {
  51. $connection->websocketPingTimer = \Workerman\Lib\Timer::add($connection->websocketPingInterval, function() use ($connection){
  52. if (false === $connection->send(pack('H*', '8900'), true)) {
  53. \Workerman\Lib\Timer::del($connection->websocketPingTimer);
  54. }
  55. });
  56. }
  57. $connection->consumeRecvBuffer($handshake_respnse_length);
  58. if (!empty($connection->tmpWebsocketData)) {
  59. $connection->send($connection->tmpWebsocketData, true);
  60. }
  61. if (strlen($buffer > $handshake_respnse_length)) {
  62. return self::input(substr($buffer, $handshake_respnse_length));
  63. }
  64. }
  65. return 0;
  66. }
  67. if (strlen($buffer) < 2) {
  68. return 0;
  69. }
  70. $opcode = ord($buffer[0]) & 0xf;
  71. $data_len = ord($buffer[1]) & 127;
  72. switch ($opcode) {
  73. // Continue.
  74. case 0x0:
  75. break;
  76. // Blob type.
  77. case 0x1:
  78. break;
  79. // Arraybuffer type.
  80. case 0x2:
  81. break;
  82. // Close package.
  83. case 0x8:
  84. // Try to emit onWebSocketClose callback.
  85. if (isset($connection->onWebSocketClose)) {
  86. try {
  87. call_user_func($connection->onWebSocketClose, $connection);
  88. } catch (\Exception $e) {
  89. echo $e;
  90. exit(250);
  91. }
  92. } else {
  93. // Close connection.
  94. $connection->close();
  95. }
  96. return 0;
  97. // Ping package.
  98. case 0x9:
  99. // Try to emit onWebSocketPing callback.
  100. if (isset($connection->onWebSocketPing)) {
  101. try {
  102. call_user_func($connection->onWebSocketPing, $connection);
  103. } catch (\Exception $e) {
  104. echo $e;
  105. exit(250);
  106. }
  107. } else {
  108. // Send pong package to remote.
  109. $connection->send(pack('H*', '8a00'), true);
  110. }
  111. // Consume data from receive buffer.
  112. if (!$data_len) {
  113. $connection->consumeRecvBuffer(2);
  114. return 0;
  115. }
  116. break;
  117. // Pong package.
  118. case 0xa:
  119. // Try to emit onWebSocketPong callback.
  120. if (isset($connection->onWebSocketPong)) {
  121. try {
  122. call_user_func($connection->onWebSocketPong, $connection);
  123. } catch (\Exception $e) {
  124. echo $e;
  125. exit(250);
  126. }
  127. }
  128. // Consume data from receive buffer.
  129. if (!$data_len) {
  130. $connection->consumeRecvBuffer(2);
  131. return 0;
  132. }
  133. break;
  134. }
  135. if ($data_len === 126) {
  136. if (strlen($buffer) < 6) {
  137. return 0;
  138. }
  139. $pack = unpack('nn/ntotal_len', $buffer);
  140. $data_len = $pack['total_len'] + 4;
  141. } else if ($data_len === 127) {
  142. if (strlen($buffer) < 10) {
  143. return 0;
  144. }
  145. $arr = unpack('n/N2c', $buffer);
  146. $data_len = $arr['c1']*4294967296 + $arr['c2'] + 10;
  147. } else {
  148. $data_len += 2;
  149. }
  150. return $data_len;
  151. }
  152. /**
  153. * Websocket encode.
  154. *
  155. * @param string $buffer
  156. * @param ConnectionInterface $connection
  157. * @return string
  158. */
  159. public static function encode($payload, $connection)
  160. {
  161. if (empty($connection->handshakeStep)) {
  162. // Get Host.
  163. $port = $connection->getRemotePort();
  164. $host = $port === 80 ? $connection->getRemoteHost() : $connection->getRemoteHost() . ':' . $port;
  165. // Handshake header.
  166. $header = "GET / HTTP/1.1\r\n".
  167. "Host: $host\r\n".
  168. "Connection: Upgrade\r\n".
  169. "Upgrade: websocket\r\n".
  170. "Origin: ". (isset($connection->websocketOrigin) ? $connection->websocketOrigin : '*') ."\r\n".
  171. "Sec-WebSocket-Version: 13\r\n".
  172. "Sec-WebSocket-Key: ".base64_encode(sha1(uniqid(mt_rand(), true), true))."\r\n\r\n";
  173. $connection->send($header, true);
  174. $connection->handshakeStep = 1;
  175. if (empty($connection->websocketType)) {
  176. $connection->websocketType = self::BINARY_TYPE_BLOB;
  177. }
  178. }
  179. $mask = 1;
  180. $mask_key = "\x00\x00\x00\x00";
  181. $pack = '';
  182. $length = $length_flag = strlen($payload);
  183. if (65535 < $length) {
  184. $pack = pack('NN', ($length & 0xFFFFFFFF00000000) >> 0b100000, $length & 0x00000000FFFFFFFF);
  185. $length_flag = 127;
  186. } else if (125 < $length) {
  187. $pack = pack('n*', $length);
  188. $length_flag = 126;
  189. }
  190. $head = ($mask << 7) | $length_flag;
  191. $head = $connection->websocketType . chr($head) . $pack;
  192. $frame = $head . $mask_key;
  193. // append payload to frame:
  194. for ($i = 0; $i < $length; $i++) {
  195. $frame .= $payload[$i] ^ $mask_key[$i % 4];
  196. }
  197. if ($connection->handshakeStep === 1) {
  198. $connection->tmpWebsocketData = isset($connection->tmpWebsocketData) ? $connection->tmpWebsocketData . $frame : $frame;
  199. return '';
  200. }
  201. return $frame;
  202. }
  203. /**
  204. * Websocket decode.
  205. *
  206. * @param string $buffer
  207. * @param ConnectionInterface $connection
  208. * @return string
  209. */
  210. public static function decode($bytes, $connection)
  211. {
  212. $masked = $bytes[1] >> 7;
  213. $data_length = $masked ? ord($bytes[1]) & 127 : ord($bytes[1]);
  214. $decoded_data = '';
  215. if ($masked === true) {
  216. if ($data_length === 126) {
  217. $mask = substr($bytes, 4, 4);
  218. $coded_data = substr($bytes, 8);
  219. } else if ($data_length === 127) {
  220. $mask = substr($bytes, 10, 4);
  221. $coded_data = substr($bytes, 14);
  222. } else {
  223. $mask = substr($bytes, 2, 4);
  224. $coded_data = substr($bytes, 6);
  225. }
  226. for ($i = 0; $i < strlen($coded_data); $i++) {
  227. $decoded_data .= $coded_data[$i] ^ $mask[$i % 4];
  228. }
  229. } else {
  230. if ($data_length === 126) {
  231. $decoded_data = substr($bytes, 4);
  232. } else if ($data_length === 127) {
  233. $decoded_data = substr($bytes, 10);
  234. } else {
  235. $decoded_data = substr($bytes, 2);
  236. }
  237. }
  238. return $decoded_data;
  239. }
  240. }