Websocket.php 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. declare(strict_types=1);
  15. namespace Workerman\Protocols;
  16. use Exception;
  17. use Throwable;
  18. use Workerman\Connection\ConnectionInterface;
  19. use Workerman\Connection\TcpConnection;
  20. use Workerman\Protocols\Http\Request;
  21. use Workerman\Worker;
  22. use function base64_encode;
  23. use function chr;
  24. use function floor;
  25. use function gettype;
  26. use function is_scalar;
  27. use function ord;
  28. use function pack;
  29. use function preg_match;
  30. use function sha1;
  31. use function str_repeat;
  32. use function stripos;
  33. use function strlen;
  34. use function strpos;
  35. use function substr;
  36. use function unpack;
  /**
   * WebSocket protocol.
   *
   * Server-side implementation: performs the HTTP upgrade handshake, splits the
   * receive buffer into frames, unmasks client payloads and builds outgoing frames.
   */
  class Websocket
  {
      /**
       * Websocket blob type.
       *
       * Used as the first byte of an outgoing frame: FIN bit set, opcode 0x1 (text).
       *
       * @var string
       */
      public const BINARY_TYPE_BLOB = "\x81";

      /**
       * Websocket arraybuffer type.
       *
       * Used as the first byte of an outgoing frame: FIN bit set, opcode 0x2 (binary).
       *
       * @var string
       */
      public const BINARY_TYPE_ARRAYBUFFER = "\x82";

      /**
       * Check the integrity of the package.
       *
       * Returns the full length (header + payload) of the current frame when it is
       * a final (FIN=1) data frame. Returns 0 when more bytes are needed, when the
       * connection has been closed, or when the frame was handled here (handshake,
       * ping/pong, non-final fragments buffered into the connection context).
       *
       * @param string $buffer
       * @param TcpConnection $connection
       * @return int
       * @throws Throwable
       */
      public static function input(string $buffer, TcpConnection $connection): int
      {
          // Receive length.
          $recvLen = strlen($buffer);
          // We need more data: a masked frame is at least 2 header bytes + 4 mask bytes.
          if ($recvLen < 6) {
              return 0;
          }

          // Has not yet completed the handshake.
          if (empty($connection->context->websocketHandshake)) {
              return static::dealHandshake($buffer, $connection);
          }

          // Buffer websocket frame data.
          if ($connection->context->websocketCurrentFrameLength) {
              // We need more frame data.
              if ($connection->context->websocketCurrentFrameLength > $recvLen) {
                  // Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
                  return 0;
              }
          } else {
              $firstByte = ord($buffer[0]);
              $secondByte = ord($buffer[1]);
              // Low 7 bits of the second byte: payload length or a 126/127 marker.
              $dataLen = $secondByte & 127;
              $isFinFrame = $firstByte >> 7;
              $masked = $secondByte >> 7;
              if (!$masked) {
                  Worker::safeEcho("frame not masked so close the connection\n");
                  $connection->close();
                  return 0;
              }
              $opcode = $firstByte & 0xf;
              switch ($opcode) {
                  // Continuation frame.
                  case 0x0:
                  // Blob type.
                  case 0x1:
                  // Arraybuffer type.
                  case 0x2:
                  // Ping package.
                  case 0x9:
                  // Pong package.
                  case 0xa:
                      break;
                  // Close package.
                  case 0x8:
                      // Try to emit onWebSocketClose callback.
                      $closeCb = $connection->onWebSocketClose ?? $connection->worker->onWebSocketClose ?? false;
                      if ($closeCb) {
                          try {
                              $closeCb($connection);
                          } catch (Throwable $e) {
                              Worker::stopAll(250, $e);
                          }
                      } else {
                          // No callback: reply with a close frame carrying status 1000 (0x03e8) and close.
                          $connection->close("\x88\x02\x03\xe8", true);
                      }
                      return 0;
                  // Wrong opcode.
                  default:
                      Worker::safeEcho("error opcode $opcode and close websocket connection. Buffer:" . bin2hex($buffer) . "\n");
                      $connection->close();
                      return 0;
              }

              // Calculate packet length.
              $headLen = 6;
              if ($dataLen === 126) {
                  // 2 header bytes + 16-bit extended length + 4 mask bytes.
                  $headLen = 8;
                  if ($headLen > $recvLen) {
                      return 0;
                  }
                  $pack = unpack('nn/ntotal_len', $buffer);
                  $dataLen = $pack['total_len'];
              } elseif ($dataLen === 127) {
                  // 2 header bytes + 64-bit extended length + 4 mask bytes.
                  $headLen = 14;
                  if ($headLen > $recvLen) {
                      return 0;
                  }
                  // Read the 64-bit length as two 32-bit halves (c1 = high, c2 = low).
                  $arr = unpack('n/N2c', $buffer);
                  $dataLen = $arr['c1'] * 4294967296 + $arr['c2'];
              }

              $currentFrameLength = $headLen + $dataLen;
              // Already buffered fragments count towards the package size limit.
              $totalPackageSize = strlen($connection->context->websocketDataBuffer) + $currentFrameLength;
              if ($totalPackageSize > $connection->maxPackageSize) {
                  Worker::safeEcho("error package. package_length=$totalPackageSize\n");
                  $connection->close();
                  return 0;
              }

              if ($isFinFrame) {
                  if ($opcode === 0x9) {
                      if ($recvLen >= $currentFrameLength) {
                          $pingData = static::decode(substr($buffer, 0, $currentFrameLength), $connection);
                          $connection->consumeRecvBuffer($currentFrameLength);
                          // Temporarily switch the outgoing frame type to pong (0x8a), restore afterwards.
                          $tmpConnectionType = $connection->websocketType ?? static::BINARY_TYPE_BLOB;
                          $connection->websocketType = "\x8a";
                          // Try to emit onWebSocketPing callback.
                          $pingCb = $connection->onWebSocketPing ?? $connection->worker->onWebSocketPing ?? false;
                          if ($pingCb) {
                              try {
                                  $pingCb($connection, $pingData);
                              } catch (Throwable $e) {
                                  Worker::stopAll(250, $e);
                              }
                          } else {
                              // Default behaviour: echo the ping payload back as a pong.
                              $connection->send($pingData);
                          }
                          $connection->websocketType = $tmpConnectionType;
                          if ($recvLen > $currentFrameLength) {
                              return static::input(substr($buffer, $currentFrameLength), $connection);
                          }
                      }
                      return 0;
                  }

                  if ($opcode === 0xa) {
                      if ($recvLen >= $currentFrameLength) {
                          $pongData = static::decode(substr($buffer, 0, $currentFrameLength), $connection);
                          $connection->consumeRecvBuffer($currentFrameLength);
                          $tmpConnectionType = $connection->websocketType ?? static::BINARY_TYPE_BLOB;
                          $connection->websocketType = "\x8a";
                          // Try to emit onWebSocketPong callback.
                          $pongCb = $connection->onWebSocketPong ?? $connection->worker->onWebSocketPong ?? false;
                          if ($pongCb) {
                              try {
                                  $pongCb($connection, $pongData);
                              } catch (Throwable $e) {
                                  Worker::stopAll(250, $e);
                              }
                          }
                          $connection->websocketType = $tmpConnectionType;
                          if ($recvLen > $currentFrameLength) {
                              return static::input(substr($buffer, $currentFrameLength), $connection);
                          }
                      }
                      return 0;
                  }

                  // Final data frame: report its full length.
                  return $currentFrameLength;
              }

              // Non-final fragment: remember its length so it can be buffered below.
              $connection->context->websocketCurrentFrameLength = $currentFrameLength;
          }

          // Received just a frame length data.
          if ($connection->context->websocketCurrentFrameLength === $recvLen) {
              static::decode($buffer, $connection);
              $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength);
              $connection->context->websocketCurrentFrameLength = 0;
              return 0;
          }

          // The length of the received data is greater than the length of a frame.
          if ($connection->context->websocketCurrentFrameLength < $recvLen) {
              static::decode(substr($buffer, 0, $connection->context->websocketCurrentFrameLength), $connection);
              $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength);
              $currentFrameLength = $connection->context->websocketCurrentFrameLength;
              $connection->context->websocketCurrentFrameLength = 0;
              // Continue to read next frame.
              return static::input(substr($buffer, $currentFrameLength), $connection);
          }

          // The length of the received data is less than the length of a frame.
          return 0;
      }

      /**
       * Websocket encode.
       *
       * Builds an unmasked frame whose first byte is the connection's websocketType.
       * While the handshake is not complete the frame is appended to
       * context->tmpWebsocketData instead and an empty string is returned.
       *
       * @param mixed $buffer Scalar payload; non-string scalars are converted to string.
       * @param TcpConnection $connection
       * @return string Encoded frame, or '' when the data was queued or dropped.
       * @throws Throwable When $buffer is not a scalar value.
       */
      public static function encode(mixed $buffer, TcpConnection $connection): string
      {
          if (!is_scalar($buffer)) {
              throw new Exception("You can't send(" . gettype($buffer) . ") to client, you need to convert it to string. ");
          }
          // With strict_types enabled strlen() throws a TypeError for int/float/bool,
          // so normalise every accepted scalar to a string first.
          $buffer = (string)$buffer;
          $len = strlen($buffer);

          if (empty($connection->websocketType)) {
              $connection->websocketType = static::BINARY_TYPE_BLOB;
          }
          $firstByte = $connection->websocketType;

          if ($len <= 125) {
              // Length fits in the 7-bit length field.
              $encodeBuffer = $firstByte . chr($len) . $buffer;
          } elseif ($len <= 65535) {
              // 126 marker followed by a 16-bit big-endian length.
              $encodeBuffer = $firstByte . chr(126) . pack("n", $len) . $buffer;
          } else {
              // 127 marker followed by a 64-bit length: four zero bytes plus the low 32 bits.
              $encodeBuffer = $firstByte . chr(127) . pack("xxxxN", $len) . $buffer;
          }

          // Handshake not completed so temporary buffer websocket data waiting for send.
          if (empty($connection->context->websocketHandshake)) {
              if (empty($connection->context->tmpWebsocketData)) {
                  $connection->context->tmpWebsocketData = '';
              }
              // If buffer has already full then discard the current package.
              if (strlen($connection->context->tmpWebsocketData) > $connection->maxSendBufferSize) {
                  if ($connection->onError) {
                      try {
                          ($connection->onError)($connection, ConnectionInterface::SEND_FAIL, 'send buffer full and drop package');
                      } catch (Throwable $e) {
                          Worker::stopAll(250, $e);
                      }
                  }
                  return '';
              }
              $connection->context->tmpWebsocketData .= $encodeBuffer;
              // Check buffer is full.
              if ($connection->onBufferFull && $connection->maxSendBufferSize <= strlen($connection->context->tmpWebsocketData)) {
                  try {
                      ($connection->onBufferFull)($connection);
                  } catch (Throwable $e) {
                      Worker::stopAll(250, $e);
                  }
              }
              // Return empty string.
              return '';
          }

          return $encodeBuffer;
      }

      /**
       * Websocket decode.
       *
       * Unmasks the payload of a single masked frame. When a fragmented message is
       * in progress (context->websocketCurrentFrameLength is non-zero) the payload
       * is appended to context->websocketDataBuffer; otherwise any buffered
       * fragments are prepended to the result and the buffer is reset.
       *
       * @param string $buffer One complete masked frame (header, mask and payload).
       * @param TcpConnection $connection
       * @return string
       */
      public static function decode(string $buffer, TcpConnection $connection): string
      {
          // The low 7 bits of the second header byte select the header layout.
          $secondByte = ord($buffer[1]);
          $len = $secondByte & 127;
          if ($len === 126) {
              $masks = substr($buffer, 4, 4);
              $data = substr($buffer, 8);
          } elseif ($len === 127) {
              $masks = substr($buffer, 10, 4);
              $data = substr($buffer, 14);
          } else {
              $masks = substr($buffer, 2, 4);
              $data = substr($buffer, 6);
          }

          // Stretch the 4-byte mask to the payload length and XOR it in one operation.
          $dataLength = strlen($data);
          $masks = str_repeat($masks, (int)floor($dataLength / 4)) . substr($masks, 0, $dataLength % 4);
          $decoded = $data ^ $masks;

          if ($connection->context->websocketCurrentFrameLength) {
              $connection->context->websocketDataBuffer .= $decoded;
              return $connection->context->websocketDataBuffer;
          }

          if ($connection->context->websocketDataBuffer !== '') {
              $decoded = $connection->context->websocketDataBuffer . $decoded;
              $connection->context->websocketDataBuffer = '';
          }

          return $decoded;
      }

      /**
       * Websocket handshake.
       *
       * Answers an HTTP GET upgrade request with a 101 response, initialises the
       * per-connection websocket state, fires the onWebSocketConnect and
       * onWebSocketConnected callbacks and flushes data queued by encode().
       * Anything that is not a GET request, or lacks Sec-WebSocket-Key, gets a
       * 400 response and the connection is closed.
       *
       * @param string $buffer
       * @param TcpConnection $connection
       * @return int 0, or the result of input() for bytes that follow the header.
       * @throws Throwable
       */
      public static function dealHandshake(string $buffer, TcpConnection $connection): int
      {
          // HTTP protocol.
          if (str_starts_with($buffer, 'GET')) {
              // Find \r\n\r\n.
              $headerEndPos = strpos($buffer, "\r\n\r\n");
              if (!$headerEndPos) {
                  return 0;
              }
              $headerLength = $headerEndPos + 4;
              // Only the HTTP header belongs to the handshake; bytes after it are
              // websocket frames and must not leak into header parsing or the request.
              $header = substr($buffer, 0, $headerLength);

              // Get Sec-WebSocket-Key.
              if (preg_match("/Sec-WebSocket-Key: *(.*?)\r\n/i", $header, $match)) {
                  $SecWebSocketKey = $match[1];
              } else {
                  $connection->close(
                      "HTTP/1.0 400 Bad Request\r\nServer: workerman\r\n\r\n<div style=\"text-align:center\"><h1>WebSocket</h1><hr>workerman</div>",
                      true
                  );
                  return 0;
              }

              // Calculation websocket key.
              $newKey = base64_encode(sha1($SecWebSocketKey . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));
              // Handshake response data.
              $handshakeMessage = "HTTP/1.1 101 Switching Protocol\r\n"
                  . "Upgrade: websocket\r\n"
                  . "Sec-WebSocket-Version: 13\r\n"
                  . "Connection: Upgrade\r\n"
                  . "Sec-WebSocket-Accept: " . $newKey . "\r\n";

              // Websocket data buffer.
              $connection->context->websocketDataBuffer = '';
              // Current websocket frame length.
              $connection->context->websocketCurrentFrameLength = 0;
              // Current websocket frame data.
              $connection->context->websocketCurrentFrameBuffer = '';
              // Consume handshake data.
              $connection->consumeRecvBuffer($headerLength);
              // Request built from the header only.
              $request = new Request($header);

              // Try to emit onWebSocketConnect callback.
              $onWebsocketConnect = $connection->onWebSocketConnect ?? $connection->worker->onWebSocketConnect ?? false;
              if ($onWebsocketConnect) {
                  try {
                      $onWebsocketConnect($connection, $request);
                  } catch (Throwable $e) {
                      Worker::stopAll(250, $e);
                  }
              }

              // blob or arraybuffer
              if (empty($connection->websocketType)) {
                  $connection->websocketType = static::BINARY_TYPE_BLOB;
              }

              // Append user supplied headers; add a default Server header only if none was given.
              $hasServerHeader = false;
              if ($connection->headers) {
                  foreach ($connection->headers as $header) {
                      if (stripos($header, 'Server:') === 0) {
                          $hasServerHeader = true;
                      }
                      $handshakeMessage .= "$header\r\n";
                  }
              }
              if (!$hasServerHeader) {
                  $handshakeMessage .= "Server: workerman\r\n";
              }
              $handshakeMessage .= "\r\n";

              // Send handshake response.
              $status = $connection->send($handshakeMessage, true);
              // Mark handshake complete.
              $connection->context->websocketHandshake = true;

              // Try to emit onWebSocketConnected callback.
              $onWebsocketConnected = $connection->onWebSocketConnected ?? $connection->worker->onWebSocketConnected ?? false;
              if ($status && $onWebsocketConnected) {
                  try {
                      $onWebsocketConnected($connection, $request);
                  } catch (Throwable $e) {
                      Worker::stopAll(250, $e);
                  }
              }

              // There are data waiting to be sent.
              if (!empty($connection->context->tmpWebsocketData)) {
                  $connection->send($connection->context->tmpWebsocketData, true);
                  $connection->context->tmpWebsocketData = '';
              }

              // Frames that arrived together with the handshake are processed right away.
              if (strlen($buffer) > $headerLength) {
                  return static::input(substr($buffer, $headerLength), $connection);
              }
              return 0;
          }

          // Bad websocket handshake request.
          $connection->close(
              "HTTP/1.0 400 Bad Request\r\nServer: workerman\r\n\r\n<div style=\"text-align:center\"><h1>400 Bad Request</h1><hr>workerman</div>",
              true
          );
          return 0;
      }
  }