Ws.php 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. declare(strict_types=1);
  15. namespace Workerman\Protocols;
  16. use Exception;
  17. use Throwable;
  18. use Workerman\Connection\AsyncTcpConnection;
  19. use Workerman\Connection\ConnectionInterface;
  20. use Workerman\Protocols\Http\Response;
  21. use Workerman\Timer;
  22. use Workerman\Worker;
  23. use function base64_encode;
  24. use function bin2hex;
  25. use function floor;
  26. use function gettype;
  27. use function is_array;
  28. use function is_scalar;
  29. use function ord;
  30. use function pack;
  31. use function preg_match;
  32. use function sha1;
  33. use function str_repeat;
  34. use function strlen;
  35. use function strpos;
  36. use function substr;
  37. use function trim;
  38. use function unpack;
  39. /**
  40. * Websocket protocol for client.
  41. */
  42. class Ws
  43. {
  44. /**
  45. * Websocket blob type.
  46. *
  47. * @var string
  48. */
  49. public const BINARY_TYPE_BLOB = "\x81";
  50. /**
  51. * Websocket arraybuffer type.
  52. *
  53. * @var string
  54. */
  55. public const BINARY_TYPE_ARRAYBUFFER = "\x82";
  56. /**
  57. * Check the integrity of the package.
  58. *
  59. * @param string $buffer
  60. * @param AsyncTcpConnection $connection
  61. * @return int|false
  62. * @throws Throwable
  63. */
  64. public static function input(string $buffer, AsyncTcpConnection $connection): bool|int
  65. {
  66. if (empty($connection->context->handshakeStep)) {
  67. Worker::safeEcho("recv data before handshake. Buffer:" . bin2hex($buffer) . "\n");
  68. return false;
  69. }
  70. // Recv handshake response
  71. if ($connection->context->handshakeStep === 1) {
  72. return self::dealHandshake($buffer, $connection);
  73. }
  74. $recvLen = strlen($buffer);
  75. if ($recvLen < 2) {
  76. return 0;
  77. }
  78. // Buffer websocket frame data.
  79. if ($connection->context->websocketCurrentFrameLength) {
  80. // We need more frame data.
  81. if ($connection->context->websocketCurrentFrameLength > $recvLen) {
  82. // Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
  83. return 0;
  84. }
  85. } else {
  86. $firstByte = ord($buffer[0]);
  87. $secondByte = ord($buffer[1]);
  88. $dataLen = $secondByte & 127;
  89. $isFinFrame = $firstByte >> 7;
  90. $masked = $secondByte >> 7;
  91. if ($masked) {
  92. Worker::safeEcho("frame masked so close the connection\n");
  93. $connection->close();
  94. return 0;
  95. }
  96. $opcode = $firstByte & 0xf;
  97. switch ($opcode) {
  98. case 0x0:
  99. // Blob type.
  100. case 0x1:
  101. // Arraybuffer type.
  102. case 0x2:
  103. // Ping package.
  104. case 0x9:
  105. // Pong package.
  106. case 0xa:
  107. break;
  108. // Close package.
  109. case 0x8:
  110. // Try to emit onWebSocketClose callback.
  111. if (isset($connection->onWebSocketClose)) {
  112. try {
  113. ($connection->onWebSocketClose)($connection);
  114. } catch (Throwable $e) {
  115. Worker::stopAll(250, $e);
  116. }
  117. } // Close connection.
  118. else {
  119. $connection->close();
  120. }
  121. return 0;
  122. // Wrong opcode.
  123. default :
  124. Worker::safeEcho("error opcode $opcode and close websocket connection. Buffer:" . $buffer . "\n");
  125. $connection->close();
  126. return 0;
  127. }
  128. // Calculate packet length.
  129. if ($dataLen === 126) {
  130. if (strlen($buffer) < 4) {
  131. return 0;
  132. }
  133. $pack = unpack('nn/ntotal_len', $buffer);
  134. $currentFrameLength = $pack['total_len'] + 4;
  135. } else if ($dataLen === 127) {
  136. if (strlen($buffer) < 10) {
  137. return 0;
  138. }
  139. $arr = unpack('n/N2c', $buffer);
  140. $currentFrameLength = $arr['c1'] * 4294967296 + $arr['c2'] + 10;
  141. } else {
  142. $currentFrameLength = $dataLen + 2;
  143. }
  144. $totalPackageSize = strlen($connection->context->websocketDataBuffer) + $currentFrameLength;
  145. if ($totalPackageSize > $connection->maxPackageSize) {
  146. Worker::safeEcho("error package. package_length=$totalPackageSize\n");
  147. $connection->close();
  148. return 0;
  149. }
  150. if ($isFinFrame) {
  151. if ($opcode === 0x9) {
  152. if ($recvLen >= $currentFrameLength) {
  153. $pingData = static::decode(substr($buffer, 0, $currentFrameLength), $connection);
  154. $connection->consumeRecvBuffer($currentFrameLength);
  155. $tmpConnectionType = $connection->websocketType ?? static::BINARY_TYPE_BLOB;
  156. $connection->websocketType = "\x8a";
  157. if (isset($connection->onWebSocketPing)) {
  158. try {
  159. ($connection->onWebSocketPing)($connection, $pingData);
  160. } catch (Throwable $e) {
  161. Worker::stopAll(250, $e);
  162. }
  163. } else {
  164. $connection->send($pingData);
  165. }
  166. $connection->websocketType = $tmpConnectionType;
  167. if ($recvLen > $currentFrameLength) {
  168. return static::input(substr($buffer, $currentFrameLength), $connection);
  169. }
  170. }
  171. return 0;
  172. }
  173. if ($opcode === 0xa) {
  174. if ($recvLen >= $currentFrameLength) {
  175. $pongData = static::decode(substr($buffer, 0, $currentFrameLength), $connection);
  176. $connection->consumeRecvBuffer($currentFrameLength);
  177. $tmpConnectionType = $connection->websocketType ?? static::BINARY_TYPE_BLOB;
  178. $connection->websocketType = "\x8a";
  179. // Try to emit onWebSocketPong callback.
  180. if (isset($connection->onWebSocketPong)) {
  181. try {
  182. ($connection->onWebSocketPong)($connection, $pongData);
  183. } catch (Throwable $e) {
  184. Worker::stopAll(250, $e);
  185. }
  186. }
  187. $connection->websocketType = $tmpConnectionType;
  188. if ($recvLen > $currentFrameLength) {
  189. return static::input(substr($buffer, $currentFrameLength), $connection);
  190. }
  191. }
  192. return 0;
  193. }
  194. return $currentFrameLength;
  195. }
  196. $connection->context->websocketCurrentFrameLength = $currentFrameLength;
  197. }
  198. // Received just a frame length data.
  199. if ($connection->context->websocketCurrentFrameLength === $recvLen) {
  200. self::decode($buffer, $connection);
  201. $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength);
  202. $connection->context->websocketCurrentFrameLength = 0;
  203. return 0;
  204. } // The length of the received data is greater than the length of a frame.
  205. elseif ($connection->context->websocketCurrentFrameLength < $recvLen) {
  206. self::decode(substr($buffer, 0, $connection->context->websocketCurrentFrameLength), $connection);
  207. $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength);
  208. $currentFrameLength = $connection->context->websocketCurrentFrameLength;
  209. $connection->context->websocketCurrentFrameLength = 0;
  210. // Continue to read next frame.
  211. return self::input(substr($buffer, $currentFrameLength), $connection);
  212. } // The length of the received data is less than the length of a frame.
  213. else {
  214. return 0;
  215. }
  216. }
  217. /**
  218. * Websocket encode.
  219. *
  220. * @param string $payload
  221. * @param AsyncTcpConnection $connection
  222. * @return string
  223. * @throws Throwable
  224. */
  225. public static function encode(string $payload, AsyncTcpConnection $connection): string
  226. {
  227. if (empty($connection->websocketType)) {
  228. $connection->websocketType = self::BINARY_TYPE_BLOB;
  229. }
  230. if (empty($connection->context->handshakeStep)) {
  231. static::sendHandshake($connection);
  232. }
  233. $maskKey = "\x00\x00\x00\x00";
  234. $length = strlen($payload);
  235. if (strlen($payload) < 126) {
  236. $head = chr(0x80 | $length);
  237. } elseif ($length < 0xFFFF) {
  238. $head = chr(0x80 | 126) . pack("n", $length);
  239. } else {
  240. $head = chr(0x80 | 127) . pack("N", 0) . pack("N", $length);
  241. }
  242. $frame = $connection->websocketType . $head . $maskKey;
  243. // append payload to frame:
  244. $maskKey = str_repeat($maskKey, (int)floor($length / 4)) . substr($maskKey, 0, $length % 4);
  245. $frame .= $payload ^ $maskKey;
  246. if ($connection->context->handshakeStep === 1) {
  247. // If buffer has already full then discard the current package.
  248. if (strlen($connection->context->tmpWebsocketData) > $connection->maxSendBufferSize) {
  249. if ($connection->onError) {
  250. try {
  251. ($connection->onError)($connection, ConnectionInterface::SEND_FAIL, 'send buffer full and drop package');
  252. } catch (Throwable $e) {
  253. Worker::stopAll(250, $e);
  254. }
  255. }
  256. return '';
  257. }
  258. $connection->context->tmpWebsocketData .= $frame;
  259. // Check buffer is full.
  260. if ($connection->onBufferFull && $connection->maxSendBufferSize <= strlen($connection->context->tmpWebsocketData)) {
  261. try {
  262. ($connection->onBufferFull)($connection);
  263. } catch (Throwable $e) {
  264. Worker::stopAll(250, $e);
  265. }
  266. }
  267. return '';
  268. }
  269. return $frame;
  270. }
  271. /**
  272. * Websocket decode.
  273. *
  274. * @param string $bytes
  275. * @param AsyncTcpConnection $connection
  276. * @return string
  277. */
  278. public static function decode(string $bytes, AsyncTcpConnection $connection): string
  279. {
  280. $dataLength = ord($bytes[1]);
  281. if ($dataLength === 126) {
  282. $decodedData = substr($bytes, 4);
  283. } else if ($dataLength === 127) {
  284. $decodedData = substr($bytes, 10);
  285. } else {
  286. $decodedData = substr($bytes, 2);
  287. }
  288. if ($connection->context->websocketCurrentFrameLength) {
  289. $connection->context->websocketDataBuffer .= $decodedData;
  290. return $connection->context->websocketDataBuffer;
  291. }
  292. if ($connection->context->websocketDataBuffer !== '') {
  293. $decodedData = $connection->context->websocketDataBuffer . $decodedData;
  294. $connection->context->websocketDataBuffer = '';
  295. }
  296. return $decodedData;
  297. }
  298. /**
  299. * Send websocket handshake data.
  300. *
  301. * @param AsyncTcpConnection $connection
  302. * @return void
  303. * @throws Throwable
  304. */
  305. public static function onConnect(AsyncTcpConnection $connection): void
  306. {
  307. static::sendHandshake($connection);
  308. }
  309. /**
  310. * Clean
  311. *
  312. * @param AsyncTcpConnection $connection
  313. */
  314. public static function onClose(AsyncTcpConnection $connection): void
  315. {
  316. $connection->context->handshakeStep = null;
  317. $connection->context->websocketCurrentFrameLength = 0;
  318. $connection->context->tmpWebsocketData = '';
  319. $connection->context->websocketDataBuffer = '';
  320. if (!empty($connection->context->websocketPingTimer)) {
  321. Timer::del($connection->context->websocketPingTimer);
  322. $connection->context->websocketPingTimer = null;
  323. }
  324. }
  325. /**
  326. * Send websocket handshake.
  327. *
  328. * @param AsyncTcpConnection $connection
  329. * @return void
  330. * @throws Throwable
  331. */
  332. public static function sendHandshake(AsyncTcpConnection $connection): void
  333. {
  334. if (!empty($connection->context->handshakeStep)) {
  335. return;
  336. }
  337. // Get Host.
  338. $port = $connection->getRemotePort();
  339. $host = $port === 80 ? $connection->getRemoteHost() : $connection->getRemoteHost() . ':' . $port;
  340. // Handshake header.
  341. $connection->context->websocketSecKey = base64_encode(random_bytes(16));
  342. $userHeader = $connection->headers ?? null;
  343. $userHeaderStr = '';
  344. if (!empty($userHeader)) {
  345. foreach ($userHeader as $k => $v) {
  346. $userHeaderStr .= "$k: $v\r\n";
  347. }
  348. $userHeaderStr = "\r\n" . trim($userHeaderStr);
  349. }
  350. $header = 'GET ' . $connection->getRemoteURI() . " HTTP/1.1\r\n" .
  351. (!preg_match("/\nHost:/i", $userHeaderStr) ? "Host: $host\r\n" : '') .
  352. "Connection: Upgrade\r\n" .
  353. "Upgrade: websocket\r\n" .
  354. (isset($connection->websocketOrigin) ? "Origin: " . $connection->websocketOrigin . "\r\n" : '') .
  355. (isset($connection->websocketClientProtocol) ? "Sec-WebSocket-Protocol: " . $connection->websocketClientProtocol . "\r\n" : '') .
  356. "Sec-WebSocket-Version: 13\r\n" .
  357. "Sec-WebSocket-Key: " . $connection->context->websocketSecKey . $userHeaderStr . "\r\n\r\n";
  358. $connection->send($header, true);
  359. $connection->context->handshakeStep = 1;
  360. $connection->context->websocketCurrentFrameLength = 0;
  361. $connection->context->websocketDataBuffer = '';
  362. $connection->context->tmpWebsocketData = '';
  363. }
  364. /**
  365. * Websocket handshake.
  366. *
  367. * @param string $buffer
  368. * @param AsyncTcpConnection $connection
  369. * @return bool|int
  370. * @throws Throwable
  371. */
  372. public static function dealHandshake(string $buffer, AsyncTcpConnection $connection): bool|int
  373. {
  374. $pos = strpos($buffer, "\r\n\r\n");
  375. if ($pos) {
  376. //checking Sec-WebSocket-Accept
  377. if (preg_match("/Sec-WebSocket-Accept: *(.*?)\r\n/i", $buffer, $match)) {
  378. if ($match[1] !== base64_encode(sha1($connection->context->websocketSecKey . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true))) {
  379. Worker::safeEcho("Sec-WebSocket-Accept not match. Header:\n" . substr($buffer, 0, $pos) . "\n");
  380. $connection->close();
  381. return 0;
  382. }
  383. } else {
  384. Worker::safeEcho("Sec-WebSocket-Accept not found. Header:\n" . substr($buffer, 0, $pos) . "\n");
  385. $connection->close();
  386. return 0;
  387. }
  388. // handshake complete
  389. $connection->context->handshakeStep = 2;
  390. $handshakeResponseLength = $pos + 4;
  391. $buffer = substr($buffer, 0, $handshakeResponseLength);
  392. $response = static::parseResponse($buffer);
  393. // Try to emit onWebSocketConnect callback.
  394. if (isset($connection->onWebSocketConnect)) {
  395. try {
  396. ($connection->onWebSocketConnect)($connection, $response);
  397. } catch (Throwable $e) {
  398. Worker::stopAll(250, $e);
  399. }
  400. }
  401. // Headbeat.
  402. if (!empty($connection->websocketPingInterval)) {
  403. $connection->context->websocketPingTimer = Timer::add($connection->websocketPingInterval, function () use ($connection) {
  404. if (false === $connection->send(pack('H*', '898000000000'), true)) {
  405. Timer::del($connection->context->websocketPingTimer);
  406. $connection->context->websocketPingTimer = null;
  407. }
  408. });
  409. }
  410. $connection->consumeRecvBuffer($handshakeResponseLength);
  411. if (!empty($connection->context->tmpWebsocketData)) {
  412. $connection->send($connection->context->tmpWebsocketData, true);
  413. $connection->context->tmpWebsocketData = '';
  414. }
  415. if (strlen($buffer) > $handshakeResponseLength) {
  416. return self::input(substr($buffer, $handshakeResponseLength), $connection);
  417. }
  418. }
  419. return 0;
  420. }
  421. /**
  422. * Parse response.
  423. *
  424. * @param string $buffer
  425. * @return Response
  426. */
  427. protected static function parseResponse(string $buffer): Response
  428. {
  429. [$http_header, ] = \explode("\r\n\r\n", $buffer, 2);
  430. $header_data = \explode("\r\n", $http_header);
  431. [$protocol, $status, $phrase] = \explode(' ', $header_data[0], 3);
  432. $protocolVersion = substr($protocol, 5);
  433. unset($header_data[0]);
  434. $headers = [];
  435. foreach ($header_data as $content) {
  436. // \r\n\r\n
  437. if (empty($content)) {
  438. continue;
  439. }
  440. list($key, $value) = \explode(':', $content, 2);
  441. $value = \trim($value);
  442. $headers[$key] = $value;
  443. }
  444. return (new Response())->withStatus((int)$status, $phrase)->withHeaders($headers)->withProtocolVersion($protocolVersion);
  445. }
  446. }