Gateway.php 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace GatewayWorker\Lib;
  15. /**
  16. * 数据发送相关
  17. */
  18. use \Workerman\Protocols\GatewayProtocol;
  19. use \GatewayWorker\Lib\Store;
  20. use \GatewayWorker\Lib\Context;
  21. class Gateway
  22. {
  23. /**
  24. * gateway实例
  25. * @var object
  26. */
  27. protected static $businessWorker = null;
  28. /**
  29. * 向所有客户端(或者client_id_array指定的客户端)广播消息
  30. * @param string $message 向客户端发送的消息
  31. * @param array $client_id_array 客户端id数组
  32. */
  33. public static function sendToAll($message, $client_id_array = null)
  34. {
  35. $gateway_data = GatewayProtocol::$empty;
  36. $gateway_data['cmd'] = GatewayProtocol::CMD_SEND_TO_ALL;
  37. $gateway_data['body'] = $message;
  38. if($client_id_array)
  39. {
  40. $params = array_merge(array('N*'), $client_id_array);
  41. $gateway_data['ext_data'] = call_user_func_array('pack', $params);
  42. }
  43. elseif(empty($client_id_array) && is_array($client_id_array))
  44. {
  45. return;
  46. }
  47. // 如果有businessWorker实例,说明运行在workerman环境中,通过businessWorker中的长连接发送数据
  48. if(self::$businessWorker)
  49. {
  50. foreach(self::$businessWorker->gatewayConnections as $gateway_connection)
  51. {
  52. $gateway_connection->send($gateway_data);
  53. }
  54. }
  55. // 运行在其它环境中,使用udp向worker发送数据
  56. else
  57. {
  58. $all_addresses = Store::instance('gateway')->get('GLOBAL_GATEWAY_ADDRESS');
  59. if(!$all_addresses)
  60. {
  61. throw new \Exception('GLOBAL_GATEWAY_ADDRESS is ' . var_export($all_addresses, true));
  62. }
  63. foreach($all_addresses as $address)
  64. {
  65. self::sendToGateway($address, $gateway_data);
  66. }
  67. }
  68. }
  69. /**
  70. * 向某个客户端发消息
  71. * @param int $client_id
  72. * @param string $message
  73. */
  74. public static function sendToClient($client_id, $message)
  75. {
  76. return self::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_SEND_TO_ONE, $message);
  77. }
  78. /**
  79. * 向当前客户端发送消息
  80. * @param string $message
  81. */
  82. public static function sendToCurrentClient($message)
  83. {
  84. return self::sendCmdAndMessageToClient(null, GatewayProtocol::CMD_SEND_TO_ONE, $message);
  85. }
  86. /**
  87. * 判断某个客户端是否在线
  88. * @param int $client_id
  89. * @return 0/1
  90. */
  91. public static function isOnline($client_id)
  92. {
  93. $address = Store::instance('gateway')->get('client_id-'.$client_id);
  94. if(!$address)
  95. {
  96. return 0;
  97. }
  98. $gateway_data = GatewayProtocol::$empty;
  99. $gateway_data['cmd'] = GatewayProtocol::CMD_IS_ONLINE;
  100. $gateway_data['client_id'] = $client_id;
  101. return self::sendUdpAndRecv($address, $gateway_data);
  102. }
  103. /**
  104. * 获取在线状态,目前返回一个在线client_id数组
  105. * @return array
  106. */
  107. public static function getOnlineStatus()
  108. {
  109. $gateway_data = GatewayProtocol::$empty;
  110. $gateway_data['cmd'] = GatewayProtocol::CMD_GET_ONLINE_STATUS;
  111. $gateway_buffer = GatewayProtocol::encode($gateway_data);
  112. $all_addresses = Store::instance('gateway')->get('GLOBAL_GATEWAY_ADDRESS');
  113. $client_array = $status_data = array();
  114. // 批量向所有gateway进程发送CMD_GET_ONLINE_STATUS命令
  115. foreach($all_addresses as $address)
  116. {
  117. $client = stream_socket_client("udp://$address", $errno, $errmsg);
  118. if(strlen($gateway_buffer) === stream_socket_sendto($client, $gateway_buffer))
  119. {
  120. $client_id = (int) $client;
  121. $client_array[$client_id] = $client;
  122. }
  123. }
  124. // 超时1秒
  125. $time_out = 1;
  126. $time_start = microtime(true);
  127. // 批量接收请求
  128. while(count($client_array) > 0)
  129. {
  130. $write = $except = array();
  131. $read = $client_array;
  132. if(@stream_select($read, $write, $except, $time_out))
  133. {
  134. foreach($read as $client)
  135. {
  136. // udp
  137. $data = json_decode(fread($client, 65535), true);
  138. if($data)
  139. {
  140. $status_data = array_merge($status_data, $data);
  141. }
  142. unset($client_array[$client]);
  143. }
  144. }
  145. if(microtime(true) - $time_start > $time_out)
  146. {
  147. break;
  148. }
  149. }
  150. return $status_data;
  151. }
  152. /**
  153. * 关闭某个客户端
  154. * @param int $client_id
  155. * @param string $message
  156. */
  157. public static function closeClient($client_id)
  158. {
  159. if($client_id === Context::$client_id)
  160. {
  161. return self::closeCurrentClient();
  162. }
  163. // 不是发给当前用户则使用存储中的地址
  164. else
  165. {
  166. $address = Store::instance('gateway')->get('client_id-'.$client_id);
  167. if(!$address)
  168. {
  169. return false;
  170. }
  171. return self::kickAddress($address, $client_id);
  172. }
  173. }
  174. /**
  175. * 踢掉当前客户端
  176. * @param string $message
  177. */
  178. public static function closeCurrentClient()
  179. {
  180. return self::kickAddress(Context::$local_ip.':'.Context::$local_port, Context::$client_id);
  181. }
  182. /**
  183. * 更新session,框架自动调用,开发者不要调用
  184. * @param int $client_id
  185. * @param string $session_str
  186. */
  187. public static function updateSocketSession($client_id, $session_str)
  188. {
  189. $gateway_data = GatewayProtocol::$empty;
  190. $gateway_data['cmd'] = GatewayProtocol::CMD_UPDATE_SESSION;
  191. $gateway_data['client_id'] = $client_id;
  192. $gateway_data['ext_data'] = $session_str;
  193. return self::sendToGateway(Context::$local_ip . ':' . Context::$local_port, $gateway_data);
  194. }
  195. /**
  196. * 想某个用户网关发送命令和消息
  197. * @param int $client_id
  198. * @param int $cmd
  199. * @param string $message
  200. * @return boolean
  201. */
  202. protected static function sendCmdAndMessageToClient($client_id, $cmd , $message)
  203. {
  204. // 如果是发给当前用户则直接获取上下文中的地址
  205. if($client_id === Context::$client_id || $client_id === null)
  206. {
  207. $address = Context::$local_ip.':'.Context::$local_port;
  208. }
  209. else
  210. {
  211. $address = Store::instance('gateway')->get('client_id-'.$client_id);
  212. if(!$address)
  213. {
  214. return false;
  215. }
  216. }
  217. $gateway_data = GatewayProtocol::$empty;
  218. $gateway_data['cmd'] = $cmd;
  219. $gateway_data['client_id'] = $client_id ? $client_id : Context::$client_id;
  220. $gateway_data['body'] = $message;
  221. return self::sendToGateway($address, $gateway_data);
  222. }
  223. /**
  224. * 发送udp数据并返回
  225. * @param int $address
  226. * @param string $message
  227. * @return boolean
  228. */
  229. protected static function sendUdpAndRecv($address , $data)
  230. {
  231. $buffer = GatewayProtocol::encode($data);
  232. // 非workerman环境,使用udp发送数据
  233. $client = stream_socket_client("udp://$address", $errno, $errmsg);
  234. if(strlen($buffer) == stream_socket_sendto($client, $buffer))
  235. {
  236. // 阻塞读
  237. stream_set_blocking($client, 1);
  238. // 1秒超时
  239. stream_set_timeout($client, 1);
  240. // 读udp数据
  241. $data = fread($client, 655350);
  242. // 返回结果
  243. return json_decode($data, true);
  244. }
  245. else
  246. {
  247. throw new \Exception("sendUdpAndRecv($address, \$bufer) fail ! Can not send UDP data!", 502);
  248. }
  249. }
  250. /**
  251. * 发送数据到网关
  252. * @param string $address
  253. * @param string $buffer
  254. */
  255. protected static function sendToGateway($address, $gateway_data)
  256. {
  257. // 有$businessWorker说明是workerman环境,使用$businessWorker发送数据
  258. if(self::$businessWorker)
  259. {
  260. if(!isset(self::$businessWorker->gatewayConnections[$address]))
  261. {
  262. return false;
  263. }
  264. return self::$businessWorker->gatewayConnections[$address]->send($gateway_data);
  265. }
  266. // 非workerman环境,使用udp发送数据
  267. $gateway_buffer = GatewayProtocol::encode($gateway_data);
  268. $client = stream_socket_client("udp://$address", $errno, $errmsg);
  269. return strlen($gateway_buffer) == stream_socket_sendto($client, $gateway_buffer);
  270. }
  271. /**
  272. * 踢掉某个网关的socket
  273. * @param string $local_ip
  274. * @param int $local_port
  275. * @param int $client_id
  276. * @param string $message
  277. * @param int $client_id
  278. */
  279. protected static function kickAddress($address, $client_id)
  280. {
  281. $gateway_data = GatewayProtocol::$empty;
  282. $gateway_data['cmd'] = GatewayProtocol::CMD_KICK;
  283. $gateway_data['client_id'] = $client_id;
  284. return self::sendToGateway($address, $gateway_data);
  285. }
  286. /**
  287. * 设置gateway实例
  288. * @param Bootstrap/Gateway $gateway_instance
  289. */
  290. public static function setBusinessWorker($business_worker_instance)
  291. {
  292. self::$businessWorker = $business_worker_instance;
  293. }
  294. }