Ws.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422
  1. <?php
  2. namespace Workerman\Protocols;
  3. use Workerman\Worker;
  4. use Workerman\Lib\Timer;
  5. use Workerman\Connection\TcpConnection;
  /**
   * Websocket protocol for client.
   *
   * Client-side WebSocket framing: sends the opening handshake, waits for the
   * handshake response, then frames outgoing payloads and parses incoming frames.
   *
   * Per-connection state is kept in properties set on the connection object:
   *  - handshakeStep: empty = no handshake sent, 1 = request sent, 2 = response received.
   *  - websocketCurrentFrameLength: length of a non-final frame that is being received.
   *  - websocketDataBuffer: decoded payload of the fragments received so far.
   *  - tmpWebsocketData: frames encoded while the handshake is still pending.
   */
  class Ws
  {
      /**
       * Minimum head length of websocket protocol.
       *
       * @var int
       */
      const MIN_HEAD_LEN = 2;

      /**
       * Websocket blob type (first frame byte: fin=1, opcode=0x1).
       *
       * @var string
       */
      const BINARY_TYPE_BLOB = "\x81";

      /**
       * Websocket arraybuffer type (first frame byte: fin=1, opcode=0x2).
       *
       * @var string
       */
      const BINARY_TYPE_ARRAYBUFFER = "\x82";

      /**
       * Check the integrity of the package.
       *
       * Returns false when data arrives before a handshake was started, the
       * result of dealHandshake() while the handshake response is pending, the
       * total frame length once the header of a final (fin=1) frame has been
       * parsed, and 0 in every other case (more data needed, control frame
       * handled, fragment buffered, or connection closed).
       *
       * @param string              $buffer
       * @param ConnectionInterface $connection
       * @return int|false
       */
      public static function input($buffer, $connection)
      {
          if (empty($connection->handshakeStep)) {
              echo "recv data before handshake. Buffer:" . bin2hex($buffer) . "\n";
              return false;
          }
          // Handshake request was sent; this data is (part of) the response.
          if ($connection->handshakeStep === 1) {
              return self::dealHandshake($buffer, $connection);
          }
          $recv_len = strlen($buffer);
          if ($recv_len < self::MIN_HEAD_LEN) {
              return 0;
          }

          if ($connection->websocketCurrentFrameLength) {
              // The header of a non-final frame was parsed earlier; wait until
              // the whole frame has arrived.
              if ($connection->websocketCurrentFrameLength > $recv_len) {
                  return 0;
              }
          } else {
              $first_byte   = ord($buffer[0]);
              $data_len     = ord($buffer[1]) & 127;
              $is_fin_frame = $first_byte >> 7;
              $opcode       = $first_byte & 0xf;
              switch ($opcode) {
                  // Continuation, blob and arraybuffer frames carry data.
                  case 0x0:
                  case 0x1:
                  case 0x2:
                      break;
                  // Close package.
                  case 0x8:
                      if (isset($connection->onWebSocketClose)) {
                          self::emitCallback($connection->onWebSocketClose, array($connection));
                      } else {
                          $connection->close();
                      }
                      return 0;
                  // Ping package.
                  case 0x9:
                      if (isset($connection->onWebSocketPing)) {
                          self::emitCallback($connection->onWebSocketPing, array($connection));
                      } else {
                          // Default reply: an empty pong. Frames sent by a client
                          // must be masked, hence mask bit + 4-byte (zero) key.
                          $connection->send(pack('H*', '8a8000000000'), true);
                      }
                      if (!$data_len) {
                          return self::skipEmptyControlFrame($buffer, $recv_len, $connection);
                      }
                      break;
                  // Pong package.
                  case 0xa:
                      if (isset($connection->onWebSocketPong)) {
                          self::emitCallback($connection->onWebSocketPong, array($connection));
                      }
                      if (!$data_len) {
                          return self::skipEmptyControlFrame($buffer, $recv_len, $connection);
                      }
                      break;
                  // Wrong opcode.
                  default:
                      // Hex-encode the buffer so binary data cannot garble the output.
                      echo "error opcode $opcode and close websocket connection. Buffer:" . bin2hex($buffer) . "\n";
                      $connection->close();
                      return 0;
              }

              // Calculate the total frame length (header + payload).
              if ($data_len === 126) {
                  // 2 header bytes + 16-bit extended length.
                  if ($recv_len < 4) {
                      return 0;
                  }
                  $pack = unpack('nn/ntotal_len', $buffer);
                  $current_frame_length = $pack['total_len'] + 4;
              } elseif ($data_len === 127) {
                  // 2 header bytes + 64-bit extended length.
                  if ($recv_len < 10) {
                      return 0;
                  }
                  $arr = unpack('n/N2c', $buffer);
                  $current_frame_length = $arr['c1'] * 4294967296 + $arr['c2'] + 10;
              } else {
                  $current_frame_length = $data_len + 2;
              }

              $total_package_size = strlen($connection->websocketDataBuffer) + $current_frame_length;
              if ($total_package_size > TcpConnection::$maxPackageSize) {
                  echo "error package. package_length=$total_package_size\n";
                  $connection->close();
                  return 0;
              }

              if ($is_fin_frame) {
                  return $current_frame_length;
              }
              $connection->websocketCurrentFrameLength = $current_frame_length;
          }

          // Exactly one (non-final) frame has been received.
          if ($connection->websocketCurrentFrameLength === $recv_len) {
              self::decode($buffer, $connection);
              $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
              $connection->websocketCurrentFrameLength = 0;
              return 0;
          }
          // More than one frame has been received: buffer this fragment and
          // continue with whatever follows it.
          if ($connection->websocketCurrentFrameLength < $recv_len) {
              $current_frame_length = $connection->websocketCurrentFrameLength;
              self::decode(substr($buffer, 0, $current_frame_length), $connection);
              $connection->consumeRecvBuffer($current_frame_length);
              $connection->websocketCurrentFrameLength = 0;
              return self::input(substr($buffer, $current_frame_length), $connection);
          }
          // Less than one frame has been received.
          return 0;
      }

      /**
       * Websocket encode.
       *
       * Builds one masked frame for $payload. While the handshake response is
       * still pending (handshakeStep === 1) the frame is appended to
       * $connection->tmpWebsocketData and '' is returned; dealHandshake() sends
       * that buffer once the response arrives.
       *
       * @param string              $payload
       * @param ConnectionInterface $connection
       * @return string
       */
      public static function encode($payload, $connection)
      {
          if (empty($connection->websocketType)) {
              $connection->websocketType = self::BINARY_TYPE_BLOB;
          }
          $payload = (string)$payload;
          if (empty($connection->handshakeStep)) {
              self::sendHandshake($connection);
          }

          $mask = 1;
          // NOTE(review): RFC 6455 asks for an unpredictable masking key; the
          // all-zero key is kept so the emitted bytes stay exactly as before.
          $mask_key = "\x00\x00\x00\x00";

          $pack   = '';
          $length = $length_flag = strlen($payload);
          if (65535 < $length) {
              // 64-bit big-endian length, built with integer arithmetic only.
              // A 32-bit build cannot hold a longer string, so the high word is 0.
              if (PHP_INT_SIZE > 4) {
                  $high = $length >> 32;
                  $low  = $length & 0xFFFFFFFF;
              } else {
                  $high = 0;
                  $low  = $length;
              }
              $pack        = pack('NN', $high, $low);
              $length_flag = 127;
          } elseif (125 < $length) {
              $pack        = pack('n', $length);
              $length_flag = 126;
          }

          $frame = $connection->websocketType
              . chr(($mask << 7) | $length_flag)
              . $pack
              . $mask_key
              . self::applyMask($payload, $mask_key);

          if ($connection->handshakeStep === 1) {
              // If the temporary buffer is already full, discard this package.
              if (strlen($connection->tmpWebsocketData) > $connection->maxSendBufferSize) {
                  if ($connection->onError) {
                      self::emitCallback(
                          $connection->onError,
                          array($connection, WORKERMAN_SEND_FAIL, 'send buffer full and drop package')
                      );
                  }
                  return '';
              }
              $connection->tmpWebsocketData = $connection->tmpWebsocketData . $frame;
              // Check whether the buffer became full.
              if ($connection->maxSendBufferSize <= strlen($connection->tmpWebsocketData)) {
                  if ($connection->onBufferFull) {
                      self::emitCallback($connection->onBufferFull, array($connection));
                  }
              }
              return '';
          }
          return $frame;
      }

      /**
       * Websocket decode.
       *
       * Strips the frame header from $bytes and unmasks the payload when the
       * mask bit is set. While a fragmented message is in progress
       * (websocketCurrentFrameLength is non-zero) the payload is appended to
       * websocketDataBuffer and the buffer is returned; otherwise any buffered
       * fragments are prepended to the payload and the buffer is reset.
       *
       * @param string              $bytes
       * @param ConnectionInterface $connection
       * @return string
       */
      public static function decode($bytes, $connection)
      {
          $second_byte = ord($bytes[1]);
          $masked      = ($second_byte & 0x80) !== 0;
          $data_length = $second_byte & 127;

          // Offset of the first byte after the (extended) length field.
          if ($data_length === 126) {
              $head_length = 4;
          } elseif ($data_length === 127) {
              $head_length = 10;
          } else {
              $head_length = 2;
          }

          if ($masked) {
              $mask_key     = substr($bytes, $head_length, 4);
              $decoded_data = self::applyMask((string)substr($bytes, $head_length + 4), $mask_key);
          } else {
              // Cast: substr() returns false instead of '' on old PHP versions
              // when the frame has no payload.
              $decoded_data = (string)substr($bytes, $head_length);
          }

          if ($connection->websocketCurrentFrameLength) {
              $connection->websocketDataBuffer .= $decoded_data;
              return $connection->websocketDataBuffer;
          }
          if ($connection->websocketDataBuffer !== '') {
              $decoded_data = $connection->websocketDataBuffer . $decoded_data;
              $connection->websocketDataBuffer = '';
          }
          return $decoded_data;
      }

      /**
       * Send websocket handshake data.
       *
       * @param ConnectionInterface $connection
       * @return void
       */
      public static function onConnect($connection)
      {
          self::sendHandshake($connection);
      }

      /**
       * Clean: reset the per-connection websocket state and stop the ping timer.
       *
       * @param ConnectionInterface $connection
       * @return void
       */
      public static function onClose($connection)
      {
          $connection->handshakeStep               = null;
          $connection->websocketCurrentFrameLength = 0;
          $connection->tmpWebsocketData            = '';
          $connection->websocketDataBuffer         = '';
          if (!empty($connection->websocketPingTimer)) {
              Timer::del($connection->websocketPingTimer);
              $connection->websocketPingTimer = null;
          }
      }

      /**
       * Send websocket handshake.
       *
       * Does nothing when a handshake was already sent on this connection.
       *
       * @param \Workerman\Connection\TcpConnection $connection
       * @return void
       */
      public static function sendHandshake($connection)
      {
          if (!empty($connection->handshakeStep)) {
              return;
          }
          // Get Host.
          $port = $connection->getRemotePort();
          $host = $port === 80 ? $connection->getRemoteHost() : $connection->getRemoteHost() . ':' . $port;
          // Sec-WebSocket-Key must be a base64-encoded 16-byte nonce; a raw MD5
          // digest is exactly 16 bytes (a raw SHA-1 digest would be 20).
          $key = base64_encode(md5(uniqid(mt_rand(), true), true));
          // Handshake header.
          $header = 'GET ' . $connection->getRemoteURI() . " HTTP/1.1\r\n" .
              "Host: $host\r\n" .
              "Connection: Upgrade\r\n" .
              "Upgrade: websocket\r\n" .
              "Origin: " . (isset($connection->websocketOrigin) ? $connection->websocketOrigin : '*') . "\r\n" .
              "Sec-WebSocket-Version: 13\r\n" .
              "Sec-WebSocket-Key: " . $key . "\r\n\r\n";
          $connection->send($header, true);
          $connection->handshakeStep               = 1;
          $connection->websocketCurrentFrameLength = 0;
          $connection->websocketDataBuffer         = '';
          $connection->tmpWebsocketData            = '';
      }

      /**
       * Websocket handshake.
       *
       * Waits for the end of the response header ("\r\n\r\n"), then marks the
       * handshake as complete, emits onWebSocketConnect, starts the optional
       * ping timer, flushes frames queued by encode() and passes any bytes that
       * follow the header on to input().
       *
       * NOTE(review): the response status line and Sec-WebSocket-Accept header
       * are not validated here.
       *
       * @param string                              $buffer
       * @param \Workerman\Connection\TcpConnection $connection
       * @return int
       */
      public static function dealHandshake($buffer, $connection)
      {
          $pos = strpos($buffer, "\r\n\r\n");
          if (!$pos) {
              // Header not complete yet.
              return 0;
          }
          // Handshake complete.
          $connection->handshakeStep = 2;
          $handshake_response_length = $pos + 4;
          if (isset($connection->onWebSocketConnect)) {
              self::emitCallback(
                  $connection->onWebSocketConnect,
                  array($connection, substr($buffer, 0, $handshake_response_length))
              );
          }
          // Heartbeat.
          if (!empty($connection->websocketPingInterval)) {
              $connection->websocketPingTimer = Timer::add(
                  $connection->websocketPingInterval,
                  function () use ($connection) {
                      // Empty ping; masked because it is sent by the client.
                      if (false === $connection->send(pack('H*', '898000000000'), true)) {
                          Timer::del($connection->websocketPingTimer);
                          $connection->websocketPingTimer = null;
                      }
                  }
              );
          }
          $connection->consumeRecvBuffer($handshake_response_length);
          // Flush frames that were encoded while the handshake was pending.
          if (!empty($connection->tmpWebsocketData)) {
              $connection->send($connection->tmpWebsocketData, true);
              $connection->tmpWebsocketData = '';
          }
          if (strlen($buffer) > $handshake_response_length) {
              return self::input(substr($buffer, $handshake_response_length), $connection);
          }
          return 0;
      }

      /**
       * Invoke a user callback; log any exception or error and exit with status 250.
       *
       * @param callable $callback
       * @param array    $args     Positional arguments for the callback.
       * @return void
       */
      private static function emitCallback($callback, array $args)
      {
          try {
              call_user_func_array($callback, $args);
          } catch (\Exception $e) {
              Worker::log($e);
              exit(250);
          } catch (\Error $e) {
              Worker::log($e);
              exit(250);
          }
      }

      /**
       * Drop a payload-less control frame from the receive buffer and continue
       * parsing whatever follows it.
       *
       * @param string              $buffer
       * @param int                 $recv_len   Length of $buffer.
       * @param ConnectionInterface $connection
       * @return int|false Result of input() for the remaining bytes, or 0 if none remain.
       */
      private static function skipEmptyControlFrame($buffer, $recv_len, $connection)
      {
          $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
          if ($recv_len > self::MIN_HEAD_LEN) {
              return self::input(substr($buffer, self::MIN_HEAD_LEN), $connection);
          }
          return 0;
      }

      /**
       * XOR every byte of $data with the masking key, repeating the key as needed.
       *
       * @param string $data
       * @param string $mask_key
       * @return string $data unchanged when $data or $mask_key is empty.
       */
      private static function applyMask($data, $mask_key)
      {
          $mask_key = (string)$mask_key;
          if ($data === '' || $mask_key === '') {
              return $data;
          }
          // String XOR works byte by byte; the key is repeated to the data length.
          return $data ^ str_pad('', strlen($data), $mask_key);
      }
  }