GameGateway.php 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. <?php
  2. /**
  3. *
  4. * 暴露给客户端的连接网关 只负责网络io
  5. * 1、监听客户端连接
  6. * 2、监听后端回应并转发回应给前端
  7. *
  8. * @author walkor <worker-man@qq.com>
  9. *
  10. */
  11. require_once WORKERMAN_ROOT_DIR . 'man/Core/SocketWorker.php';
  12. require_once WORKERMAN_ROOT_DIR . 'applications/Game/GameBuffer.php';
  13. require_once WORKERMAN_ROOT_DIR . 'applications/Game/Store.php';
  /**
   * Client-facing connection gateway. It performs network I/O only:
   *  1. accepts client connections and relays their packets to a configured
   *     GameWorker address;
   *  2. listens on an internal UDP socket for worker replies and forwards
   *     them to the matching client connection.
   */
  class GameGateway extends Man\Core\SocketWorker
  {
      // Internal communication socket (the UDP server the workers reply to)
      protected $innerMainSocket = null;

      // LAN ip the internal socket binds to
      protected $lanIp = '127.0.0.1';

      // Port the internal socket binds to
      protected $lanPort = 0;

      // Map: uid => connection id (key into $this->connections)
      protected $uidConnMap = array();

      // Map: connection id => uid
      protected $connUidMap = array();

      // Addresses used to reach the GameWorkers
      protected $workerAddresses = array();

      // Data of the packet currently being processed
      protected $data = array();

      /**
       * Worker entry point: registers the event handlers, creates the internal
       * UDP socket, publishes its address and then blocks in the event loop.
       * Exits the process when the socket cannot be created or the loop returns.
       *
       * @return void
       */
      public function start()
      {
          // Install the signal handlers
          $this->installSignal();

          // Accept client connections on the main socket
          $this->event->add($this->mainSocket, Man\Core\Events\BaseEvent::EV_READ, array($this, 'accept'));

          // Create the internal communication socket. The port is the configured
          // start port shifted by (own pid - parent pid).
          $start_port = Man\Core\Lib\Config::get($this->workerName.'.lan_port_start');
          $this->lanPort = $start_port - posix_getppid() + posix_getpid();
          $this->lanIp = Man\Core\Lib\Config::get($this->workerName.'.lan_ip');
          if(!$this->lanIp)
          {
              $this->notice($this->workerName.'.lan_ip not set');
              $this->lanIp = '127.0.0.1';
          }

          $error_no = 0;
          $error_msg = '';
          $inner_address = "udp://".$this->lanIp.':'.$this->lanPort;
          $this->innerMainSocket = stream_socket_server($inner_address, $error_no, $error_msg, STREAM_SERVER_BIND);
          if(!$this->innerMainSocket)
          {
              $this->notice('create innerMainSocket fail and exit '.$error_no . ':'.$error_msg);
              sleep(1);
              exit(0);
          }
          stream_set_blocking($this->innerMainSocket , 0);

          // Publish the address so it can be looked up through the Store
          $this->registerAddress($inner_address);

          // Read worker datagrams from the internal socket
          $this->event->add($this->innerMainSocket, Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));

          // Load the addresses of the GameWorkers
          $this->initWorkerAddresses();

          // Main loop; the child process blocks here
          $this->event->loop();
          $this->notice('worker loop exit');
          exit(0);
      }

      /**
       * Adds this gateway's internal address to the global address list kept
       * in the Store. An address that is already listed is not added again.
       *
       * @param string $address
       * @return void
       * @todo protect the read-modify-write with a lock to keep the list consistent
       */
      protected function registerAddress($address)
      {
          $key = 'GLOBAL_GATEWAY_ADDRESS';
          $addresses = Store::get($key);
          if(empty($addresses) || !is_array($addresses))
          {
              $addresses = array();
          }
          if(in_array($address, $addresses, true))
          {
              return;
          }
          $addresses[] = $address;
          Store::set($key, $addresses);
      }

      /**
       * Event callback: reads one UDP datagram from the internal socket and
       * hands it to innerDealProcess().
       * If a message is larger than one UDP packet the business layer has to
       * parse the body itself and decide whether all data has arrived.
       *
       * @param resource $socket   the readable socket
       * @param mixed    $null_one unused
       * @param mixed    $null_two unused
       * @return false|null false when nothing could be read, otherwise null
       */
      public function recvUdp($socket, $null_one = null, $null_two = null)
      {
          $data = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $address);
          // Nothing to read (thundering herd: the event may wake several readers)
          if(false === $data || empty($address))
          {
              return false;
          }
          $this->currentClientAddress = $address;
          $this->innerDealProcess($data);
      }

      /**
       * Loads the GameWorker addresses from the configuration and logs a
       * notice when none are configured.
       *
       * @return void
       */
      protected function initWorkerAddresses()
      {
          $this->workerAddresses = Man\Core\Lib\Config::get($this->workerName.'.game_worker');
          if(!$this->workerAddresses)
          {
              $this->notice($this->workerName.'.game_worker not set');
          }
      }

      /**
       * Delegates input checking of client data to GameBuffer::input(), passing
       * $this->data along (presumably filled by reference - confirm in GameBuffer).
       *
       * @param string $recv_str data received so far
       * @return mixed whatever GameBuffer::input() returns
       */
      public function dealInput($recv_str)
      {
          return GameBuffer::input($recv_str, $this->data);
      }

      /**
       * Handles one packet received from a worker on the internal socket and
       * dispatches it on its sub command.
       *
       * @param string $recv_str raw packet
       * @return mixed result of the dispatched action, or null
       */
      public function innerDealProcess($recv_str)
      {
          $data = GameBuffer::decode($recv_str);
          // Only gateway packets are accepted; a packet without a cmd is rejected too
          if(!isset($data['cmd']) || $data['cmd'] != GameBuffer::CMD_GATEWAY)
          {
              $this->notice('gateway inner pack err data:' .$recv_str . ' serialize:' . serialize($data) );
              return;
          }
          $sub_cmd = isset($data['sub_cmd']) ? $data['sub_cmd'] : null;
          switch($sub_cmd)
          {
              case GameBuffer::SCMD_SEND_DATA:
                  return $this->sendToUid($data['to_uid'], $recv_str);
              case GameBuffer::SCMD_KICK_UID:
                  return $this->closeClientByUid($data['to_uid'] );
              case GameBuffer::SCMD_KICK_ADDRESS:
                  // The body carries the connection id to kick
                  $fd = (int)trim($data['body']);
                  $uid = $this->getUidByFd($fd);
                  if($uid)
                  {
                      return $this->closeClientByUid($uid);
                  }
                  return;
              case GameBuffer::SCMD_BROADCAST:
                  return $this->broadCast($recv_str);
              case GameBuffer::SCMD_CONNECT_SUCCESS:
                  // from_uid temporarily carries the connection id, to_uid the real uid
                  $socket_id = $data['from_uid'];
                  $uid = $data['to_uid'];
                  // Refuse to bind a connection that already has a uid
                  $binded_uid = $this->getUidByFd($socket_id);
                  if($binded_uid)
                  {
                      $this->notice('notify connection success fail ' . $socket_id . ' already binded data:'.serialize($data));
                      return;
                  }
                  // The client may have gone away before the worker answered;
                  // binding it would leave stale entries in both maps
                  if(!isset($this->connections[$socket_id]))
                  {
                      $this->notice('notify connection success fail ' . $socket_id . ' connection not found data:'.serialize($data));
                      return;
                  }
                  // NOTE(review): a uid that is already bound to another connection
                  // is silently re-pointed here; the old connection keeps its
                  // connUidMap entry - confirm whether it should be kicked.
                  $this->uidConnMap[$uid] = $socket_id;
                  $this->connUidMap[$socket_id] = $uid;
                  $this->sendToUid($uid, $recv_str);
                  return;
              default :
                  $this->notice('gateway inner pack sub_cmd err data:' .$recv_str . ' serialize:' . serialize($data) );
          }
      }

      /**
       * Sends a packet to every uid that is currently bound to a connection.
       *
       * @param string $bin_data raw packet
       * @return void
       */
      protected function broadCast($bin_data)
      {
          // foreach iterates over a snapshot, so sendToUid() may drop stale entries safely
          foreach($this->uidConnMap as $uid=>$conn)
          {
              $this->sendToUid($uid, $bin_data);
          }
      }

      /**
       * Closes the connection bound to a uid and removes both map entries.
       * Does nothing when the uid is not bound.
       *
       * @param int $uid
       * @return void
       */
      public function closeClientByUid($uid)
      {
          $fd = $this->getFdByUid($uid);
          if($fd)
          {
              unset($this->uidConnMap[$uid], $this->connUidMap[$fd]);
              parent::closeClient($fd);
          }
      }

      /**
       * Looks up the connection id bound to a uid.
       *
       * @param int $uid
       * @return int connection id, or 0 when the uid is not bound
       */
      protected function getFdByUid($uid)
      {
          if(isset($this->uidConnMap[$uid]))
          {
              return $this->uidConnMap[$uid];
          }
          return 0;
      }

      /**
       * Looks up the uid bound to a connection id.
       *
       * @param int $fd
       * @return int uid, or 0 when the connection is not bound
       */
      protected function getUidByFd($fd)
      {
          if(isset($this->connUidMap[$fd]))
          {
              return $this->connUidMap[$fd];
          }
          return 0;
      }

      /**
       * Writes a packet to the connection bound to a uid.
       *
       * @param int    $uid
       * @param string $bin_data raw packet
       * @return bool true only when the whole packet was written
       */
      public function sendToUid($uid, $bin_data)
      {
          if(!isset($this->uidConnMap[$uid]))
          {
              return false;
          }
          $fd = $this->uidConnMap[$uid];
          // The connection may have been closed without the maps being updated;
          // drop the stale binding instead of writing to a missing stream
          if(!isset($this->connections[$fd]))
          {
              unset($this->uidConnMap[$uid]);
              if(isset($this->connUidMap[$fd]) && $this->connUidMap[$fd] == $uid)
              {
                  unset($this->connUidMap[$fd]);
              }
              return false;
          }
          $send_len = fwrite($this->connections[$fd], $bin_data);
          // Strict comparison: a failed write (false) must never count as success
          return $send_len === strlen($bin_data);
      }

      /**
       * Handles one complete packet from a client. A connection without a uid
       * triggers an on-connect packet to a worker (the body is the sid); an
       * authenticated connection has its packet stamped with the uid and relayed.
       *
       * @param string $recv_str raw packet
       * @return void
       */
      public function dealProcess($recv_str)
      {
          // Has this connection been authenticated?
          $from_uid = $this->getUidByFd($this->currentDealFd);
          if(!$from_uid)
          {
              // No sid supplied: reject the connection
              if(empty($this->data['body']))
              {
                  $this->notice("onConnect miss sid ip:".$this->getRemoteIp(). " data[".serialize($this->data)."]");
                  $this->closeClient($this->currentDealFd);
                  return;
              }
              // Send the on-connect event packet; its body is the sid
              $on_buffer = new GameBuffer();
              $on_buffer->header['cmd'] = GameBuffer::CMD_SYSTEM;
              $on_buffer->header['sub_cmd'] = GameBuffer::SCMD_ON_CONNECT;
              // from_uid temporarily stores the connection id
              $on_buffer->header['from_uid'] = $this->currentDealFd;
              // to_uid temporarily stores the internal communication port
              $on_buffer->header['to_uid'] = $this->lanPort;
              $on_buffer->body = $this->data['body'];
              $this->sendToWorker($on_buffer->getBuffer());
              return;
          }

          // Authenticated: overwrite from_uid so a client cannot spoof it
          $this->fillFromUid($recv_str, $from_uid);
          $this->sendToWorker($recv_str);
      }

      /**
       * Overwrites the from_uid field of a packet with the given uid.
       *
       * @param string $bin_data raw packet, modified in place
       * @param int    $from_uid
       * @return void
       */
      protected function fillFromUid(&$bin_data, $from_uid)
      {
          // from_uid occupies header bytes 12-15 (1-based), i.e. offset 11, length 4.
          // NOTE(review): pack('I') is machine dependent in size and byte order -
          // assumes GameBuffer encodes the field the same way; confirm.
          $bin_data = substr_replace($bin_data, pack('I', $from_uid), 11, 4);
      }

      /**
       * Sends a packet to one randomly chosen GameWorker address.
       *
       * @param string $bin_data raw packet
       * @return bool true only when the whole packet was sent
       */
      protected function sendToWorker($bin_data)
      {
          if(empty($this->workerAddresses) || !is_array($this->workerAddresses))
          {
              $this->notice('sendToWorker fail no worker address available');
              return false;
          }
          $address = $this->workerAddresses[array_rand($this->workerAddresses)];
          $error_no = 0;
          $error_msg = '';
          $client = stream_socket_client($address, $error_no, $error_msg);
          if(!$client)
          {
              $this->notice('sendToWorker connect '.$address.' fail '.$error_no . ':'.$error_msg);
              return false;
          }
          $len = stream_socket_sendto($client, $bin_data);
          // One socket per packet: release it as soon as the packet is out
          fclose($client);
          return $len === strlen($bin_data);
      }

      /**
       * Writes a message to the log, tagged with the worker class and the
       * remote ip, and echoes it when workerman.debug is enabled.
       *
       * @param string $str     message
       * @param bool   $display whether the message may be echoed in debug mode
       * @return void
       */
      protected function notice($str, $display=true)
      {
          $str = 'Worker['.get_class($this).']:'."$str ip:".$this->getRemoteIp();
          Man\Core\Lib\Log::add($str);
          if($display && Man\Core\Lib\Config::get('workerman.debug') == 1)
          {
              echo $str."\n";
          }
      }
  }