Ws.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. <?php
  2. namespace Workerman\Protocols;
  3. /**
  4. * Websocket protocol for client.
  5. */
  6. class Ws
  7. {
  8. /**
  9. * Minimum head length of websocket protocol.
  10. *
  11. * @var int
  12. */
  13. const MIN_HEAD_LEN = 2;
  14. /**
  15. * Websocket blob type.
  16. *
  17. * @var string
  18. */
  19. const BINARY_TYPE_BLOB = "\x81";
  20. /**
  21. * Websocket arraybuffer type.
  22. *
  23. * @var string
  24. */
  25. const BINARY_TYPE_ARRAYBUFFER = "\x82";
  26. /**
  27. * Check the integrity of the package.
  28. *
  29. * @param string $buffer
  30. * @param ConnectionInterface $connection
  31. * @return int
  32. */
  33. public static function input($buffer, $connection)
  34. {
  35. if (empty($connection->handshakeStep)) {
  36. echo "recv data before handshake\n";
  37. return false;
  38. }
  39. // Recv handshake response
  40. if ($connection->handshakeStep === 1) {
  41. return self::dealHandshake($buffer, $connection);
  42. }
  43. $recv_len = strlen($buffer);
  44. if ($recv_len < self::MIN_HEAD_LEN) {
  45. return 0;
  46. }
  47. // Buffer websocket frame data.
  48. if ($connection->websocketCurrentFrameLength) {
  49. // We need more frame data.
  50. if ($connection->websocketCurrentFrameLength > $recv_len) {
  51. // Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
  52. return 0;
  53. }
  54. } else {
  55. $data_len = ord($buffer[1]) & 127;
  56. $firstbyte = ord($buffer[0]);
  57. $is_fin_frame = $firstbyte >> 7;
  58. $opcode = $firstbyte & 0xf;
  59. switch ($opcode) {
  60. case 0x0:
  61. break;
  62. // Blob type.
  63. case 0x1:
  64. break;
  65. // Arraybuffer type.
  66. case 0x2:
  67. break;
  68. // Close package.
  69. case 0x8:
  70. // Try to emit onWebSocketClose callback.
  71. if (isset($connection->onWebSocketClose)) {
  72. try {
  73. call_user_func($connection->onWebSocketClose, $connection);
  74. } catch (\Exception $e) {
  75. echo $e;
  76. exit(250);
  77. }
  78. } // Close connection.
  79. else {
  80. $connection->close();
  81. }
  82. return 0;
  83. // Ping package.
  84. case 0x9:
  85. // Try to emit onWebSocketPing callback.
  86. if (isset($connection->onWebSocketPing)) {
  87. try {
  88. call_user_func($connection->onWebSocketPing, $connection);
  89. } catch (\Exception $e) {
  90. echo $e;
  91. exit(250);
  92. }
  93. } // Send pong package to client.
  94. else {
  95. $connection->send(pack('H*', '8a00'), true);
  96. }
  97. // Consume data from receive buffer.
  98. if (!$data_len) {
  99. $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
  100. return 0;
  101. }
  102. break;
  103. // Pong package.
  104. case 0xa:
  105. // Try to emit onWebSocketPong callback.
  106. if (isset($connection->onWebSocketPong)) {
  107. try {
  108. call_user_func($connection->onWebSocketPong, $connection);
  109. } catch (\Exception $e) {
  110. echo $e;
  111. exit(250);
  112. }
  113. }
  114. // Consume data from receive buffer.
  115. if (!$data_len) {
  116. $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
  117. return 0;
  118. }
  119. break;
  120. // Wrong opcode.
  121. default :
  122. echo "error opcode $opcode and close websocket connection\n";
  123. $connection->close();
  124. return 0;
  125. }
  126. // Calculate packet length.
  127. if ($data_len === 126) {
  128. if (strlen($buffer) < 6) {
  129. return 0;
  130. }
  131. $pack = unpack('nn/ntotal_len', $buffer);
  132. $current_frame_length = $pack['total_len'] + 4;
  133. } else if ($data_len === 127) {
  134. if (strlen($buffer) < 10) {
  135. return 0;
  136. }
  137. $arr = unpack('n/N2c', $buffer);
  138. $current_frame_length = $arr['c1']*4294967296 + $arr['c2'] + 10;
  139. } else {
  140. $current_frame_length = $data_len + 2;
  141. }
  142. if ($is_fin_frame) {
  143. return $current_frame_length;
  144. } else {
  145. $connection->websocketCurrentFrameLength = $current_frame_length;
  146. }
  147. }
  148. // Received just a frame length data.
  149. if ($connection->websocketCurrentFrameLength === $recv_len) {
  150. self::decode($buffer, $connection);
  151. $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
  152. $connection->websocketCurrentFrameLength = 0;
  153. return 0;
  154. } // The length of the received data is greater than the length of a frame.
  155. elseif ($connection->websocketCurrentFrameLength < $recv_len) {
  156. self::decode(substr($buffer, 0, $connection->websocketCurrentFrameLength), $connection);
  157. $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
  158. $current_frame_length = $connection->websocketCurrentFrameLength;
  159. $connection->websocketCurrentFrameLength = 0;
  160. // Continue to read next frame.
  161. return self::input(substr($buffer, $current_frame_length), $connection);
  162. } // The length of the received data is less than the length of a frame.
  163. else {
  164. return 0;
  165. }
  166. }
  167. /**
  168. * Websocket encode.
  169. *
  170. * @param string $buffer
  171. * @param ConnectionInterface $connection
  172. * @return string
  173. */
  174. public static function encode($payload, $connection)
  175. {
  176. if (empty($connection->handshakeStep)) {
  177. self::sendHandshake($connection);
  178. }
  179. $mask = 1;
  180. $mask_key = "\x00\x00\x00\x00";
  181. $pack = '';
  182. $length = $length_flag = strlen($payload);
  183. if (65535 < $length) {
  184. $pack = pack('NN', ($length & 0xFFFFFFFF00000000) >> 0b100000, $length & 0x00000000FFFFFFFF);
  185. $length_flag = 127;
  186. } else if (125 < $length) {
  187. $pack = pack('n*', $length);
  188. $length_flag = 126;
  189. }
  190. $head = ($mask << 7) | $length_flag;
  191. $head = $connection->websocketType . chr($head) . $pack;
  192. $frame = $head . $mask_key;
  193. // append payload to frame:
  194. for ($i = 0; $i < $length; $i++) {
  195. $frame .= $payload[$i] ^ $mask_key[$i % 4];
  196. }
  197. if ($connection->handshakeStep === 1) {
  198. $connection->tmpWebsocketData = isset($connection->tmpWebsocketData) ? $connection->tmpWebsocketData . $frame : $frame;
  199. return '';
  200. }
  201. return $frame;
  202. }
  203. /**
  204. * Websocket decode.
  205. *
  206. * @param string $buffer
  207. * @param ConnectionInterface $connection
  208. * @return string
  209. */
  210. public static function decode($bytes, $connection)
  211. {
  212. $masked = $bytes[1] >> 7;
  213. $data_length = $masked ? ord($bytes[1]) & 127 : ord($bytes[1]);
  214. $decoded_data = '';
  215. if ($masked === true) {
  216. if ($data_length === 126) {
  217. $mask = substr($bytes, 4, 4);
  218. $coded_data = substr($bytes, 8);
  219. } else if ($data_length === 127) {
  220. $mask = substr($bytes, 10, 4);
  221. $coded_data = substr($bytes, 14);
  222. } else {
  223. $mask = substr($bytes, 2, 4);
  224. $coded_data = substr($bytes, 6);
  225. }
  226. for ($i = 0; $i < strlen($coded_data); $i++) {
  227. $decoded_data .= $coded_data[$i] ^ $mask[$i % 4];
  228. }
  229. } else {
  230. if ($data_length === 126) {
  231. $decoded_data = substr($bytes, 4);
  232. } else if ($data_length === 127) {
  233. $decoded_data = substr($bytes, 10);
  234. } else {
  235. $decoded_data = substr($bytes, 2);
  236. }
  237. }
  238. if ($connection->websocketCurrentFrameLength) {
  239. $connection->websocketDataBuffer .= $decoded_data;
  240. return $connection->websocketDataBuffer;
  241. } else {
  242. if ($connection->websocketDataBuffer !== '') {
  243. $decoded_data = $connection->websocketDataBuffer . $decoded_data;
  244. $connection->websocketDataBuffer = '';
  245. }
  246. return $decoded_data;
  247. }
  248. }
  249. /**
  250. * Send websocket handshake.
  251. *
  252. * @param \Workerman\Connection\TcpConnection $connection
  253. * @return void
  254. */
  255. public static function sendHandshake($connection)
  256. {
  257. // Get Host.
  258. $port = $connection->getRemotePort();
  259. $host = $port === 80 ? $connection->getRemoteHost() : $connection->getRemoteHost() . ':' . $port;
  260. // Handshake header.
  261. $header = "GET / HTTP/1.1\r\n".
  262. "Host: $host\r\n".
  263. "Connection: Upgrade\r\n".
  264. "Upgrade: websocket\r\n".
  265. "Origin: ". (isset($connection->websocketOrigin) ? $connection->websocketOrigin : '*') ."\r\n".
  266. "Sec-WebSocket-Version: 13\r\n".
  267. "Sec-WebSocket-Key: ".base64_encode(sha1(uniqid(mt_rand(), true), true))."\r\n\r\n";
  268. $connection->send($header, true);
  269. $connection->handshakeStep = 1;
  270. $connection->websocketCurrentFrameLength = 0;
  271. $connection->websocketDataBuffer = '';
  272. if (empty($connection->websocketType)) {
  273. $connection->websocketType = self::BINARY_TYPE_BLOB;
  274. }
  275. }
  276. /**
  277. * Websocket handshake.
  278. *
  279. * @param string $buffer
  280. * @param \Workerman\Connection\TcpConnection $connection
  281. * @return int
  282. */
  283. public static function dealHandshake($buffer, $connection)
  284. {
  285. $pos = strpos($buffer, "\r\n\r\n");
  286. if ($pos) {
  287. // handshake complete
  288. $connection->handshakeStep = 2;
  289. $handshake_respnse_length = $pos + 4;
  290. // Try to emit onWebSocketConnect callback.
  291. if (isset($connection->onWebSocketConnect)) {
  292. try {
  293. call_user_func($connection->onWebSocketConnect, $connection, substr($buffer, 0, $handshake_respnse_length));
  294. } catch (\Exception $e) {
  295. echo $e;
  296. exit(250);
  297. }
  298. }
  299. // Headbeat.
  300. if (!empty($connection->websocketPingInterval)) {
  301. $connection->websocketPingTimer = \Workerman\Lib\Timer::add($connection->websocketPingInterval, function() use ($connection){
  302. if (false === $connection->send(pack('H*', '8900'), true)) {
  303. \Workerman\Lib\Timer::del($connection->websocketPingTimer);
  304. }
  305. });
  306. }
  307. $connection->consumeRecvBuffer($handshake_respnse_length);
  308. if (!empty($connection->tmpWebsocketData)) {
  309. $connection->send($connection->tmpWebsocketData, true);
  310. }
  311. if (strlen($buffer > $handshake_respnse_length)) {
  312. return self::input(substr($buffer, $handshake_respnse_length));
  313. }
  314. }
  315. return 0;
  316. }
  317. }