Websocket.php 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. declare(strict_types=1);
  15. namespace Workerman\Protocols;
  16. use Throwable;
  17. use Workerman\Connection\ConnectionInterface;
  18. use Workerman\Connection\TcpConnection;
  19. use Workerman\Protocols\Http\Request;
  20. use Workerman\Worker;
  21. use function base64_encode;
  22. use function chr;
  23. use function deflate_add;
  24. use function deflate_init;
  25. use function floor;
  26. use function inflate_add;
  27. use function inflate_init;
  28. use function is_scalar;
  29. use function ord;
  30. use function pack;
  31. use function preg_match;
  32. use function sha1;
  33. use function str_repeat;
  34. use function stripos;
  35. use function strlen;
  36. use function strpos;
  37. use function substr;
  38. use function unpack;
  39. use const ZLIB_DEFAULT_STRATEGY;
  40. use const ZLIB_ENCODING_RAW;
  41. /**
  42. * WebSocket protocol.
  43. */
  44. class Websocket
  45. {
  46. /**
  47. * Websocket blob type.
  48. *
  49. * @var string
  50. */
  51. public const BINARY_TYPE_BLOB = "\x81";
  52. /**
  53. * Websocket blob type.
  54. *
  55. * @var string
  56. */
  57. const BINARY_TYPE_BLOB_DEFLATE = "\xc1";
  58. /**
  59. * Websocket arraybuffer type.
  60. *
  61. * @var string
  62. */
  63. public const BINARY_TYPE_ARRAYBUFFER = "\x82";
  64. /**
  65. * Websocket arraybuffer type.
  66. *
  67. * @var string
  68. */
  69. const BINARY_TYPE_ARRAYBUFFER_DEFLATE = "\xc2";
  70. /**
  71. * Check the integrity of the package.
  72. *
  73. * @param string $buffer
  74. * @param TcpConnection $connection
  75. * @return int
  76. */
  77. public static function input(string $buffer, TcpConnection $connection): int
  78. {
  79. // Receive length.
  80. $recvLen = strlen($buffer);
  81. // We need more data.
  82. if ($recvLen < 6) {
  83. return 0;
  84. }
  85. // Has not yet completed the handshake.
  86. if (empty($connection->context->websocketHandshake)) {
  87. return static::dealHandshake($buffer, $connection);
  88. }
  89. // Buffer websocket frame data.
  90. if ($connection->context->websocketCurrentFrameLength) {
  91. // We need more frame data.
  92. if ($connection->context->websocketCurrentFrameLength > $recvLen) {
  93. // Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
  94. return 0;
  95. }
  96. } else {
  97. $firstByte = ord($buffer[0]);
  98. $secondByte = ord($buffer[1]);
  99. $dataLen = $secondByte & 127;
  100. $isFinFrame = $firstByte >> 7;
  101. $masked = $secondByte >> 7;
  102. if (!$masked) {
  103. Worker::safeEcho("frame not masked so close the connection\n");
  104. $connection->close();
  105. return 0;
  106. }
  107. $opcode = $firstByte & 0xf;
  108. switch ($opcode) {
  109. case 0x0:
  110. // Blob type.
  111. case 0x1:
  112. // Arraybuffer type.
  113. case 0x2:
  114. // Ping package.
  115. case 0x9:
  116. // Pong package.
  117. case 0xa:
  118. break;
  119. // Close package.
  120. case 0x8:
  121. // Try to emit onWebSocketClose callback.
  122. $closeCb = $connection->onWebSocketClose ?? $connection->worker->onWebSocketClose ?? false;
  123. if ($closeCb) {
  124. try {
  125. $closeCb($connection);
  126. } catch (Throwable $e) {
  127. Worker::stopAll(250, $e);
  128. }
  129. } // Close connection.
  130. else {
  131. $connection->close("\x88\x02\x03\xe8", true);
  132. }
  133. return 0;
  134. // Wrong opcode.
  135. default :
  136. Worker::safeEcho("error opcode $opcode and close websocket connection. Buffer:" . bin2hex($buffer) . "\n");
  137. $connection->close();
  138. return 0;
  139. }
  140. // Calculate packet length.
  141. $headLen = 6;
  142. if ($dataLen === 126) {
  143. $headLen = 8;
  144. if ($headLen > $recvLen) {
  145. return 0;
  146. }
  147. $pack = unpack('nn/ntotal_len', $buffer);
  148. $dataLen = $pack['total_len'];
  149. } else {
  150. if ($dataLen === 127) {
  151. $headLen = 14;
  152. if ($headLen > $recvLen) {
  153. return 0;
  154. }
  155. $arr = unpack('n/N2c', $buffer);
  156. $dataLen = $arr['c1'] * 4294967296 + $arr['c2'];
  157. }
  158. }
  159. $currentFrameLength = $headLen + $dataLen;
  160. $totalPackageSize = strlen($connection->context->websocketDataBuffer) + $currentFrameLength;
  161. if ($totalPackageSize > $connection->maxPackageSize) {
  162. Worker::safeEcho("error package. package_length=$totalPackageSize\n");
  163. $connection->close();
  164. return 0;
  165. }
  166. if ($isFinFrame) {
  167. if ($opcode === 0x9) {
  168. if ($recvLen >= $currentFrameLength) {
  169. $pingData = static::decode(substr($buffer, 0, $currentFrameLength), $connection);
  170. $connection->consumeRecvBuffer($currentFrameLength);
  171. $tmpConnectionType = $connection->websocketType ?? static::BINARY_TYPE_BLOB;
  172. $connection->websocketType = "\x8a";
  173. $pingCb = $connection->onWebSocketPing ?? $connection->worker->onWebSocketPing ?? false;
  174. if ($pingCb) {
  175. try {
  176. $pingCb($connection, $pingData);
  177. } catch (Throwable $e) {
  178. Worker::stopAll(250, $e);
  179. }
  180. } else {
  181. $connection->send($pingData);
  182. }
  183. $connection->websocketType = $tmpConnectionType;
  184. if ($recvLen > $currentFrameLength) {
  185. return static::input(substr($buffer, $currentFrameLength), $connection);
  186. }
  187. }
  188. return 0;
  189. }
  190. if ($opcode === 0xa) {
  191. if ($recvLen >= $currentFrameLength) {
  192. $pongData = static::decode(substr($buffer, 0, $currentFrameLength), $connection);
  193. $connection->consumeRecvBuffer($currentFrameLength);
  194. $tmpConnectionType = $connection->websocketType ?? static::BINARY_TYPE_BLOB;
  195. $connection->websocketType = "\x8a";
  196. // Try to emit onWebSocketPong callback.
  197. $pongCb = $connection->onWebSocketPong ?? $connection->worker->onWebSocketPong ?? false;
  198. if ($pongCb) {
  199. try {
  200. $pongCb($connection, $pongData);
  201. } catch (Throwable $e) {
  202. Worker::stopAll(250, $e);
  203. }
  204. }
  205. $connection->websocketType = $tmpConnectionType;
  206. if ($recvLen > $currentFrameLength) {
  207. return static::input(substr($buffer, $currentFrameLength), $connection);
  208. }
  209. }
  210. return 0;
  211. }
  212. return $currentFrameLength;
  213. }
  214. $connection->context->websocketCurrentFrameLength = $currentFrameLength;
  215. }
  216. // Received just a frame length data.
  217. if ($connection->context->websocketCurrentFrameLength === $recvLen) {
  218. static::decode($buffer, $connection);
  219. $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength);
  220. $connection->context->websocketCurrentFrameLength = 0;
  221. return 0;
  222. }
  223. // The length of the received data is greater than the length of a frame.
  224. if ($connection->context->websocketCurrentFrameLength < $recvLen) {
  225. static::decode(substr($buffer, 0, $connection->context->websocketCurrentFrameLength), $connection);
  226. $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength);
  227. $currentFrameLength = $connection->context->websocketCurrentFrameLength;
  228. $connection->context->websocketCurrentFrameLength = 0;
  229. // Continue to read next frame.
  230. return static::input(substr($buffer, $currentFrameLength), $connection);
  231. }
  232. // The length of the received data is less than the length of a frame.
  233. return 0;
  234. }
  235. /**
  236. * Websocket encode.
  237. *
  238. * @param mixed $buffer
  239. * @param TcpConnection $connection
  240. * @return string
  241. */
  242. public static function encode(mixed $buffer, TcpConnection $connection): string
  243. {
  244. if (!is_scalar($buffer)) {
  245. $buffer = json_encode($buffer, JSON_UNESCAPED_UNICODE);
  246. }
  247. if (empty($connection->websocketType)) {
  248. $connection->websocketType = static::BINARY_TYPE_BLOB;
  249. }
  250. if (ord($connection->websocketType) & 64) {
  251. $buffer = static::deflate($connection, $buffer);
  252. }
  253. $firstByte = $connection->websocketType;
  254. $len = strlen($buffer);
  255. if ($len <= 125) {
  256. $encodeBuffer = $firstByte . chr($len) . $buffer;
  257. } else {
  258. if ($len <= 65535) {
  259. $encodeBuffer = $firstByte . chr(126) . pack("n", $len) . $buffer;
  260. } else {
  261. $encodeBuffer = $firstByte . chr(127) . pack("xxxxN", $len) . $buffer;
  262. }
  263. }
  264. // Handshake not completed so temporary buffer websocket data waiting for send.
  265. if (empty($connection->context->websocketHandshake)) {
  266. if (empty($connection->context->tmpWebsocketData)) {
  267. $connection->context->tmpWebsocketData = '';
  268. }
  269. // If buffer has already full then discard the current package.
  270. if (strlen($connection->context->tmpWebsocketData) > $connection->maxSendBufferSize) {
  271. if ($connection->onError) {
  272. try {
  273. ($connection->onError)($connection, ConnectionInterface::SEND_FAIL, 'send buffer full and drop package');
  274. } catch (Throwable $e) {
  275. Worker::stopAll(250, $e);
  276. }
  277. }
  278. return '';
  279. }
  280. $connection->context->tmpWebsocketData .= $encodeBuffer;
  281. // Check buffer is full.
  282. if ($connection->onBufferFull && $connection->maxSendBufferSize <= strlen($connection->context->tmpWebsocketData)) {
  283. try {
  284. ($connection->onBufferFull)($connection);
  285. } catch (Throwable $e) {
  286. Worker::stopAll(250, $e);
  287. }
  288. }
  289. // Return empty string.
  290. return '';
  291. }
  292. return $encodeBuffer;
  293. }
  294. /**
  295. * Websocket decode.
  296. *
  297. * @param string $buffer
  298. * @param TcpConnection $connection
  299. * @return string
  300. */
  301. public static function decode(string $buffer, TcpConnection $connection): string
  302. {
  303. $firstByte = ord($buffer[0]);
  304. $secondByte = ord($buffer[1]);
  305. $len = $secondByte & 127;
  306. $isFinFrame = (bool)($firstByte >> 7);
  307. $rsv1 = 64 === ($firstByte & 64);
  308. if ($len === 126) {
  309. $masks = substr($buffer, 4, 4);
  310. $data = substr($buffer, 8);
  311. } else {
  312. if ($len === 127) {
  313. $masks = substr($buffer, 10, 4);
  314. $data = substr($buffer, 14);
  315. } else {
  316. $masks = substr($buffer, 2, 4);
  317. $data = substr($buffer, 6);
  318. }
  319. }
  320. $dataLength = strlen($data);
  321. $masks = str_repeat($masks, (int)floor($dataLength / 4)) . substr($masks, 0, $dataLength % 4);
  322. $decoded = $data ^ $masks;
  323. if ($connection->context->websocketCurrentFrameLength) {
  324. $connection->context->websocketDataBuffer .= $decoded;
  325. if ($rsv1) {
  326. return static::inflate($connection, $connection->context->websocketDataBuffer, $isFinFrame);
  327. }
  328. return $connection->context->websocketDataBuffer;
  329. }
  330. if ($connection->context->websocketDataBuffer !== '') {
  331. $decoded = $connection->context->websocketDataBuffer . $decoded;
  332. $connection->context->websocketDataBuffer = '';
  333. }
  334. if ($rsv1) {
  335. return static::inflate($connection, $decoded, $isFinFrame);
  336. }
  337. return $decoded;
  338. }
  339. /**
  340. * Inflate.
  341. *
  342. * @param TcpConnection $connection
  343. * @param string $buffer
  344. * @param bool $isFinFrame
  345. * @return false|string
  346. */
  347. protected static function inflate(TcpConnection $connection, string $buffer, bool $isFinFrame): bool|string
  348. {
  349. if (!isset($connection->context->inflator)) {
  350. $connection->context->inflator = inflate_init(
  351. ZLIB_ENCODING_RAW,
  352. [
  353. 'level' => -1,
  354. 'memory' => 8,
  355. 'window' => 15,
  356. 'strategy' => ZLIB_DEFAULT_STRATEGY
  357. ]
  358. );
  359. }
  360. if ($isFinFrame) {
  361. $buffer .= "\x00\x00\xff\xff";
  362. }
  363. return inflate_add($connection->context->inflator, $buffer);
  364. }
  365. /**
  366. * Deflate.
  367. *
  368. * @param TcpConnection $connection
  369. * @param string $buffer
  370. * @return false|string
  371. */
  372. protected static function deflate(TcpConnection $connection, string $buffer): bool|string
  373. {
  374. if (!isset($connection->context->deflator)) {
  375. $connection->context->deflator = deflate_init(
  376. ZLIB_ENCODING_RAW,
  377. [
  378. 'level' => -1,
  379. 'memory' => 8,
  380. 'window' => 15,
  381. 'strategy' => ZLIB_DEFAULT_STRATEGY
  382. ]
  383. );
  384. }
  385. return substr(deflate_add($connection->context->deflator, $buffer), 0, -4);
  386. }
  387. /**
  388. * Websocket handshake.
  389. *
  390. * @param string $buffer
  391. * @param TcpConnection $connection
  392. * @return int
  393. */
  394. public static function dealHandshake(string $buffer, TcpConnection $connection): int
  395. {
  396. // HTTP protocol.
  397. if (str_starts_with($buffer, 'GET')) {
  398. // Find \r\n\r\n.
  399. $headerEndPos = strpos($buffer, "\r\n\r\n");
  400. if (!$headerEndPos) {
  401. return 0;
  402. }
  403. $headerLength = $headerEndPos + 4;
  404. // Get Sec-WebSocket-Key.
  405. if (preg_match("/Sec-WebSocket-Key: *(.*?)\r\n/i", $buffer, $match)) {
  406. $SecWebSocketKey = $match[1];
  407. } else {
  408. $connection->close(
  409. "HTTP/1.0 400 Bad Request\r\nServer: workerman\r\n\r\n<div style=\"text-align:center\"><h1>WebSocket</h1><hr>workerman</div>", true);
  410. return 0;
  411. }
  412. // Calculation websocket key.
  413. $newKey = base64_encode(sha1($SecWebSocketKey . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));
  414. // Handshake response data.
  415. $handshakeMessage = "HTTP/1.1 101 Switching Protocol\r\n"
  416. . "Upgrade: websocket\r\n"
  417. . "Sec-WebSocket-Version: 13\r\n"
  418. . "Connection: Upgrade\r\n"
  419. . "Sec-WebSocket-Accept: " . $newKey . "\r\n";
  420. // Websocket data buffer.
  421. $connection->context->websocketDataBuffer = '';
  422. // Current websocket frame length.
  423. $connection->context->websocketCurrentFrameLength = 0;
  424. // Current websocket frame data.
  425. $connection->context->websocketCurrentFrameBuffer = '';
  426. // Consume handshake data.
  427. $connection->consumeRecvBuffer($headerLength);
  428. // Request from buffer
  429. $request = new Request($buffer);
  430. // Try to emit onWebSocketConnect callback.
  431. $onWebsocketConnect = $connection->onWebSocketConnect ?? $connection->worker->onWebSocketConnect ?? false;
  432. if ($onWebsocketConnect) {
  433. try {
  434. $onWebsocketConnect($connection, $request);
  435. } catch (Throwable $e) {
  436. Worker::stopAll(250, $e);
  437. }
  438. }
  439. // blob or arraybuffer
  440. if (empty($connection->websocketType)) {
  441. $connection->websocketType = static::BINARY_TYPE_BLOB;
  442. }
  443. $hasServerHeader = false;
  444. if ($connection->headers) {
  445. foreach ($connection->headers as $header) {
  446. if (stripos($header, 'Server:') === 0) {
  447. $hasServerHeader = true;
  448. }
  449. $handshakeMessage .= "$header\r\n";
  450. }
  451. }
  452. if (!$hasServerHeader) {
  453. $handshakeMessage .= "Server: workerman\r\n";
  454. }
  455. $handshakeMessage .= "\r\n";
  456. // Send handshake response.
  457. $connection->send($handshakeMessage, true);
  458. // Mark handshake complete.
  459. $connection->context->websocketHandshake = true;
  460. // Try to emit onWebSocketConnected callback.
  461. $onWebsocketConnected = $connection->onWebSocketConnected ?? $connection->worker->onWebSocketConnected ?? false;
  462. if ($onWebsocketConnected) {
  463. try {
  464. $onWebsocketConnected($connection, $request);
  465. } catch (Throwable $e) {
  466. Worker::stopAll(250, $e);
  467. }
  468. }
  469. // There are data waiting to be sent.
  470. if (!empty($connection->context->tmpWebsocketData)) {
  471. $connection->send($connection->context->tmpWebsocketData, true);
  472. $connection->context->tmpWebsocketData = '';
  473. }
  474. if (strlen($buffer) > $headerLength) {
  475. return static::input(substr($buffer, $headerLength), $connection);
  476. }
  477. return 0;
  478. }
  479. // Bad websocket handshake request.
  480. $connection->close(
  481. "HTTP/1.0 400 Bad Request\r\nServer: workerman\r\n\r\n<div style=\"text-align:center\"><h1>400 Bad Request</h1><hr>workerman</div>", true);
  482. return 0;
  483. }
  484. }