Websocket.php 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Workerman\Protocols;
  15. use Workerman\Connection\ConnectionInterface;
  16. use Workerman\Connection\TcpConnection;
  17. use Workerman\Protocols\Http\Request;
  18. use Workerman\Worker;
  19. /**
  20. * WebSocket protocol.
  21. */
  22. class Websocket implements \Workerman\Protocols\ProtocolInterface
  23. {
  24. /**
  25. * Websocket blob type.
  26. *
  27. * @var string
  28. */
  29. const BINARY_TYPE_BLOB = "\x81";
  30. /**
  31. * Websocket arraybuffer type.
  32. *
  33. * @var string
  34. */
  35. const BINARY_TYPE_ARRAYBUFFER = "\x82";
  36. /**
  37. * Check the integrity of the package.
  38. *
  39. * @param string $buffer
  40. * @param ConnectionInterface $connection
  41. * @return int
  42. */
  43. public static function input($buffer, ConnectionInterface $connection)
  44. {
  45. // Receive length.
  46. $recv_len = \strlen($buffer);
  47. // We need more data.
  48. if ($recv_len < 6) {
  49. return 0;
  50. }
  51. // Has not yet completed the handshake.
  52. if (empty($connection->context->websocketHandshake)) {
  53. return static::dealHandshake($buffer, $connection);
  54. }
  55. // Buffer websocket frame data.
  56. if ($connection->context->websocketCurrentFrameLength) {
  57. // We need more frame data.
  58. if ($connection->context->websocketCurrentFrameLength > $recv_len) {
  59. // Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
  60. return 0;
  61. }
  62. } else {
  63. $firstbyte = \ord($buffer[0]);
  64. $secondbyte = \ord($buffer[1]);
  65. $data_len = $secondbyte & 127;
  66. $is_fin_frame = $firstbyte >> 7;
  67. $masked = $secondbyte >> 7;
  68. if (!$masked) {
  69. Worker::safeEcho("frame not masked so close the connection\n");
  70. $connection->close();
  71. return 0;
  72. }
  73. $opcode = $firstbyte & 0xf;
  74. switch ($opcode) {
  75. case 0x0:
  76. break;
  77. // Blob type.
  78. case 0x1:
  79. break;
  80. // Arraybuffer type.
  81. case 0x2:
  82. break;
  83. // Close package.
  84. case 0x8:
  85. // Try to emit onWebSocketClose callback.
  86. $close_cb = $connection->onWebSocketClose ?? $connection->worker->onWebSocketClose ?? false;
  87. if ($close_cb) {
  88. try {
  89. $close_cb($connection);
  90. } catch (\Throwable $e) {
  91. Worker::stopAll(250, $e);
  92. }
  93. } // Close connection.
  94. else {
  95. $connection->close("\x88\x02\x03\xe8", true);
  96. }
  97. return 0;
  98. // Ping package.
  99. case 0x9:
  100. break;
  101. // Pong package.
  102. case 0xa:
  103. break;
  104. // Wrong opcode.
  105. default :
  106. Worker::safeEcho("error opcode $opcode and close websocket connection. Buffer:" . bin2hex($buffer) . "\n");
  107. $connection->close();
  108. return 0;
  109. }
  110. // Calculate packet length.
  111. $head_len = 6;
  112. if ($data_len === 126) {
  113. $head_len = 8;
  114. if ($head_len > $recv_len) {
  115. return 0;
  116. }
  117. $pack = \unpack('nn/ntotal_len', $buffer);
  118. $data_len = $pack['total_len'];
  119. } else {
  120. if ($data_len === 127) {
  121. $head_len = 14;
  122. if ($head_len > $recv_len) {
  123. return 0;
  124. }
  125. $arr = \unpack('n/N2c', $buffer);
  126. $data_len = $arr['c1'] * 4294967296 + $arr['c2'];
  127. }
  128. }
  129. $current_frame_length = $head_len + $data_len;
  130. $total_package_size = \strlen($connection->context->websocketDataBuffer) + $current_frame_length;
  131. if ($total_package_size > $connection->maxPackageSize) {
  132. Worker::safeEcho("error package. package_length=$total_package_size\n");
  133. $connection->close();
  134. return 0;
  135. }
  136. if ($is_fin_frame) {
  137. if ($opcode === 0x9) {
  138. if ($recv_len >= $current_frame_length) {
  139. $ping_data = static::decode(\substr($buffer, 0, $current_frame_length), $connection);
  140. $connection->consumeRecvBuffer($current_frame_length);
  141. $tmp_connection_type = $connection->websocketType ?? static::BINARY_TYPE_BLOB;
  142. $connection->websocketType = "\x8a";
  143. $ping_cb = $connection->onWebSocketPing ?? $connection->worker->onWebSocketPing ?? false;
  144. if ($ping_cb) {
  145. try {
  146. $ping_cb($connection, $ping_data);
  147. } catch (\Throwable $e) {
  148. Worker::stopAll(250, $e);
  149. }
  150. } else {
  151. $connection->send($ping_data);
  152. }
  153. $connection->websocketType = $tmp_connection_type;
  154. if ($recv_len > $current_frame_length) {
  155. return static::input(\substr($buffer, $current_frame_length), $connection);
  156. }
  157. }
  158. return 0;
  159. } else if ($opcode === 0xa) {
  160. if ($recv_len >= $current_frame_length) {
  161. $pong_data = static::decode(\substr($buffer, 0, $current_frame_length), $connection);
  162. $connection->consumeRecvBuffer($current_frame_length);
  163. $tmp_connection_type = $connection->websocketType ?? static::BINARY_TYPE_BLOB;
  164. $connection->websocketType = "\x8a";
  165. // Try to emit onWebSocketPong callback.
  166. $pong_cb = $connection->onWebSocketPong ?? $connection->worker->onWebSocketPong ?? false;
  167. if ($pong_cb) {
  168. try {
  169. $pong_cb($connection, $pong_data);
  170. } catch (\Throwable $e) {
  171. Worker::stopAll(250, $e);
  172. }
  173. }
  174. $connection->websocketType = $tmp_connection_type;
  175. if ($recv_len > $current_frame_length) {
  176. return static::input(\substr($buffer, $current_frame_length), $connection);
  177. }
  178. }
  179. return 0;
  180. }
  181. return $current_frame_length;
  182. } else {
  183. $connection->context->websocketCurrentFrameLength = $current_frame_length;
  184. }
  185. }
  186. // Received just a frame length data.
  187. if ($connection->context->websocketCurrentFrameLength === $recv_len) {
  188. static::decode($buffer, $connection);
  189. $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength);
  190. $connection->context->websocketCurrentFrameLength = 0;
  191. return 0;
  192. } // The length of the received data is greater than the length of a frame.
  193. elseif ($connection->context->websocketCurrentFrameLength < $recv_len) {
  194. static::decode(\substr($buffer, 0, $connection->context->websocketCurrentFrameLength), $connection);
  195. $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength);
  196. $current_frame_length = $connection->context->websocketCurrentFrameLength;
  197. $connection->context->websocketCurrentFrameLength = 0;
  198. // Continue to read next frame.
  199. return static::input(\substr($buffer, $current_frame_length), $connection);
  200. } // The length of the received data is less than the length of a frame.
  201. else {
  202. return 0;
  203. }
  204. }
  205. /**
  206. * Websocket encode.
  207. *
  208. * @param string $buffer
  209. * @param ConnectionInterface $connection
  210. * @return string
  211. */
  212. public static function encode($buffer, ConnectionInterface $connection)
  213. {
  214. if (!is_scalar($buffer)) {
  215. throw new \Exception("You can't send(" . \gettype($buffer) . ") to client, you need to convert it to a string. ");
  216. }
  217. $len = \strlen($buffer);
  218. if (empty($connection->websocketType)) {
  219. $connection->websocketType = static::BINARY_TYPE_BLOB;
  220. }
  221. $first_byte = $connection->websocketType;
  222. if ($len <= 125) {
  223. $encode_buffer = $first_byte . \chr($len) . $buffer;
  224. } else {
  225. if ($len <= 65535) {
  226. $encode_buffer = $first_byte . \chr(126) . \pack("n", $len) . $buffer;
  227. } else {
  228. $encode_buffer = $first_byte . \chr(127) . \pack("xxxxN", $len) . $buffer;
  229. }
  230. }
  231. // Handshake not completed so temporary buffer websocket data waiting for send.
  232. if (empty($connection->context->websocketHandshake)) {
  233. if (empty($connection->context->tmpWebsocketData)) {
  234. $connection->context->tmpWebsocketData = '';
  235. }
  236. // If buffer has already full then discard the current package.
  237. if (\strlen($connection->context->tmpWebsocketData) > $connection->maxSendBufferSize) {
  238. if ($connection->onError) {
  239. try {
  240. ($connection->onError)($connection, ConnectionInterface::SEND_FAIL, 'send buffer full and drop package');
  241. } catch (\Throwable $e) {
  242. Worker::stopAll(250, $e);
  243. }
  244. }
  245. return '';
  246. }
  247. $connection->context->tmpWebsocketData .= $encode_buffer;
  248. // Check buffer is full.
  249. if ($connection->maxSendBufferSize <= \strlen($connection->context->tmpWebsocketData)) {
  250. if ($connection->onBufferFull) {
  251. try {
  252. ($connection->onBufferFull)($connection);
  253. } catch (\Throwable $e) {
  254. Worker::stopAll(250, $e);
  255. }
  256. }
  257. }
  258. // Return empty string.
  259. return '';
  260. }
  261. return $encode_buffer;
  262. }
  263. /**
  264. * Websocket decode.
  265. *
  266. * @param string $buffer
  267. * @param ConnectionInterface $connection
  268. * @return string
  269. */
  270. public static function decode($buffer, ConnectionInterface $connection)
  271. {
  272. $first_byte = \ord($buffer[1]);
  273. $len = $first_byte & 127;
  274. $rsv1 = $first_byte & 64;
  275. if ($len === 126) {
  276. $masks = \substr($buffer, 4, 4);
  277. $data = \substr($buffer, 8);
  278. } else {
  279. if ($len === 127) {
  280. $masks = \substr($buffer, 10, 4);
  281. $data = \substr($buffer, 14);
  282. } else {
  283. $masks = \substr($buffer, 2, 4);
  284. $data = \substr($buffer, 6);
  285. }
  286. }
  287. $dataLength = \strlen($data);
  288. $masks = \str_repeat($masks, \floor($dataLength / 4)) . \substr($masks, 0, $dataLength % 4);
  289. $decoded = $data ^ $masks;
  290. if ($connection->context->websocketCurrentFrameLength) {
  291. $connection->context->websocketDataBuffer .= $decoded;
  292. return $connection->context->websocketDataBuffer;
  293. } else {
  294. if ($connection->context->websocketDataBuffer !== '') {
  295. $decoded = $connection->context->websocketDataBuffer . $decoded;
  296. $connection->context->websocketDataBuffer = '';
  297. }
  298. return $decoded;
  299. }
  300. }
  301. /**
  302. * Websocket handshake.
  303. *
  304. * @param string $buffer
  305. * @param TcpConnection $connection
  306. * @return int
  307. */
  308. public static function dealHandshake($buffer, TcpConnection $connection)
  309. {
  310. // HTTP protocol.
  311. if (0 === \strpos($buffer, 'GET')) {
  312. // Find \r\n\r\n.
  313. $header_end_pos = \strpos($buffer, "\r\n\r\n");
  314. if (!$header_end_pos) {
  315. return 0;
  316. }
  317. $header_length = $header_end_pos + 4;
  318. // Get Sec-WebSocket-Key.
  319. $Sec_WebSocket_Key = '';
  320. if (\preg_match("/Sec-WebSocket-Key: *(.*?)\r\n/i", $buffer, $match)) {
  321. $Sec_WebSocket_Key = $match[1];
  322. } else {
  323. $connection->close("HTTP/1.1 200 WebSocket\r\nServer: workerman/" . Worker::VERSION . "\r\n\r\n<div style=\"text-align:center\"><h1>WebSocket</h1><hr>workerman/" . Worker::VERSION . "</div>",
  324. true);
  325. return 0;
  326. }
  327. // Calculation websocket key.
  328. $new_key = \base64_encode(\sha1($Sec_WebSocket_Key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));
  329. // Handshake response data.
  330. $handshake_message = "HTTP/1.1 101 Switching Protocols\r\n"
  331. . "Upgrade: websocket\r\n"
  332. . "Sec-WebSocket-Version: 13\r\n"
  333. . "Connection: Upgrade\r\n"
  334. . "Sec-WebSocket-Accept: " . $new_key . "\r\n";
  335. // Websocket data buffer.
  336. $connection->context->websocketDataBuffer = '';
  337. // Current websocket frame length.
  338. $connection->context->websocketCurrentFrameLength = 0;
  339. // Current websocket frame data.
  340. $connection->context->websocketCurrentFrameBuffer = '';
  341. // Consume handshake data.
  342. $connection->consumeRecvBuffer($header_length);
  343. // Try to emit onWebSocketConnect callback.
  344. $on_websocket_connect = $connection->onWebSocketConnect ?? $connection->worker->onWebSocketConnect ?? false;
  345. if ($on_websocket_connect) {
  346. try {
  347. $on_websocket_connect($connection, new Request($buffer));
  348. } catch (\Throwable $e) {
  349. Worker::stopAll(250, $e);
  350. }
  351. }
  352. // blob or arraybuffer
  353. if (empty($connection->websocketType)) {
  354. $connection->websocketType = static::BINARY_TYPE_BLOB;
  355. }
  356. $has_server_header = false;
  357. if (isset($connection->headers)) {
  358. if (\is_array($connection->headers)) {
  359. foreach ($connection->headers as $header) {
  360. if (\stripos($header, 'Server:') === 0) {
  361. $has_server_header = true;
  362. }
  363. $handshake_message .= "$header\r\n";
  364. }
  365. } else {
  366. if (\stripos($connection->headers, 'Server:') !== false) {
  367. $has_server_header = true;
  368. }
  369. $handshake_message .= "$connection->headers\r\n";
  370. }
  371. }
  372. if (!$has_server_header) {
  373. $handshake_message .= "Server: workerman/" . Worker::VERSION . "\r\n";
  374. }
  375. $handshake_message .= "\r\n";
  376. // Send handshake response.
  377. $connection->send($handshake_message, true);
  378. // Mark handshake complete..
  379. $connection->context->websocketHandshake = true;
  380. // There are data waiting to be sent.
  381. if (!empty($connection->context->tmpWebsocketData)) {
  382. $connection->send($connection->context->tmpWebsocketData, true);
  383. $connection->context->tmpWebsocketData = '';
  384. }
  385. if (\strlen($buffer) > $header_length) {
  386. return static::input(\substr($buffer, $header_length), $connection);
  387. }
  388. return 0;
  389. }
  390. // Bad websocket handshake request.
  391. $connection->close("HTTP/1.1 200 WebSocket\r\nServer: workerman/" . Worker::VERSION . "\r\n\r\n<div style=\"text-align:center\"><h1>WebSocket</h1><hr>workerman/" . Worker::VERSION . "</div>",
  392. true);
  393. return 0;
  394. }
  395. }