Ws.php 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. declare(strict_types=1);
  15. namespace Workerman\Protocols;
  16. use Exception;
  17. use Throwable;
  18. use Workerman\Connection\AsyncTcpConnection;
  19. use Workerman\Connection\ConnectionInterface;
  20. use Workerman\Timer;
  21. use Workerman\Worker;
  22. /**
  23. * Websocket protocol for client.
  24. */
  25. class Ws
  26. {
  /**
   * First frame byte used for text ("blob") messages: FIN bit set, opcode 0x1.
   *
   * @var string
   */
  public const BINARY_TYPE_BLOB = "\x81";

  /**
   * First frame byte used for binary ("arraybuffer") messages: FIN bit set, opcode 0x2.
   *
   * @var string
   */
  public const BINARY_TYPE_ARRAYBUFFER = "\x82";
  /**
   * Check the integrity of the package.
   *
   * Return values produced by this method:
   *  - false: data arrived although no handshake has been started.
   *  - 0: nothing to hand over yet (incomplete data, a frame that was handled
   *    or buffered right here, or the connection was closed).
   *  - positive int: total length (header + payload) of a FIN data frame found
   *    at the start of the inspected buffer.
   *
   * @param string $buffer
   * @param AsyncTcpConnection $connection
   * @return int|false
   * @throws Throwable
   */
  public static function input(string $buffer, AsyncTcpConnection $connection): bool|int
  {
      if (empty($connection->context->handshakeStep)) {
          Worker::safeEcho("recv data before handshake. Buffer:" . bin2hex($buffer) . "\n");
          return false;
      }
      // Handshake request sent, response not processed yet.
      if ($connection->context->handshakeStep === 1) {
          return self::dealHandshake($buffer, $connection);
      }
      $recvLen = strlen($buffer);
      if ($recvLen < 2) {
          return 0;
      }
      if ($connection->context->websocketCurrentFrameLength) {
          // A non-FIN frame is in progress; wait until all of it has arrived.
          if ($connection->context->websocketCurrentFrameLength > $recvLen) {
              return 0;
          }
      } else {
          $firstByte = ord($buffer[0]);
          $secondByte = ord($buffer[1]);
          $dataLen = $secondByte & 127;
          $isFinFrame = $firstByte >> 7;
          $masked = $secondByte >> 7;
          // Frames received by a client must not be masked.
          if ($masked) {
              Worker::safeEcho("frame masked so close the connection\n");
              $connection->close();
              return 0;
          }
          $opcode = $firstByte & 0xf;
          switch ($opcode) {
              // Continuation frame.
              case 0x0:
              // Blob type.
              case 0x1:
              // Arraybuffer type.
              case 0x2:
              // Ping package.
              case 0x9:
              // Pong package.
              case 0xa:
                  break;
              // Close package.
              case 0x8:
                  // Let the user callback decide what to do; close by default.
                  if (isset($connection->onWebSocketClose)) {
                      try {
                          ($connection->onWebSocketClose)($connection);
                      } catch (Throwable $e) {
                          Worker::stopAll(250, $e);
                      }
                  } else {
                      $connection->close();
                  }
                  return 0;
              // Wrong opcode.
              default:
                  // Hex-dump the buffer: it is binary frame data, not printable text.
                  Worker::safeEcho("error opcode $opcode and close websocket connection. Buffer:" . bin2hex($buffer) . "\n");
                  $connection->close();
                  return 0;
          }
          // Calculate packet length (header bytes included).
          if ($dataLen === 126) {
              if ($recvLen < 4) {
                  return 0;
              }
              $pack = unpack('nn/ntotal_len', $buffer);
              $currentFrameLength = $pack['total_len'] + 4;
          } elseif ($dataLen === 127) {
              if ($recvLen < 10) {
                  return 0;
              }
              $arr = unpack('n/N2c', $buffer);
              $currentFrameLength = $arr['c1'] * 4294967296 + $arr['c2'] + 10;
          } else {
              $currentFrameLength = $dataLen + 2;
          }
          $totalPackageSize = strlen($connection->context->websocketDataBuffer) + $currentFrameLength;
          if ($totalPackageSize > $connection->maxPackageSize) {
              Worker::safeEcho("error package. package_length=$totalPackageSize\n");
              $connection->close();
              return 0;
          }
          if ($isFinFrame) {
              if ($opcode === 0x9 || $opcode === 0xa) {
                  // Control frames are handled here and never handed to the caller.
                  if ($recvLen >= $currentFrameLength) {
                      self::handleControlFrame($opcode, substr($buffer, 0, $currentFrameLength), $currentFrameLength, $connection);
                      if ($recvLen > $currentFrameLength) {
                          return static::input(substr($buffer, $currentFrameLength), $connection);
                      }
                  }
                  return 0;
              }
              return $currentFrameLength;
          }
          // Non-FIN frame: remember its length so decode() appends it to the data buffer.
          $connection->context->websocketCurrentFrameLength = $currentFrameLength;
      }
      // Received just a frame length data.
      if ($connection->context->websocketCurrentFrameLength === $recvLen) {
          self::decode($buffer, $connection);
          $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength);
          $connection->context->websocketCurrentFrameLength = 0;
          return 0;
      } // The length of the received data is greater than the length of a frame.
      elseif ($connection->context->websocketCurrentFrameLength < $recvLen) {
          self::decode(substr($buffer, 0, $connection->context->websocketCurrentFrameLength), $connection);
          $connection->consumeRecvBuffer($connection->context->websocketCurrentFrameLength);
          $currentFrameLength = $connection->context->websocketCurrentFrameLength;
          $connection->context->websocketCurrentFrameLength = 0;
          // Continue to read next frame.
          return self::input(substr($buffer, $currentFrameLength), $connection);
      } // The length of the received data is less than the length of a frame.
      else {
          return 0;
      }
  }

  /**
   * Process one complete FIN ping (0x9) or pong (0xa) frame.
   *
   * Decodes the payload, removes the frame from the receive buffer and then
   * either invokes the matching user callback or, for a ping without an
   * onWebSocketPing callback, sends the payload back. While the callback /
   * reply runs, websocketType is temporarily switched to "\x8a".
   *
   * @param int $opcode 0x9 for ping, 0xa for pong
   * @param string $frame the complete control frame (header + payload)
   * @param int $frameLength number of bytes to consume from the receive buffer
   * @param AsyncTcpConnection $connection
   * @return void
   * @throws Throwable
   */
  private static function handleControlFrame(int $opcode, string $frame, int $frameLength, AsyncTcpConnection $connection): void
  {
      // A control frame may arrive between the fragments of a data message.
      // decode() would merge the fragments collected so far into the control
      // payload and drop them, so park them while decoding and restore after.
      $pendingFragments = $connection->context->websocketDataBuffer;
      $connection->context->websocketDataBuffer = '';
      $payload = static::decode($frame, $connection);
      $connection->context->websocketDataBuffer = $pendingFragments;

      $connection->consumeRecvBuffer($frameLength);
      $previousType = $connection->websocketType ?? static::BINARY_TYPE_BLOB;
      $connection->websocketType = "\x8a";
      if ($opcode === 0x9) {
          if (isset($connection->onWebSocketPing)) {
              try {
                  ($connection->onWebSocketPing)($connection, $payload);
              } catch (Throwable $e) {
                  Worker::stopAll(250, $e);
              }
          } else {
              // Default ping handling: echo the payload back.
              $connection->send($payload);
          }
      } elseif (isset($connection->onWebSocketPong)) {
          try {
              ($connection->onWebSocketPong)($connection, $payload);
          } catch (Throwable $e) {
              Worker::stopAll(250, $e);
          }
      }
      $connection->websocketType = $previousType;
  }
  /**
   * Websocket encode.
   *
   * Builds one masked frame whose first byte is $connection->websocketType
   * (defaulting to BINARY_TYPE_BLOB). If no handshake has been started yet it
   * is sent first. While the handshake is pending (handshakeStep === 1) the
   * frame is appended to context->tmpWebsocketData and '' is returned instead.
   *
   * @param mixed $payload any scalar; it is converted to string before framing
   * @param AsyncTcpConnection $connection
   * @return string the encoded frame, or '' when the frame was buffered or dropped
   * @throws Throwable
   */
  public static function encode(mixed $payload, AsyncTcpConnection $connection): string
  {
      if (!is_scalar($payload)) {
          throw new Exception("You can't send(" . gettype($payload) . ") to client, you need to convert it to string. ");
      }
      // With strict_types enabled, strlen() and the string XOR below reject
      // int/float/bool, so normalise accepted scalars to string once.
      $payload = (string)$payload;
      if (empty($connection->websocketType)) {
          $connection->websocketType = self::BINARY_TYPE_BLOB;
      }
      if (empty($connection->context->handshakeStep)) {
          static::sendHandshake($connection);
      }
      // NOTE(review): an all-zero masking key leaves the payload unchanged on
      // the wire. RFC 6455 asks clients for an unpredictable key; kept as-is
      // here because changing it alters the emitted bytes - confirm intent.
      $maskKey = "\x00\x00\x00\x00";
      $length = strlen($payload);
      // 0x80 sets the mask bit; the length uses the shortest form that fits.
      if ($length < 126) {
          $head = chr(0x80 | $length);
      } elseif ($length <= 0xFFFF) {
          // 65535 still fits into the 16-bit extended length.
          $head = chr(0x80 | 126) . pack("n", $length);
      } else {
          $head = chr(0x80 | 127) . pack("N", 0) . pack("N", $length);
      }
      $frame = $connection->websocketType . $head . $maskKey;
      // Expand the 4-byte key to the payload length and XOR it onto the payload.
      $maskKey = str_repeat($maskKey, intdiv($length, 4)) . substr($maskKey, 0, $length % 4);
      $frame .= $payload ^ $maskKey;
      if ($connection->context->handshakeStep !== 1) {
          return $frame;
      }
      // Handshake still pending: queue the frame instead of returning it.
      // If buffer has already full then discard the current package.
      if (strlen($connection->context->tmpWebsocketData) > $connection->maxSendBufferSize) {
          if ($connection->onError) {
              try {
                  ($connection->onError)($connection, ConnectionInterface::SEND_FAIL, 'send buffer full and drop package');
              } catch (Throwable $e) {
                  Worker::stopAll(250, $e);
              }
          }
          return '';
      }
      $connection->context->tmpWebsocketData = $connection->context->tmpWebsocketData . $frame;
      // Check buffer is full.
      if ($connection->maxSendBufferSize <= strlen($connection->context->tmpWebsocketData)) {
          if ($connection->onBufferFull) {
              try {
                  ($connection->onBufferFull)($connection);
              } catch (Throwable $e) {
                  Worker::stopAll(250, $e);
              }
          }
      }
      return '';
  }
  /**
   * Websocket decode.
   *
   * Strips the frame header from $bytes. While a non-FIN frame is being
   * collected (context->websocketCurrentFrameLength is set) the payload is
   * appended to context->websocketDataBuffer and the whole buffer is returned.
   * Otherwise any buffered data is prepended to the payload and the buffer is
   * emptied.
   *
   * @param string $bytes one complete frame (header + payload)
   * @param AsyncTcpConnection $connection
   * @return string
   */
  public static function decode(string $bytes, AsyncTcpConnection $connection): string
  {
      // The second byte selects the header size: 126 => 16-bit extended
      // length, 127 => 64-bit extended length, anything else => no extension.
      $headerSize = match (ord($bytes[1])) {
          126 => 4,
          127 => 10,
          default => 2,
      };
      $payload = substr($bytes, $headerSize);

      if ($connection->context->websocketCurrentFrameLength) {
          $connection->context->websocketDataBuffer .= $payload;
          return $connection->context->websocketDataBuffer;
      }

      if ($connection->context->websocketDataBuffer === '') {
          return $payload;
      }

      $message = $connection->context->websocketDataBuffer . $payload;
      $connection->context->websocketDataBuffer = '';
      return $message;
  }
  /**
   * Connection-established hook: starts the websocket handshake.
   *
   * Delegates to sendHandshake(), which does nothing when a handshake has
   * already been started for this connection.
   *
   * @param AsyncTcpConnection $connection
   * @return void
   * @throws Throwable
   */
  public static function onConnect(AsyncTcpConnection $connection): void
  {
      static::sendHandshake($connection);
  }
  /**
   * Connection-closed hook: resets the per-connection websocket state.
   *
   * Clears the handshake step, frame length and both data buffers, and
   * removes the ping timer when one is registered.
   *
   * @param AsyncTcpConnection $connection
   * @return void
   */
  public static function onClose(AsyncTcpConnection $connection): void
  {
      $context = $connection->context;
      $context->handshakeStep = null;
      $context->websocketCurrentFrameLength = 0;
      $context->tmpWebsocketData = '';
      $context->websocketDataBuffer = '';

      if (empty($context->websocketPingTimer)) {
          return;
      }
      Timer::del($context->websocketPingTimer);
      $context->websocketPingTimer = null;
  }
  /**
   * Send websocket handshake.
   *
   * Builds the HTTP upgrade request (with a fresh random Sec-WebSocket-Key and
   * any user supplied headers from $connection->headers), sends it raw and
   * initialises the per-connection websocket state. Returns immediately when
   * a handshake has already been started.
   *
   * @param AsyncTcpConnection $connection
   * @return void
   * @throws Throwable
   */
  public static function sendHandshake(AsyncTcpConnection $connection): void
  {
      $context = $connection->context;
      if (!empty($context->handshakeStep)) {
          return;
      }

      // Host header value; the port is omitted only for port 80.
      $port = $connection->getRemotePort();
      $host = $connection->getRemoteHost();
      if ($port !== 80) {
          $host .= ':' . $port;
      }

      $context->websocketSecKey = base64_encode(random_bytes(16));

      // User headers may be a name => value array or a preformatted string.
      $customHeaders = $connection->headers ?? null;
      $userHeaderStr = '';
      if (!empty($customHeaders)) {
          $rawHeaders = '';
          if (is_array($customHeaders)) {
              foreach ($customHeaders as $name => $value) {
                  $rawHeaders .= "$name: $value\r\n";
              }
          } else {
              $rawHeaders .= $customHeaders;
          }
          $userHeaderStr = "\r\n" . trim($rawHeaders);
      }

      // Assemble the request line by line.
      $header = 'GET ' . $connection->getRemoteURI() . " HTTP/1.1\r\n";
      // Skip the generated Host header when the user supplied one.
      if (!preg_match("/\nHost:/i", $userHeaderStr)) {
          $header .= "Host: $host\r\n";
      }
      $header .= "Connection: Upgrade\r\n";
      $header .= "Upgrade: websocket\r\n";
      if (isset($connection->websocketOrigin)) {
          $header .= "Origin: " . $connection->websocketOrigin . "\r\n";
      }
      if (isset($connection->websocketClientProtocol)) {
          $header .= "Sec-WebSocket-Protocol: " . $connection->websocketClientProtocol . "\r\n";
      }
      $header .= "Sec-WebSocket-Version: 13\r\n";
      $header .= "Sec-WebSocket-Key: " . $context->websocketSecKey . $userHeaderStr . "\r\n\r\n";

      $connection->send($header, true);

      $context->handshakeStep = 1;
      $context->websocketCurrentFrameLength = 0;
      $context->websocketDataBuffer = '';
      $context->tmpWebsocketData = '';
  }
  /**
   * Websocket handshake.
   *
   * Waits for the end of the HTTP response header, verifies the
   * Sec-WebSocket-Accept value against the key sent in the request and, on
   * success, marks the handshake complete (handshakeStep = 2), fires
   * onWebSocketConnect, starts the optional ping timer and flushes frames
   * that were queued while the handshake was pending.
   *
   * @param string $buffer
   * @param AsyncTcpConnection $connection
   * @return bool|int 0, or the result of input() for bytes following the response header
   * @throws Throwable
   */
  public static function dealHandshake(string $buffer, AsyncTcpConnection $connection): bool|int
  {
      $headerEnd = strpos($buffer, "\r\n\r\n");
      // Header not complete yet: wait for more data.
      if (!$headerEnd) {
          return 0;
      }

      // Checking Sec-WebSocket-Accept.
      if (!preg_match("/Sec-WebSocket-Accept: *(.*?)\r\n/i", $buffer, $match)) {
          Worker::safeEcho("Sec-WebSocket-Accept not found. Header:\n" . substr($buffer, 0, $headerEnd) . "\n");
          $connection->close();
          return 0;
      }
      $expectedAccept = base64_encode(sha1($connection->context->websocketSecKey . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));
      if ($match[1] !== $expectedAccept) {
          Worker::safeEcho("Sec-WebSocket-Accept not match. Header:\n" . substr($buffer, 0, $headerEnd) . "\n");
          $connection->close();
          return 0;
      }

      // Handshake complete.
      $connection->context->handshakeStep = 2;
      $handshakeResponseLength = $headerEnd + 4;

      // Try to emit onWebSocketConnect callback.
      if (isset($connection->onWebSocketConnect)) {
          try {
              ($connection->onWebSocketConnect)($connection, substr($buffer, 0, $handshakeResponseLength));
          } catch (Throwable $e) {
              Worker::stopAll(250, $e);
          }
      }

      // Heartbeat: periodically send an empty ping frame; stop once sending fails.
      if (!empty($connection->websocketPingInterval)) {
          $connection->context->websocketPingTimer = Timer::add($connection->websocketPingInterval, function () use ($connection) {
              if (false === $connection->send(pack('H*', '898000000000'), true)) {
                  Timer::del($connection->context->websocketPingTimer);
                  $connection->context->websocketPingTimer = null;
              }
          });
      }

      $connection->consumeRecvBuffer($handshakeResponseLength);

      // Flush frames that encode() queued while the handshake was pending.
      if (!empty($connection->context->tmpWebsocketData)) {
          $connection->send($connection->context->tmpWebsocketData, true);
          $connection->context->tmpWebsocketData = '';
      }

      // Bytes after the response header already belong to websocket frames.
      if (strlen($buffer) > $handshakeResponseLength) {
          return self::input(substr($buffer, $handshakeResponseLength), $connection);
      }
      return 0;
  }
  412. }