Ws.php 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. <?php
  2. namespace Workerman\Protocols;
  3. use Workerman\Worker;
  4. use Workerman\Lib\Timer;
  5. /**
  6. * Websocket protocol for client.
  7. */
  8. class Ws
  9. {
  10. /**
  11. * Minimum head length of websocket protocol.
  12. *
  13. * @var int
  14. */
  15. const MIN_HEAD_LEN = 2;
  16. /**
  17. * Websocket blob type.
  18. *
  19. * @var string
  20. */
  21. const BINARY_TYPE_BLOB = "\x81";
  22. /**
  23. * Websocket arraybuffer type.
  24. *
  25. * @var string
  26. */
  27. const BINARY_TYPE_ARRAYBUFFER = "\x82";
  28. /**
  29. * Check the integrity of the package.
  30. *
  31. * @param string $buffer
  32. * @param ConnectionInterface $connection
  33. * @return int
  34. */
  35. public static function input($buffer, $connection)
  36. {
  37. if (empty($connection->handshakeStep)) {
  38. echo "recv data before handshake. Buffer:" . bin2hex($buffer) . "\n";
  39. return false;
  40. }
  41. // Recv handshake response
  42. if ($connection->handshakeStep === 1) {
  43. return self::dealHandshake($buffer, $connection);
  44. }
  45. $recv_len = strlen($buffer);
  46. if ($recv_len < self::MIN_HEAD_LEN) {
  47. return 0;
  48. }
  49. // Buffer websocket frame data.
  50. if ($connection->websocketCurrentFrameLength) {
  51. // We need more frame data.
  52. if ($connection->websocketCurrentFrameLength > $recv_len) {
  53. // Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
  54. return 0;
  55. }
  56. } else {
  57. $data_len = ord($buffer[1]) & 127;
  58. $firstbyte = ord($buffer[0]);
  59. $is_fin_frame = $firstbyte >> 7;
  60. $opcode = $firstbyte & 0xf;
  61. switch ($opcode) {
  62. case 0x0:
  63. break;
  64. // Blob type.
  65. case 0x1:
  66. break;
  67. // Arraybuffer type.
  68. case 0x2:
  69. break;
  70. // Close package.
  71. case 0x8:
  72. // Try to emit onWebSocketClose callback.
  73. if (isset($connection->onWebSocketClose)) {
  74. try {
  75. call_user_func($connection->onWebSocketClose, $connection);
  76. } catch (\Exception $e) {
  77. Worker::log($e);
  78. exit(250);
  79. } catch (\Error $e) {
  80. Worker::log($e);
  81. exit(250);
  82. }
  83. } // Close connection.
  84. else {
  85. $connection->close();
  86. }
  87. return 0;
  88. // Ping package.
  89. case 0x9:
  90. // Try to emit onWebSocketPing callback.
  91. if (isset($connection->onWebSocketPing)) {
  92. try {
  93. call_user_func($connection->onWebSocketPing, $connection);
  94. } catch (\Exception $e) {
  95. Worker::log($e);
  96. exit(250);
  97. } catch (\Error $e) {
  98. Worker::log($e);
  99. exit(250);
  100. }
  101. } // Send pong package to client.
  102. else {
  103. $connection->send(pack('H*', '8a00'), true);
  104. }
  105. // Consume data from receive buffer.
  106. if (!$data_len) {
  107. $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
  108. if ($recv_len > self::MIN_HEAD_LEN) {
  109. return self::input(substr($buffer, self::MIN_HEAD_LEN), $connection);
  110. }
  111. return 0;
  112. }
  113. break;
  114. // Pong package.
  115. case 0xa:
  116. // Try to emit onWebSocketPong callback.
  117. if (isset($connection->onWebSocketPong)) {
  118. try {
  119. call_user_func($connection->onWebSocketPong, $connection);
  120. } catch (\Exception $e) {
  121. Worker::log($e);
  122. exit(250);
  123. } catch (\Error $e) {
  124. Worker::log($e);
  125. exit(250);
  126. }
  127. }
  128. // Consume data from receive buffer.
  129. if (!$data_len) {
  130. $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
  131. if ($recv_len > self::MIN_HEAD_LEN) {
  132. return self::input(substr($buffer, self::MIN_HEAD_LEN), $connection);
  133. }
  134. return 0;
  135. }
  136. break;
  137. // Wrong opcode.
  138. default :
  139. echo "error opcode $opcode and close websocket connection. Buffer:" . $buffer . "\n";
  140. $connection->close();
  141. return 0;
  142. }
  143. // Calculate packet length.
  144. if ($data_len === 126) {
  145. if (strlen($buffer) < 6) {
  146. return 0;
  147. }
  148. $pack = unpack('nn/ntotal_len', $buffer);
  149. $current_frame_length = $pack['total_len'] + 4;
  150. } else if ($data_len === 127) {
  151. if (strlen($buffer) < 10) {
  152. return 0;
  153. }
  154. $arr = unpack('n/N2c', $buffer);
  155. $current_frame_length = $arr['c1']*4294967296 + $arr['c2'] + 10;
  156. } else {
  157. $current_frame_length = $data_len + 2;
  158. }
  159. if ($is_fin_frame) {
  160. return $current_frame_length;
  161. } else {
  162. $connection->websocketCurrentFrameLength = $current_frame_length;
  163. }
  164. }
  165. // Received just a frame length data.
  166. if ($connection->websocketCurrentFrameLength === $recv_len) {
  167. self::decode($buffer, $connection);
  168. $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
  169. $connection->websocketCurrentFrameLength = 0;
  170. return 0;
  171. } // The length of the received data is greater than the length of a frame.
  172. elseif ($connection->websocketCurrentFrameLength < $recv_len) {
  173. self::decode(substr($buffer, 0, $connection->websocketCurrentFrameLength), $connection);
  174. $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
  175. $current_frame_length = $connection->websocketCurrentFrameLength;
  176. $connection->websocketCurrentFrameLength = 0;
  177. // Continue to read next frame.
  178. return self::input(substr($buffer, $current_frame_length), $connection);
  179. } // The length of the received data is less than the length of a frame.
  180. else {
  181. return 0;
  182. }
  183. }
  184. /**
  185. * Websocket encode.
  186. *
  187. * @param string $buffer
  188. * @param ConnectionInterface $connection
  189. * @return string
  190. */
  191. public static function encode($payload, $connection)
  192. {
  193. if (empty($connection->websocketType)) {
  194. $connection->websocketType = self::BINARY_TYPE_BLOB;
  195. }
  196. $payload = (string)$payload;
  197. if (empty($connection->handshakeStep)) {
  198. self::sendHandshake($connection);
  199. }
  200. $mask = 1;
  201. $mask_key = "\x00\x00\x00\x00";
  202. $pack = '';
  203. $length = $length_flag = strlen($payload);
  204. if (65535 < $length) {
  205. $pack = pack('NN', ($length & 0xFFFFFFFF00000000) >> 32, $length & 0x00000000FFFFFFFF);
  206. $length_flag = 127;
  207. } else if (125 < $length) {
  208. $pack = pack('n*', $length);
  209. $length_flag = 126;
  210. }
  211. $head = ($mask << 7) | $length_flag;
  212. $head = $connection->websocketType . chr($head) . $pack;
  213. $frame = $head . $mask_key;
  214. // append payload to frame:
  215. for ($i = 0; $i < $length; $i++) {
  216. $frame .= $payload[$i] ^ $mask_key[$i % 4];
  217. }
  218. if ($connection->handshakeStep === 1) {
  219. $connection->tmpWebsocketData = isset($connection->tmpWebsocketData) ? $connection->tmpWebsocketData . $frame : $frame;
  220. return '';
  221. }
  222. return $frame;
  223. }
  224. /**
  225. * Websocket decode.
  226. *
  227. * @param string $buffer
  228. * @param ConnectionInterface $connection
  229. * @return string
  230. */
  231. public static function decode($bytes, $connection)
  232. {
  233. $masked = $bytes[1] >> 7;
  234. $data_length = $masked ? ord($bytes[1]) & 127 : ord($bytes[1]);
  235. $decoded_data = '';
  236. if ($masked === true) {
  237. if ($data_length === 126) {
  238. $mask = substr($bytes, 4, 4);
  239. $coded_data = substr($bytes, 8);
  240. } else if ($data_length === 127) {
  241. $mask = substr($bytes, 10, 4);
  242. $coded_data = substr($bytes, 14);
  243. } else {
  244. $mask = substr($bytes, 2, 4);
  245. $coded_data = substr($bytes, 6);
  246. }
  247. for ($i = 0; $i < strlen($coded_data); $i++) {
  248. $decoded_data .= $coded_data[$i] ^ $mask[$i % 4];
  249. }
  250. } else {
  251. if ($data_length === 126) {
  252. $decoded_data = substr($bytes, 4);
  253. } else if ($data_length === 127) {
  254. $decoded_data = substr($bytes, 10);
  255. } else {
  256. $decoded_data = substr($bytes, 2);
  257. }
  258. }
  259. if ($connection->websocketCurrentFrameLength) {
  260. $connection->websocketDataBuffer .= $decoded_data;
  261. return $connection->websocketDataBuffer;
  262. } else {
  263. if ($connection->websocketDataBuffer !== '') {
  264. $decoded_data = $connection->websocketDataBuffer . $decoded_data;
  265. $connection->websocketDataBuffer = '';
  266. }
  267. return $decoded_data;
  268. }
  269. }
  270. /**
  271. * Send websocket handshake data.
  272. *
  273. * @return void
  274. */
  275. public static function onConnect($connection)
  276. {
  277. self::sendHandshake($connection);
  278. }
  279. /**
  280. * Clean
  281. *
  282. * @param $connection
  283. */
  284. public static function onClose($connection)
  285. {
  286. $connection->handshakeStep = null;
  287. $connection->websocketCurrentFrameLength = 0;
  288. $connection->tmpWebsocketData = '';
  289. $connection->websocketDataBuffer = '';
  290. if (!empty($connection->websocketPingTimer)) {
  291. Timer::del($connection->websocketPingTimer);
  292. $connection->websocketPingTimer = null;
  293. }
  294. }
  295. /**
  296. * Send websocket handshake.
  297. *
  298. * @param \Workerman\Connection\TcpConnection $connection
  299. * @return void
  300. */
  301. public static function sendHandshake($connection)
  302. {
  303. if (!empty($connection->handshakeStep)) {
  304. return;
  305. }
  306. // Get Host.
  307. $port = $connection->getRemotePort();
  308. $host = $port === 80 ? $connection->getRemoteHost() : $connection->getRemoteHost() . ':' . $port;
  309. // Handshake header.
  310. $header = "GET / HTTP/1.1\r\n".
  311. "Host: $host\r\n".
  312. "Connection: Upgrade\r\n".
  313. "Upgrade: websocket\r\n".
  314. "Origin: ". (isset($connection->websocketOrigin) ? $connection->websocketOrigin : '*') ."\r\n".
  315. "Sec-WebSocket-Version: 13\r\n".
  316. "Sec-WebSocket-Key: ".base64_encode(sha1(uniqid(mt_rand(), true), true))."\r\n\r\n";
  317. $connection->send($header, true);
  318. $connection->handshakeStep = 1;
  319. $connection->websocketCurrentFrameLength = 0;
  320. $connection->websocketDataBuffer = '';
  321. }
  322. /**
  323. * Websocket handshake.
  324. *
  325. * @param string $buffer
  326. * @param \Workerman\Connection\TcpConnection $connection
  327. * @return int
  328. */
  329. public static function dealHandshake($buffer, $connection)
  330. {
  331. $pos = strpos($buffer, "\r\n\r\n");
  332. if ($pos) {
  333. // handshake complete
  334. $connection->handshakeStep = 2;
  335. $handshake_response_length = $pos + 4;
  336. // Try to emit onWebSocketConnect callback.
  337. if (isset($connection->onWebSocketConnect)) {
  338. try {
  339. call_user_func($connection->onWebSocketConnect, $connection, substr($buffer, 0, $handshake_response_length));
  340. } catch (\Exception $e) {
  341. Worker::log($e);
  342. exit(250);
  343. } catch (\Error $e) {
  344. Worker::log($e);
  345. exit(250);
  346. }
  347. }
  348. // Headbeat.
  349. if (!empty($connection->websocketPingInterval)) {
  350. $connection->websocketPingTimer = Timer::add($connection->websocketPingInterval, function() use ($connection){
  351. if (false === $connection->send(pack('H*', '8900'), true)) {
  352. Timer::del($connection->websocketPingTimer);
  353. $connection->websocketPingTimer = null;
  354. }
  355. });
  356. }
  357. $connection->consumeRecvBuffer($handshake_response_length);
  358. if (!empty($connection->tmpWebsocketData)) {
  359. $connection->send($connection->tmpWebsocketData, true);
  360. $connection->tmpWebsocketData = '';
  361. }
  362. if (strlen($buffer > $handshake_response_length)) {
  363. return self::input(substr($buffer, $handshake_response_length), $connection);
  364. }
  365. }
  366. return 0;
  367. }
  368. }