Ws.php 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. <?php
  2. namespace Workerman\Protocols;
  3. /**
  4. * Websocket protocol for client.
  5. */
  6. class Ws
  7. {
  8. /**
  9. * Websocket blob type.
  10. *
  11. * @var string
  12. */
  13. const BINARY_TYPE_BLOB = "\x81";
  14. /**
  15. * Websocket arraybuffer type.
  16. *
  17. * @var string
  18. */
  19. const BINARY_TYPE_ARRAYBUFFER = "\x82";
  20. /**
  21. * Check the integrity of the package.
  22. *
  23. * @param string $buffer
  24. * @param ConnectionInterface $connection
  25. * @return int
  26. */
  27. public static function input($buffer, $connection)
  28. {
  29. if (empty($connection->handshakeStep)) {
  30. echo "recv data before handshake\n";
  31. return false;
  32. }
  33. // Recv handshake response
  34. if ($connection->handshakeStep === 1) {
  35. $pos = strpos($buffer, "\r\n\r\n");
  36. if ($pos) {
  37. // handshake complete
  38. $connection->handshakeStep = 2;
  39. $handshake_respnse_length = $pos + 4;
  40. // Try to emit onWebSocketConnect callback.
  41. if (isset($connection->onWebSocketConnect)) {
  42. try {
  43. call_user_func($connection->onWebSocketConnect, $connection, substr($buffer, 0, $handshake_respnse_length));
  44. } catch (\Exception $e) {
  45. echo $e;
  46. exit(250);
  47. }
  48. }
  49. // Headbeat.
  50. if (!empty($connection->websocketPingInterval)) {
  51. $connection->websocketPingTimer = \Workerman\Lib\Timer::add($connection->websocketPingInterval, function() use ($connection){
  52. if (false === $connection->send(pack('H*', '8900'), true)) {
  53. \Workerman\Lib\Timer::del($connection->websocketPingTimer);
  54. }
  55. });
  56. }
  57. $connection->consumeRecvBuffer($handshake_respnse_length);
  58. if (!empty($connection->tmpWebsocketData)) {
  59. $connection->send($connection->tmpWebsocketData, true);
  60. }
  61. if (strlen($buffer > $handshake_respnse_length)) {
  62. return self::input(substr($buffer, $handshake_respnse_length));
  63. }
  64. }
  65. return 0;
  66. }
  67. if (strlen($buffer) < 2) {
  68. return 0;
  69. }
  70. $opcode = ord($buffer[0]) & 0xf;
  71. $data_len = ord($buffer[1]) & 127;
  72. switch ($opcode) {
  73. case 0x0:
  74. break;
  75. // Blob type.
  76. case 0x1:
  77. break;
  78. // Arraybuffer type.
  79. case 0x2:
  80. break;
  81. // Close package.
  82. case 0x8:
  83. // Try to emit onWebSocketClose callback.
  84. if (isset($connection->onWebSocketClose)) {
  85. try {
  86. call_user_func($connection->onWebSocketClose, $connection);
  87. } catch (\Exception $e) {
  88. echo $e;
  89. exit(250);
  90. }
  91. } else {
  92. // Close connection.
  93. $connection->close();
  94. }
  95. return 0;
  96. // Ping package.
  97. case 0x9:
  98. // Try to emit onWebSocketPing callback.
  99. if (isset($connection->onWebSocketPing)) {
  100. try {
  101. call_user_func($connection->onWebSocketPing, $connection);
  102. } catch (\Exception $e) {
  103. echo $e;
  104. exit(250);
  105. }
  106. } else {
  107. // Send pong package to remote.
  108. $connection->send(pack('H*', '8a00'), true);
  109. }
  110. // Consume data from receive buffer.
  111. if (!$data_len) {
  112. $connection->consumeRecvBuffer(2);
  113. return 0;
  114. }
  115. break;
  116. // Pong package.
  117. case 0xa:
  118. // Try to emit onWebSocketPong callback.
  119. if (isset($connection->onWebSocketPong)) {
  120. try {
  121. call_user_func($connection->onWebSocketPong, $connection);
  122. } catch (\Exception $e) {
  123. echo $e;
  124. exit(250);
  125. }
  126. }
  127. // Consume data from receive buffer.
  128. if (!$data_len) {
  129. $connection->consumeRecvBuffer(2);
  130. return 0;
  131. }
  132. break;
  133. }
  134. if ($data_len === 126) {
  135. if (strlen($buffer) < 6) {
  136. return 0;
  137. }
  138. $pack = unpack('nn/ntotal_len', $buffer);
  139. $data_len = $pack['total_len'] + 4;
  140. } else if ($data_len === 127) {
  141. if (strlen($buffer) < 10) {
  142. return 0;
  143. }
  144. $arr = unpack('n/N2c', $buffer);
  145. $data_len = $arr['c1']*4294967296 + $arr['c2'] + 10;
  146. } else {
  147. $data_len += 2;
  148. }
  149. return $data_len;
  150. }
  151. /**
  152. * Websocket encode.
  153. *
  154. * @param string $buffer
  155. * @param ConnectionInterface $connection
  156. * @return string
  157. */
  158. public static function encode($payload, $connection)
  159. {
  160. if (empty($connection->handshakeStep)) {
  161. // Get Host.
  162. $port = $connection->getRemotePort();
  163. $host = $port === 80 ? $connection->getRemoteHost() : $connection->getRemoteHost() . ':' . $port;
  164. // Handshake header.
  165. $header = "GET / HTTP/1.1\r\n".
  166. "Host: $host\r\n".
  167. "Connection: Upgrade\r\n".
  168. "Upgrade: websocket\r\n".
  169. "Origin: ". (isset($connection->websocketOrigin) ? $connection->websocketOrigin : '*') ."\r\n".
  170. "Sec-WebSocket-Version: 13\r\n".
  171. "Sec-WebSocket-Key: ".base64_encode(sha1(uniqid(mt_rand(), true), true))."\r\n\r\n";
  172. $connection->send($header, true);
  173. $connection->handshakeStep = 1;
  174. if (empty($connection->websocketType)) {
  175. $connection->websocketType = self::BINARY_TYPE_BLOB;
  176. }
  177. }
  178. $mask = 1;
  179. $mask_key = "\x00\x00\x00\x00";
  180. $pack = '';
  181. $length = $length_flag = strlen($payload);
  182. if (65535 < $length) {
  183. $pack = pack('NN', ($length & 0xFFFFFFFF00000000) >> 0b100000, $length & 0x00000000FFFFFFFF);
  184. $length_flag = 127;
  185. } else if (125 < $length) {
  186. $pack = pack('n*', $length);
  187. $length_flag = 126;
  188. }
  189. $head = ($mask << 7) | $length_flag;
  190. $head = $connection->websocketType . chr($head) . $pack;
  191. $frame = $head . $mask_key;
  192. // append payload to frame:
  193. for ($i = 0; $i < $length; $i++) {
  194. $frame .= $payload[$i] ^ $mask_key[$i % 4];
  195. }
  196. if ($connection->handshakeStep === 1) {
  197. $connection->tmpWebsocketData = isset($connection->tmpWebsocketData) ? $connection->tmpWebsocketData . $frame : $frame;
  198. return '';
  199. }
  200. return $frame;
  201. }
  202. /**
  203. * Websocket decode.
  204. *
  205. * @param string $buffer
  206. * @param ConnectionInterface $connection
  207. * @return string
  208. */
  209. public static function decode($bytes, $connection)
  210. {
  211. $masked = $bytes[1] >> 7;
  212. $data_length = $masked ? ord($bytes[1]) & 127 : ord($bytes[1]);
  213. $decoded_data = '';
  214. if ($masked === true) {
  215. if ($data_length === 126) {
  216. $mask = substr($bytes, 4, 4);
  217. $coded_data = substr($bytes, 8);
  218. } else if ($data_length === 127) {
  219. $mask = substr($bytes, 10, 4);
  220. $coded_data = substr($bytes, 14);
  221. } else {
  222. $mask = substr($bytes, 2, 4);
  223. $coded_data = substr($bytes, 6);
  224. }
  225. for ($i = 0; $i < strlen($coded_data); $i++) {
  226. $decoded_data .= $coded_data[$i] ^ $mask[$i % 4];
  227. }
  228. } else {
  229. if ($data_length === 126) {
  230. $decoded_data = substr($bytes, 4);
  231. } else if ($data_length === 127) {
  232. $decoded_data = substr($bytes, 10);
  233. } else {
  234. $decoded_data = substr($bytes, 2);
  235. }
  236. }
  237. return $decoded_data;
  238. }
  239. }