GameGateway.php 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. <?php
  2. /**
  3. *
  4. * 暴露给客户端的连接网关 只负责网络io
  5. * 1、监听客户端连接
  6. * 2、监听后端回应并转发回应给前端
  7. *
  8. * @author walkor <worker-man@qq.com>
  9. *
  10. */
  11. require_once WORKERMAN_ROOT_DIR . 'Core/SocketWorker.php';
  12. require_once WORKERMAN_ROOT_DIR . 'Applications/GameBuffer.php';
  13. require_once WORKERMAN_ROOT_DIR . 'Applications/Store.php';
  14. class GameGateway extends WORKERMAN\Core\SocketWorker
  15. {
  16. // 内部通信socket
  17. protected $innerMainSocket = null;
  18. // uid到连接的映射
  19. protected $uidConnMap = array();
  20. // 连接到uid的映射
  21. protected $connUidMap = array();
  22. // 到GameWorker的通信地址
  23. protected $workerAddresses = array();
  24. // 当前处理的包数据
  25. protected $data = array();
  26. protected $onConnectBuffer = '';
  27. protected $onCloseBuffer = '';
  28. public function start()
  29. {
  30. // 安装信号处理函数
  31. $this->installSignal();
  32. // 添加accept事件
  33. $ret = $this->event->add($this->mainSocket, WORKERMAN\Core\Events\BaseEvent::EV_READ, array($this, 'accept'));
  34. // 创建内部通信套接字
  35. $inner_port = posix_getpid();
  36. $lan_ip = WORKERMAN\Core\Lib\Config::get('workers.'.$this->workerName.'.lan_ip');
  37. if(!$lan_ip)
  38. {
  39. $this->notice($this->workerName.'.lan_ip not set');
  40. $lan_ip = '127.0.0.1';
  41. }
  42. $error_no = 0;
  43. $error_msg = '';
  44. $this->innerMainSocket = stream_socket_server("udp://$lan_ip:$inner_port", $error_no, $error_msg, STREAM_SERVER_BIND);
  45. if(!$this->innerMainSocket)
  46. {
  47. $this->notice('create innerMainSocket fail and exit '.$error_no . ':'.$error_msg);
  48. sleep(1);
  49. exit(0);
  50. }
  51. else
  52. {
  53. stream_set_blocking($this->innerMainSocket , 0);
  54. }
  55. $this->registerAddress("udp://$lan_ip:$inner_port");
  56. // 添加读udp事件
  57. $this->event->add($this->innerMainSocket, WORKERMAN\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
  58. // 初始化onConnetct / onclose buffer
  59. $this->initOnBuffer();
  60. // 初始化到worker的通信地址
  61. $this->initWorkerAddresses();
  62. // 主体循环,整个子进程会阻塞在这个函数上
  63. $ret = $this->event->loop();
  64. $this->notice('worker loop exit');
  65. exit(0);
  66. }
  67. /**
  68. * 存储全局的通信地址
  69. * @param string $address
  70. * @todo 用锁机制等保证数据完整性
  71. */
  72. protected function registerAddress($address)
  73. {
  74. $key = 'GLOBAL_GATEWAY_ADDRESS';
  75. $addresses = Store::get($key);
  76. if(empty($addresses))
  77. {
  78. $addresses = array($address);
  79. }
  80. else
  81. {
  82. $addresses[] = $address;
  83. }
  84. Store::set($key, $addresses);
  85. }
  86. /**
  87. * 接收Udp数据
  88. * 如果数据超过一个udp包长,需要业务自己解析包体,判断数据是否全部到达
  89. * @param resource $socket
  90. * @param $null_one $flag
  91. * @param $null_two $base
  92. * @return void
  93. */
  94. public function recvUdp($socket, $null_one = null, $null_two = null)
  95. {
  96. $data = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $address);
  97. // 惊群效应
  98. if(false === $data || empty($address))
  99. {
  100. return false;
  101. }
  102. $this->currentClientAddress = $address;
  103. $this->innerDealProcess($data);
  104. }
  105. protected function initOnBuffer()
  106. {
  107. $buffer = new GameBuffer();
  108. $buffer->header['cmd'] = GameBuffer::CMD_SYSTEM;
  109. $buffer->header['sub_cmd'] = GameBuffer::SCMD_ON_CONNECT;
  110. $buffer->body = '';
  111. $this->onConnectBuffer = $buffer->getBuffer();
  112. $buffer->header['sub_cmd'] = GameBuffer::SCMD_ON_CLOSE;
  113. $this->onCloseBuffer = $buffer->getBuffer();
  114. }
  115. protected function initWorkerAddresses()
  116. {
  117. $this->workerAddresses = WORKERMAN\Core\Lib\Config::get('workers.'.$this->workerName.'.game_worker');
  118. if(!$this->workerAddresses)
  119. {
  120. $this->notice($this->workerName.'game_worker not set');
  121. }
  122. }
  123. public function dealInput($recv_str)
  124. {
  125. return GameBuffer::input($recv_str, $this->data);
  126. }
  127. public function innerDealProcess($recv_str)
  128. {
  129. $data = GameBuffer::decode($recv_str);
  130. if($data['cmd'] != GameBuffer::CMD_GATEWAY)
  131. {
  132. $this->notice('gateway inner pack err data:' .$recv_str . ' serialize:' . serialize($data) );
  133. return;
  134. }
  135. switch($data['sub_cmd'])
  136. {
  137. case GameBuffer::SCMD_SEND_DATA:
  138. return $this->sendToUid($data['to_uid'], $recv_str);
  139. case GameBuffer::SCMD_KICK_UID:
  140. return $this->closeClientByUid($data['to_uid'] );
  141. case GameBuffer::SCMD_KICK_ADDRESS:
  142. $fd = (int)trim($data['body']);
  143. $uid = $this->getUidByFd($fd);
  144. if($uid)
  145. {
  146. return $this->closeClientByUid($uid);
  147. }
  148. return;
  149. case GameBuffer::SCMD_BROADCAST:
  150. return $this->broadCast($recv_str);
  151. }
  152. }
  153. protected function broadCast($bin_data)
  154. {
  155. foreach($this->uidConnMap as $uid=>$conn)
  156. {
  157. $this->sendToUid($uid, $bin_data);
  158. }
  159. }
  160. public function closeClientByUid($uid)
  161. {
  162. $fd = $this->getFdByUid($uid);
  163. if($fd)
  164. {
  165. unset($this->uidConnMap[$uid], $this->connUidMap[$fd]);
  166. parent::closeClient($fd);
  167. }
  168. }
  169. protected function getFdByUid($uid)
  170. {
  171. if(isset($this->uidConnMap[$uid]))
  172. {
  173. return $this->uidConnMap[$uid];
  174. }
  175. return 0;
  176. }
  177. protected function getUidByFd($fd)
  178. {
  179. if(isset($this->connUidMap[$fd]))
  180. {
  181. return $this->connUidMap[$fd];
  182. }
  183. return 0;
  184. }
  185. public function sendToUid($uid, $bin_data)
  186. {
  187. if(!isset($this->uidConnMap[$uid]))
  188. {
  189. return false;
  190. }
  191. $send_len = fwrite($this->connections[$this->uidConnMap[$uid]], $bin_data);
  192. return $send_len == strlen($bin_data);
  193. }
  194. public function dealProcess($recv_str)
  195. {
  196. // 判断用户是否认证过
  197. $from_uid = $this->getUidByFd($this->currentDealFd);
  198. if(!$from_uid)
  199. {
  200. // 没传sid
  201. if(empty($this->data->body))
  202. {
  203. $this->notice("onConnect miss sid ip:".$this->getRemoteIp(). " data[".serialize($this->data)."]");
  204. $this->closeClient($this->currentDealFd);
  205. return;
  206. }
  207. // 发送onconnet事件包,包体是sid
  208. $this->sendToWorker($this->onConnectBuffer.$this->data->body);
  209. return;
  210. }
  211. // 认证过
  212. $this->fillFromUid($recv_str, $from_uid);
  213. $this->sendToWorker($recv_str);
  214. }
  215. // 讲协议的from_uid填充为正确的值
  216. protected function fillFromUid(&$bin_data, $from_uid)
  217. {
  218. // from_uid在包头的12-15字节
  219. $bin_data = substr_replace($bin_data, pack('I', $from_uid), 11, 4);
  220. }
  221. protected function sendToWorker($bin_data)
  222. {
  223. $len = stream_socket_sendto($this->innerMainSocket, $bin_data, 0, $this->workerAddresses[array_rand($this->workerAddresses)]);
  224. return $len == strlen($bin_data);
  225. }
  226. protected function notice($str, $display=true)
  227. {
  228. $str = 'Worker['.get_class($this).']:'."$str ip:".$this->getRemoteIp();
  229. WORKERMAN\Core\Lib\Log::add($str);
  230. if($display && WORKERMAN\Core\Lib\Config::get('debug') == 1)
  231. {
  232. echo $str."\n";
  233. }
  234. }
  235. }