Gateway.php 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. <?php
  2. /**
  3. *
  4. * 暴露给客户端的连接网关 只负责网络io
  5. * 1、监听客户端连接
  6. * 2、监听后端回应并转发回应给前端
  7. *
  8. * @author walkor <worker-man@qq.com>
  9. *
  10. */
  11. define('ROOT_DIR', realpath(__DIR__.'/../'));
  12. require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
  13. require_once ROOT_DIR . '/Lib/Store.php';
  14. class Gateway extends Man\Core\SocketWorker
  15. {
  16. /**
  17. * 内部通信socket
  18. * @var resouce
  19. */
  20. protected $innerMainSocket = null;
  21. /**
  22. * 内网ip
  23. * @var string
  24. */
  25. protected $lanIp = '127.0.0.1';
  26. /**
  27. * 内部通信端口
  28. * @var int
  29. */
  30. protected $lanPort = 0;
  31. /**
  32. * uid到连接的映射
  33. * @var array
  34. */
  35. protected $uidConnMap = array();
  36. /**
  37. * 连接到uid的映射
  38. * @var array
  39. */
  40. protected $connUidMap = array();
  41. /**
  42. * 到Worker的通信地址
  43. * @var array
  44. */
  45. protected $workerAddresses = array();
  46. /**
  47. * 进程启动
  48. */
  49. public function start()
  50. {
  51. // 安装信号处理函数
  52. $this->installSignal();
  53. // 添加accept事件
  54. $ret = $this->event->add($this->mainSocket, Man\Core\Events\BaseEvent::EV_READ, array($this, 'accept'));
  55. // 创建内部通信套接字
  56. $start_port = Man\Core\Lib\Config::get($this->workerName.'.lan_port_start');
  57. $this->lanPort = $start_port - posix_getppid() + posix_getpid();
  58. $this->lanIp = Man\Core\Lib\Config::get($this->workerName.'.lan_ip');
  59. if(!$this->lanIp)
  60. {
  61. $this->notice($this->workerName.'.lan_ip not set');
  62. $this->lanIp = '127.0.0.1';
  63. }
  64. $error_no = 0;
  65. $error_msg = '';
  66. $this->innerMainSocket = stream_socket_server("udp://".$this->lanIp.':'.$this->lanPort, $error_no, $error_msg, STREAM_SERVER_BIND);
  67. if(!$this->innerMainSocket)
  68. {
  69. $this->notice('create innerMainSocket fail and exit '.$error_no . ':'.$error_msg);
  70. sleep(1);
  71. exit(0);
  72. }
  73. else
  74. {
  75. stream_set_blocking($this->innerMainSocket , 0);
  76. }
  77. $this->registerAddress("udp://".$this->lanIp.':'.$this->lanPort);
  78. // 添加读udp事件
  79. $this->event->add($this->innerMainSocket, Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
  80. // 初始化到worker的通信地址
  81. $this->initWorkerAddresses();
  82. // 主体循环,整个子进程会阻塞在这个函数上
  83. $ret = $this->event->loop();
  84. $this->notice('worker loop exit');
  85. exit(0);
  86. }
  87. /**
  88. * 存储全局的通信地址
  89. * @param string $address
  90. */
  91. protected function registerAddress($address)
  92. {
  93. \Man\Core\Lib\Mutex::get();
  94. $key = 'GLOBAL_GATEWAY_ADDRESS';
  95. $addresses_list = Store::get($key);
  96. if(empty($addresses_list))
  97. {
  98. $addresses_list = array();
  99. }
  100. $addresses_list[$address] = $address;
  101. Store::set($key, $addresses_list);
  102. \Man\Core\Lib\Mutex::release();
  103. }
  104. /**
  105. * 接收Udp数据
  106. * 如果数据超过一个udp包长,需要业务自己解析包体,判断数据是否全部到达
  107. * @param resource $socket
  108. * @param $null_one $flag
  109. * @param $null_two $base
  110. * @return void
  111. */
  112. public function recvUdp($socket, $null_one = null, $null_two = null)
  113. {
  114. $data = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $address);
  115. // 惊群效应
  116. if(false === $data || empty($address))
  117. {
  118. return false;
  119. }
  120. $this->currentClientAddress = $address;
  121. $this->innerDealProcess($data);
  122. }
  123. protected function initWorkerAddresses()
  124. {
  125. $this->workerAddresses = Man\Core\Lib\Config::get($this->workerName.'.game_worker');
  126. if(!$this->workerAddresses)
  127. {
  128. $this->notice($this->workerName.'game_worker not set');
  129. }
  130. }
  131. public function dealInput($recv_str)
  132. {
  133. return 0;
  134. }
  135. public function innerDealProcess($recv_str)
  136. {
  137. $pack = new GatewayProtocol($recv_str);
  138. switch($pack->header['cmd'])
  139. {
  140. case GatewayProtocol::CMD_SEND_TO_ONE:
  141. return $this->sendToSocketId($pack->header['socket_id'], $pack->body);
  142. case GatewayProtocol::CMD_KICK:
  143. if($pack->body)
  144. {
  145. $this->sendToSocketId($pack->header['socket_id'], $pack->body);
  146. }
  147. return $this->closeClientBySocketId($pack->header['socket_id']);
  148. case GatewayProtocol::CMD_SEND_TO_ALL:
  149. return $this->broadCast($pack->body);
  150. case GatewayProtocol::CMD_CONNECT_SUCCESS:
  151. return $this->connectSuccess($pack->header['socket_id'], $pack->header['uid']);
  152. default :
  153. $this->notice('gateway inner pack cmd err data:' .$recv_str );
  154. }
  155. }
  156. protected function broadCast($bin_data)
  157. {
  158. foreach($this->uidConnMap as $uid=>$conn)
  159. {
  160. $this->sendToSocketId($conn, $bin_data);
  161. }
  162. }
  163. protected function closeClientBySocketId($socket_id)
  164. {
  165. if($uid = $this->getUidByFd($socket_id))
  166. {
  167. unset($this->uidConnMap[$uid], $this->connUidMap[$socket_id]);
  168. }
  169. parent::closeClient($socket_id);
  170. }
  171. protected function getFdByUid($uid)
  172. {
  173. if(isset($this->uidConnMap[$uid]))
  174. {
  175. return $this->uidConnMap[$uid];
  176. }
  177. return 0;
  178. }
  179. protected function getUidByFd($fd)
  180. {
  181. if(isset($this->connUidMap[$fd]))
  182. {
  183. return $this->connUidMap[$fd];
  184. }
  185. return 0;
  186. }
  187. protected function connectSuccess($socket_id, $uid)
  188. {
  189. $binded_uid = $this->getUidByFd($socket_id);
  190. if($binded_uid)
  191. {
  192. $this->notice('notify connection success fail ' . $socket_id . ' already binded data:'.serialize($data));
  193. return;
  194. }
  195. $this->uidConnMap[$uid] = $socket_id;
  196. $this->connUidMap[$socket_id] = $uid;
  197. }
  198. public function sendToSocketId($socket_id, $bin_data)
  199. {
  200. if(!isset($this->connections[$socket_id]))
  201. {
  202. return false;
  203. }
  204. $this->currentDealFd = $socket_id;
  205. return $this->sendToClient($bin_data);
  206. }
  207. protected function closeClient($fd)
  208. {
  209. if($uid = $this->getUidByFd($fd))
  210. {
  211. $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $fd);
  212. unset($this->uidConnMap[$uid], $this->connUidMap[$fd]);
  213. }
  214. parent::closeClient($fd);
  215. }
  216. public function dealProcess($recv_str)
  217. {
  218. // 判断用户是否认证过
  219. $from_uid = $this->getUidByFd($this->currentDealFd);
  220. // 触发ON_CONNECTION
  221. if(!$from_uid)
  222. {
  223. return $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECTION, $this->currentDealFd, $recv_str);
  224. }
  225. // 认证过, 触发ON_MESSAGE
  226. $this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $this->currentDealFd, $recv_str);
  227. }
  228. protected function sendToWorker($cmd, $socket_id, $body = '')
  229. {
  230. $address= $this->getRemoteAddress($socket_id);
  231. list($client_ip, $client_port) = explode(':', $address, 2);
  232. $pack = new GatewayProtocol();
  233. $pack->header['cmd'] = $cmd;
  234. $pack->header['local_ip'] = $this->lanIp;
  235. $pack->header['local_port'] = $this->lanPort;
  236. $pack->header['socket_id'] = $socket_id;
  237. $pack->header['client_ip'] = $client_ip;
  238. $pack->header['client_port'] = $client_ip;
  239. $pack->header['uid'] = $this->getUidByFd($socket_id);
  240. $pack->body = $body;
  241. return $this->sendBufferToWorker($pack->getBuffer());
  242. }
  243. protected function sendBufferToWorker($bin_data)
  244. {
  245. $client = stream_socket_client($this->workerAddresses[array_rand($this->workerAddresses)]);
  246. $len = stream_socket_sendto($client, $bin_data);
  247. return $len == strlen($bin_data);
  248. }
  249. protected function notice($str, $display=true)
  250. {
  251. $str = 'Worker['.get_class($this).']:'."$str ip:".$this->getRemoteIp();
  252. Man\Core\Lib\Log::add($str);
  253. if($display && Man\Core\Lib\Config::get('workerman.debug') == 1)
  254. {
  255. echo $str."\n";
  256. }
  257. }
  258. public function onStop()
  259. {
  260. Store::deleteAll();
  261. }
  262. }