Ws.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. <?php
  2. namespace Workerman\Protocols;
  3. /**
  4. * Websocket protocol for client.
  5. */
  6. class Ws
  7. {
  8. /**
  9. * Minimum head length of websocket protocol.
  10. *
  11. * @var int
  12. */
  13. const MIN_HEAD_LEN = 2;
  14. /**
  15. * Websocket blob type.
  16. *
  17. * @var string
  18. */
  19. const BINARY_TYPE_BLOB = "\x81";
  20. /**
  21. * Websocket arraybuffer type.
  22. *
  23. * @var string
  24. */
  25. const BINARY_TYPE_ARRAYBUFFER = "\x82";
  26. /**
  27. * Check the integrity of the package.
  28. *
  29. * @param string $buffer
  30. * @param ConnectionInterface $connection
  31. * @return int
  32. */
  33. public static function input($buffer, $connection)
  34. {
  35. if (empty($connection->handshakeStep)) {
  36. echo "recv data before handshake\n";
  37. return false;
  38. }
  39. // Recv handshake response
  40. if ($connection->handshakeStep === 1) {
  41. return self::dealHandshake($buffer, $connection);
  42. }
  43. $recv_len = strlen($buffer);
  44. if ($recv_len < self::MIN_HEAD_LEN) {
  45. return 0;
  46. }
  47. // Buffer websocket frame data.
  48. if ($connection->websocketCurrentFrameLength) {
  49. // We need more frame data.
  50. if ($connection->websocketCurrentFrameLength > $recv_len) {
  51. // Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
  52. return 0;
  53. }
  54. } else {
  55. $data_len = ord($buffer[1]) & 127;
  56. $firstbyte = ord($buffer[0]);
  57. $is_fin_frame = $firstbyte >> 7;
  58. $opcode = $firstbyte & 0xf;
  59. switch ($opcode) {
  60. case 0x0:
  61. break;
  62. // Blob type.
  63. case 0x1:
  64. break;
  65. // Arraybuffer type.
  66. case 0x2:
  67. break;
  68. // Close package.
  69. case 0x8:
  70. // Try to emit onWebSocketClose callback.
  71. if (isset($connection->onWebSocketClose)) {
  72. try {
  73. call_user_func($connection->onWebSocketClose, $connection);
  74. } catch (\Exception $e) {
  75. echo $e;
  76. exit(250);
  77. } catch (\Error $e) {
  78. echo $e;
  79. exit(250);
  80. }
  81. } // Close connection.
  82. else {
  83. $connection->close();
  84. }
  85. return 0;
  86. // Ping package.
  87. case 0x9:
  88. // Try to emit onWebSocketPing callback.
  89. if (isset($connection->onWebSocketPing)) {
  90. try {
  91. call_user_func($connection->onWebSocketPing, $connection);
  92. } catch (\Exception $e) {
  93. echo $e;
  94. exit(250);
  95. } catch (\Error $e) {
  96. echo $e;
  97. exit(250);
  98. }
  99. } // Send pong package to client.
  100. else {
  101. $connection->send(pack('H*', '8a00'), true);
  102. }
  103. // Consume data from receive buffer.
  104. if (!$data_len) {
  105. $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
  106. if ($recv_len > self::MIN_HEAD_LEN) {
  107. return self::input(substr($buffer, self::MIN_HEAD_LEN), $connection);
  108. }
  109. return 0;
  110. }
  111. break;
  112. // Pong package.
  113. case 0xa:
  114. // Try to emit onWebSocketPong callback.
  115. if (isset($connection->onWebSocketPong)) {
  116. try {
  117. call_user_func($connection->onWebSocketPong, $connection);
  118. } catch (\Exception $e) {
  119. echo $e;
  120. exit(250);
  121. } catch (\Error $e) {
  122. echo $e;
  123. exit(250);
  124. }
  125. }
  126. // Consume data from receive buffer.
  127. if (!$data_len) {
  128. $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
  129. if ($recv_len > self::MIN_HEAD_LEN) {
  130. return self::input(substr($buffer, self::MIN_HEAD_LEN), $connection);
  131. }
  132. return 0;
  133. }
  134. break;
  135. // Wrong opcode.
  136. default :
  137. echo "error opcode $opcode and close websocket connection\n";
  138. $connection->close();
  139. return 0;
  140. }
  141. // Calculate packet length.
  142. if ($data_len === 126) {
  143. if (strlen($buffer) < 6) {
  144. return 0;
  145. }
  146. $pack = unpack('nn/ntotal_len', $buffer);
  147. $current_frame_length = $pack['total_len'] + 4;
  148. } else if ($data_len === 127) {
  149. if (strlen($buffer) < 10) {
  150. return 0;
  151. }
  152. $arr = unpack('n/N2c', $buffer);
  153. $current_frame_length = $arr['c1']*4294967296 + $arr['c2'] + 10;
  154. } else {
  155. $current_frame_length = $data_len + 2;
  156. }
  157. if ($is_fin_frame) {
  158. return $current_frame_length;
  159. } else {
  160. $connection->websocketCurrentFrameLength = $current_frame_length;
  161. }
  162. }
  163. // Received just a frame length data.
  164. if ($connection->websocketCurrentFrameLength === $recv_len) {
  165. self::decode($buffer, $connection);
  166. $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
  167. $connection->websocketCurrentFrameLength = 0;
  168. return 0;
  169. } // The length of the received data is greater than the length of a frame.
  170. elseif ($connection->websocketCurrentFrameLength < $recv_len) {
  171. self::decode(substr($buffer, 0, $connection->websocketCurrentFrameLength), $connection);
  172. $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
  173. $current_frame_length = $connection->websocketCurrentFrameLength;
  174. $connection->websocketCurrentFrameLength = 0;
  175. // Continue to read next frame.
  176. return self::input(substr($buffer, $current_frame_length), $connection);
  177. } // The length of the received data is less than the length of a frame.
  178. else {
  179. return 0;
  180. }
  181. }
  182. /**
  183. * Websocket encode.
  184. *
  185. * @param string $buffer
  186. * @param ConnectionInterface $connection
  187. * @return string
  188. */
  189. public static function encode($payload, $connection)
  190. {
  191. $payload = (string)$payload;
  192. if (empty($connection->handshakeStep)) {
  193. self::sendHandshake($connection);
  194. }
  195. $mask = 1;
  196. $mask_key = "\x00\x00\x00\x00";
  197. $pack = '';
  198. $length = $length_flag = strlen($payload);
  199. if (65535 < $length) {
  200. $pack = pack('NN', ($length & 0xFFFFFFFF00000000) >> 32, $length & 0x00000000FFFFFFFF);
  201. $length_flag = 127;
  202. } else if (125 < $length) {
  203. $pack = pack('n*', $length);
  204. $length_flag = 126;
  205. }
  206. $head = ($mask << 7) | $length_flag;
  207. $head = $connection->websocketType . chr($head) . $pack;
  208. $frame = $head . $mask_key;
  209. // append payload to frame:
  210. for ($i = 0; $i < $length; $i++) {
  211. $frame .= $payload[$i] ^ $mask_key[$i % 4];
  212. }
  213. if ($connection->handshakeStep === 1) {
  214. $connection->tmpWebsocketData = isset($connection->tmpWebsocketData) ? $connection->tmpWebsocketData . $frame : $frame;
  215. return '';
  216. }
  217. return $frame;
  218. }
  219. /**
  220. * Websocket decode.
  221. *
  222. * @param string $buffer
  223. * @param ConnectionInterface $connection
  224. * @return string
  225. */
  226. public static function decode($bytes, $connection)
  227. {
  228. $masked = $bytes[1] >> 7;
  229. $data_length = $masked ? ord($bytes[1]) & 127 : ord($bytes[1]);
  230. $decoded_data = '';
  231. if ($masked === true) {
  232. if ($data_length === 126) {
  233. $mask = substr($bytes, 4, 4);
  234. $coded_data = substr($bytes, 8);
  235. } else if ($data_length === 127) {
  236. $mask = substr($bytes, 10, 4);
  237. $coded_data = substr($bytes, 14);
  238. } else {
  239. $mask = substr($bytes, 2, 4);
  240. $coded_data = substr($bytes, 6);
  241. }
  242. for ($i = 0; $i < strlen($coded_data); $i++) {
  243. $decoded_data .= $coded_data[$i] ^ $mask[$i % 4];
  244. }
  245. } else {
  246. if ($data_length === 126) {
  247. $decoded_data = substr($bytes, 4);
  248. } else if ($data_length === 127) {
  249. $decoded_data = substr($bytes, 10);
  250. } else {
  251. $decoded_data = substr($bytes, 2);
  252. }
  253. }
  254. if ($connection->websocketCurrentFrameLength) {
  255. $connection->websocketDataBuffer .= $decoded_data;
  256. return $connection->websocketDataBuffer;
  257. } else {
  258. if ($connection->websocketDataBuffer !== '') {
  259. $decoded_data = $connection->websocketDataBuffer . $decoded_data;
  260. $connection->websocketDataBuffer = '';
  261. }
  262. return $decoded_data;
  263. }
  264. }
  265. /**
  266. * Send websocket handshake.
  267. *
  268. * @param \Workerman\Connection\TcpConnection $connection
  269. * @return void
  270. */
  271. public static function sendHandshake($connection)
  272. {
  273. // Get Host.
  274. $port = $connection->getRemotePort();
  275. $host = $port === 80 ? $connection->getRemoteHost() : $connection->getRemoteHost() . ':' . $port;
  276. // Handshake header.
  277. $header = "GET / HTTP/1.1\r\n".
  278. "Host: $host\r\n".
  279. "Connection: Upgrade\r\n".
  280. "Upgrade: websocket\r\n".
  281. "Origin: ". (isset($connection->websocketOrigin) ? $connection->websocketOrigin : '*') ."\r\n".
  282. "Sec-WebSocket-Version: 13\r\n".
  283. "Sec-WebSocket-Key: ".base64_encode(sha1(uniqid(mt_rand(), true), true))."\r\n\r\n";
  284. $connection->send($header, true);
  285. $connection->handshakeStep = 1;
  286. $connection->websocketCurrentFrameLength = 0;
  287. $connection->websocketDataBuffer = '';
  288. if (empty($connection->websocketType)) {
  289. $connection->websocketType = self::BINARY_TYPE_BLOB;
  290. }
  291. }
  292. /**
  293. * Websocket handshake.
  294. *
  295. * @param string $buffer
  296. * @param \Workerman\Connection\TcpConnection $connection
  297. * @return int
  298. */
  299. public static function dealHandshake($buffer, $connection)
  300. {
  301. $pos = strpos($buffer, "\r\n\r\n");
  302. if ($pos) {
  303. // handshake complete
  304. $connection->handshakeStep = 2;
  305. $handshake_respnse_length = $pos + 4;
  306. // Try to emit onWebSocketConnect callback.
  307. if (isset($connection->onWebSocketConnect)) {
  308. try {
  309. call_user_func($connection->onWebSocketConnect, $connection, substr($buffer, 0, $handshake_respnse_length));
  310. } catch (\Exception $e) {
  311. echo $e;
  312. exit(250);
  313. } catch (\Error $e) {
  314. echo $e;
  315. exit(250);
  316. }
  317. }
  318. // Headbeat.
  319. if (!empty($connection->websocketPingInterval)) {
  320. $connection->websocketPingTimer = \Workerman\Lib\Timer::add($connection->websocketPingInterval, function() use ($connection){
  321. if (false === $connection->send(pack('H*', '8900'), true)) {
  322. \Workerman\Lib\Timer::del($connection->websocketPingTimer);
  323. }
  324. });
  325. }
  326. $connection->consumeRecvBuffer($handshake_respnse_length);
  327. if (!empty($connection->tmpWebsocketData)) {
  328. $connection->send($connection->tmpWebsocketData, true);
  329. }
  330. if (strlen($buffer > $handshake_respnse_length)) {
  331. return self::input(substr($buffer, $handshake_respnse_length));
  332. }
  333. }
  334. return 0;
  335. }
  336. }