Ws.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. <?php
  2. namespace Workerman\Protocols;
  3. /**
  4. * Websocket protocol for client.
  5. */
  6. class Ws
  7. {
  8. /**
  9. * Minimum head length of websocket protocol.
  10. *
  11. * @var int
  12. */
  13. const MIN_HEAD_LEN = 2;
  14. /**
  15. * Websocket blob type.
  16. *
  17. * @var string
  18. */
  19. const BINARY_TYPE_BLOB = "\x81";
  20. /**
  21. * Websocket arraybuffer type.
  22. *
  23. * @var string
  24. */
  25. const BINARY_TYPE_ARRAYBUFFER = "\x82";
  26. /**
  27. * Check the integrity of the package.
  28. *
  29. * @param string $buffer
  30. * @param ConnectionInterface $connection
  31. * @return int
  32. */
  33. public static function input($buffer, $connection)
  34. {
  35. if (empty($connection->handshakeStep)) {
  36. echo "recv data before handshake\n";
  37. return false;
  38. }
  39. // Recv handshake response
  40. if ($connection->handshakeStep === 1) {
  41. return self::dealHandshake($buffer, $connection);
  42. }
  43. $recv_len = strlen($buffer);
  44. if ($recv_len < self::MIN_HEAD_LEN) {
  45. return 0;
  46. }
  47. // Buffer websocket frame data.
  48. if ($connection->websocketCurrentFrameLength) {
  49. // We need more frame data.
  50. if ($connection->websocketCurrentFrameLength > $recv_len) {
  51. // Return 0, because it is not clear the full packet length, waiting for the frame of fin=1.
  52. return 0;
  53. }
  54. } else {
  55. $data_len = ord($buffer[1]) & 127;
  56. $firstbyte = ord($buffer[0]);
  57. $is_fin_frame = $firstbyte >> 7;
  58. $opcode = $firstbyte & 0xf;
  59. switch ($opcode) {
  60. case 0x0:
  61. break;
  62. // Blob type.
  63. case 0x1:
  64. break;
  65. // Arraybuffer type.
  66. case 0x2:
  67. break;
  68. // Close package.
  69. case 0x8:
  70. // Try to emit onWebSocketClose callback.
  71. if (isset($connection->onWebSocketClose)) {
  72. try {
  73. call_user_func($connection->onWebSocketClose, $connection);
  74. } catch (\Exception $e) {
  75. echo $e;
  76. exit(250);
  77. }
  78. } // Close connection.
  79. else {
  80. $connection->close();
  81. }
  82. return 0;
  83. // Ping package.
  84. case 0x9:
  85. // Try to emit onWebSocketPing callback.
  86. if (isset($connection->onWebSocketPing)) {
  87. try {
  88. call_user_func($connection->onWebSocketPing, $connection);
  89. } catch (\Exception $e) {
  90. echo $e;
  91. exit(250);
  92. }
  93. } // Send pong package to client.
  94. else {
  95. $connection->send(pack('H*', '8a00'), true);
  96. }
  97. // Consume data from receive buffer.
  98. if (!$data_len) {
  99. $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
  100. if ($recv_len > self::MIN_HEAD_LEN) {
  101. return self::input(substr($buffer, self::MIN_HEAD_LEN), $connection);
  102. }
  103. return 0;
  104. }
  105. break;
  106. // Pong package.
  107. case 0xa:
  108. // Try to emit onWebSocketPong callback.
  109. if (isset($connection->onWebSocketPong)) {
  110. try {
  111. call_user_func($connection->onWebSocketPong, $connection);
  112. } catch (\Exception $e) {
  113. echo $e;
  114. exit(250);
  115. }
  116. }
  117. // Consume data from receive buffer.
  118. if (!$data_len) {
  119. $connection->consumeRecvBuffer(self::MIN_HEAD_LEN);
  120. if ($recv_len > self::MIN_HEAD_LEN) {
  121. return self::input(substr($buffer, self::MIN_HEAD_LEN), $connection);
  122. }
  123. return 0;
  124. }
  125. break;
  126. // Wrong opcode.
  127. default :
  128. echo "error opcode $opcode and close websocket connection\n";
  129. $connection->close();
  130. return 0;
  131. }
  132. // Calculate packet length.
  133. if ($data_len === 126) {
  134. if (strlen($buffer) < 6) {
  135. return 0;
  136. }
  137. $pack = unpack('nn/ntotal_len', $buffer);
  138. $current_frame_length = $pack['total_len'] + 4;
  139. } else if ($data_len === 127) {
  140. if (strlen($buffer) < 10) {
  141. return 0;
  142. }
  143. $arr = unpack('n/N2c', $buffer);
  144. $current_frame_length = $arr['c1']*4294967296 + $arr['c2'] + 10;
  145. } else {
  146. $current_frame_length = $data_len + 2;
  147. }
  148. if ($is_fin_frame) {
  149. return $current_frame_length;
  150. } else {
  151. $connection->websocketCurrentFrameLength = $current_frame_length;
  152. }
  153. }
  154. // Received just a frame length data.
  155. if ($connection->websocketCurrentFrameLength === $recv_len) {
  156. self::decode($buffer, $connection);
  157. $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
  158. $connection->websocketCurrentFrameLength = 0;
  159. return 0;
  160. } // The length of the received data is greater than the length of a frame.
  161. elseif ($connection->websocketCurrentFrameLength < $recv_len) {
  162. self::decode(substr($buffer, 0, $connection->websocketCurrentFrameLength), $connection);
  163. $connection->consumeRecvBuffer($connection->websocketCurrentFrameLength);
  164. $current_frame_length = $connection->websocketCurrentFrameLength;
  165. $connection->websocketCurrentFrameLength = 0;
  166. // Continue to read next frame.
  167. return self::input(substr($buffer, $current_frame_length), $connection);
  168. } // The length of the received data is less than the length of a frame.
  169. else {
  170. return 0;
  171. }
  172. }
  173. /**
  174. * Websocket encode.
  175. *
  176. * @param string $buffer
  177. * @param ConnectionInterface $connection
  178. * @return string
  179. */
  180. public static function encode($payload, $connection)
  181. {
  182. if (empty($connection->handshakeStep)) {
  183. self::sendHandshake($connection);
  184. }
  185. $mask = 1;
  186. $mask_key = "\x00\x00\x00\x00";
  187. $pack = '';
  188. $length = $length_flag = strlen($payload);
  189. if (65535 < $length) {
  190. $pack = pack('NN', ($length & 0xFFFFFFFF00000000) >> 0b100000, $length & 0x00000000FFFFFFFF);
  191. $length_flag = 127;
  192. } else if (125 < $length) {
  193. $pack = pack('n*', $length);
  194. $length_flag = 126;
  195. }
  196. $head = ($mask << 7) | $length_flag;
  197. $head = $connection->websocketType . chr($head) . $pack;
  198. $frame = $head . $mask_key;
  199. // append payload to frame:
  200. for ($i = 0; $i < $length; $i++) {
  201. $frame .= $payload[$i] ^ $mask_key[$i % 4];
  202. }
  203. if ($connection->handshakeStep === 1) {
  204. $connection->tmpWebsocketData = isset($connection->tmpWebsocketData) ? $connection->tmpWebsocketData . $frame : $frame;
  205. return '';
  206. }
  207. return $frame;
  208. }
  209. /**
  210. * Websocket decode.
  211. *
  212. * @param string $buffer
  213. * @param ConnectionInterface $connection
  214. * @return string
  215. */
  216. public static function decode($bytes, $connection)
  217. {
  218. $masked = $bytes[1] >> 7;
  219. $data_length = $masked ? ord($bytes[1]) & 127 : ord($bytes[1]);
  220. $decoded_data = '';
  221. if ($masked === true) {
  222. if ($data_length === 126) {
  223. $mask = substr($bytes, 4, 4);
  224. $coded_data = substr($bytes, 8);
  225. } else if ($data_length === 127) {
  226. $mask = substr($bytes, 10, 4);
  227. $coded_data = substr($bytes, 14);
  228. } else {
  229. $mask = substr($bytes, 2, 4);
  230. $coded_data = substr($bytes, 6);
  231. }
  232. for ($i = 0; $i < strlen($coded_data); $i++) {
  233. $decoded_data .= $coded_data[$i] ^ $mask[$i % 4];
  234. }
  235. } else {
  236. if ($data_length === 126) {
  237. $decoded_data = substr($bytes, 4);
  238. } else if ($data_length === 127) {
  239. $decoded_data = substr($bytes, 10);
  240. } else {
  241. $decoded_data = substr($bytes, 2);
  242. }
  243. }
  244. if ($connection->websocketCurrentFrameLength) {
  245. $connection->websocketDataBuffer .= $decoded_data;
  246. return $connection->websocketDataBuffer;
  247. } else {
  248. if ($connection->websocketDataBuffer !== '') {
  249. $decoded_data = $connection->websocketDataBuffer . $decoded_data;
  250. $connection->websocketDataBuffer = '';
  251. }
  252. return $decoded_data;
  253. }
  254. }
  255. /**
  256. * Send websocket handshake.
  257. *
  258. * @param \Workerman\Connection\TcpConnection $connection
  259. * @return void
  260. */
  261. public static function sendHandshake($connection)
  262. {
  263. // Get Host.
  264. $port = $connection->getRemotePort();
  265. $host = $port === 80 ? $connection->getRemoteHost() : $connection->getRemoteHost() . ':' . $port;
  266. // Handshake header.
  267. $header = "GET / HTTP/1.1\r\n".
  268. "Host: $host\r\n".
  269. "Connection: Upgrade\r\n".
  270. "Upgrade: websocket\r\n".
  271. "Origin: ". (isset($connection->websocketOrigin) ? $connection->websocketOrigin : '*') ."\r\n".
  272. "Sec-WebSocket-Version: 13\r\n".
  273. "Sec-WebSocket-Key: ".base64_encode(sha1(uniqid(mt_rand(), true), true))."\r\n\r\n";
  274. $connection->send($header, true);
  275. $connection->handshakeStep = 1;
  276. $connection->websocketCurrentFrameLength = 0;
  277. $connection->websocketDataBuffer = '';
  278. if (empty($connection->websocketType)) {
  279. $connection->websocketType = self::BINARY_TYPE_BLOB;
  280. }
  281. }
  282. /**
  283. * Websocket handshake.
  284. *
  285. * @param string $buffer
  286. * @param \Workerman\Connection\TcpConnection $connection
  287. * @return int
  288. */
  289. public static function dealHandshake($buffer, $connection)
  290. {
  291. $pos = strpos($buffer, "\r\n\r\n");
  292. if ($pos) {
  293. // handshake complete
  294. $connection->handshakeStep = 2;
  295. $handshake_respnse_length = $pos + 4;
  296. // Try to emit onWebSocketConnect callback.
  297. if (isset($connection->onWebSocketConnect)) {
  298. try {
  299. call_user_func($connection->onWebSocketConnect, $connection, substr($buffer, 0, $handshake_respnse_length));
  300. } catch (\Exception $e) {
  301. echo $e;
  302. exit(250);
  303. }
  304. }
  305. // Headbeat.
  306. if (!empty($connection->websocketPingInterval)) {
  307. $connection->websocketPingTimer = \Workerman\Lib\Timer::add($connection->websocketPingInterval, function() use ($connection){
  308. if (false === $connection->send(pack('H*', '8900'), true)) {
  309. \Workerman\Lib\Timer::del($connection->websocketPingTimer);
  310. }
  311. });
  312. }
  313. $connection->consumeRecvBuffer($handshake_respnse_length);
  314. if (!empty($connection->tmpWebsocketData)) {
  315. $connection->send($connection->tmpWebsocketData, true);
  316. }
  317. if (strlen($buffer > $handshake_respnse_length)) {
  318. return self::input(substr($buffer, $handshake_respnse_length));
  319. }
  320. }
  321. return 0;
  322. }
  323. }