Gateway.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. <?php
  2. /**
  3. *
  4. * 暴露给客户端的连接网关 只负责网络io
  5. * 1、监听客户端连接
  6. * 2、监听后端回应并转发回应给前端
  7. *
  8. * @author walkor <workerman.net>
  9. *
  10. */
  // Project root (the parent of this file's directory). The definition is
  // guarded so that loading this file after ROOT_DIR already exists does not
  // raise a "constant already defined" warning; an existing value is kept.
  if(!defined('ROOT_DIR'))
  {
      define('ROOT_DIR', realpath(__DIR__.'/../'));
  }
  // Wire protocol used between the gateway and the workers.
  require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
  // Shared store used to publish this gateway's internal addresses.
  require_once ROOT_DIR . '/Lib/Store.php';
  /**
   * Client-facing connection gateway. It only performs network I/O:
   *
   * 1. client data is wrapped in a GatewayProtocol packet and sent to a worker;
   * 2. packets received from workers on the internal UDP/TCP address are
   *    relayed to the client connections they name.
   */
  class Gateway extends \Man\Core\SocketWorker
  {
      /**
       * Internal UDP socket on which worker packets are received.
       * @var resource
       */
      protected $innerMainSocket_udp = null;

      /**
       * Internal TCP listening socket on which worker connections are accepted.
       * Declared explicitly so start() does not create it as a dynamic property.
       * @var resource
       */
      protected $innerMainSocket_tcp = null;

      /**
       * LAN ip the internal sockets bind to.
       * @var string
       */
      protected $lanIp = '127.0.0.1';

      /**
       * Port the internal sockets bind to.
       * @var int
       */
      protected $lanPort = 0;

      /**
       * Map of uid => socket id.
       * @var array
       */
      protected $uidConnMap = array();

      /**
       * Map of socket id => uid.
       * @var array
       */
      protected $connUidMap = array();

      /**
       * Addresses of the workers this gateway forwards to.
       * @var array
       */
      protected $workerAddresses = array();

      /**
       * Process entry point: creates the internal sockets, registers every
       * event callback and then blocks in the event loop.
       *
       * @return void
       */
      public function start()
      {
          // Install the signal handlers.
          $this->installSignal();

          // Accept client connections on the public listening socket.
          $this->event->add($this->mainSocket, \Man\Core\Events\BaseEvent::EV_READ, array($this, 'accept'));

          // Internal port = configured start port + (own pid - parent pid).
          // NOTE(review): presumably this gives each gateway child its own port - confirm.
          $start_port = \Man\Core\Lib\Config::get($this->workerName.'.lan_port_start');
          $this->lanPort = $start_port - posix_getppid() + posix_getpid();
          $this->lanIp = \Man\Core\Lib\Config::get($this->workerName.'.lan_ip');
          if(!$this->lanIp)
          {
              $this->notice($this->workerName.'.lan_ip not set');
              $this->lanIp = '127.0.0.1';
          }

          $udp_address = 'udp://'.$this->lanIp.':'.$this->lanPort;
          $tcp_address = 'tcp://'.$this->lanIp.':'.$this->lanPort;

          $error_no_udp = $error_no_tcp = 0;
          $error_msg_udp = $error_msg_tcp = '';
          // UDP servers must be created with STREAM_SERVER_BIND only.
          $this->innerMainSocket_udp = stream_socket_server($udp_address, $error_no_udp, $error_msg_udp, STREAM_SERVER_BIND);
          // TCP servers also need STREAM_SERVER_LISTEN, otherwise the socket is
          // bound but never listens and no connection can be accepted.
          $this->innerMainSocket_tcp = stream_socket_server($tcp_address, $error_no_tcp, $error_msg_tcp, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
          if(!$this->innerMainSocket_udp || !$this->innerMainSocket_tcp)
          {
              $this->notice('create innerMainSocket udp or tcp fail and exit '.$error_msg_udp.$error_msg_tcp);
              sleep(1);
              exit(0);
          }
          stream_set_blocking($this->innerMainSocket_udp, 0);
          stream_set_blocking($this->innerMainSocket_tcp, 0);

          // Publish both internal addresses in the shared store.
          $this->registerAddress($udp_address, 'udp');
          $this->registerAddress($tcp_address, 'tcp');

          // Read worker packets from UDP, accept worker connections on TCP.
          $this->event->add($this->innerMainSocket_udp, \Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
          $this->event->add($this->innerMainSocket_tcp, \Man\Core\Events\BaseEvent::EV_READ, array($this, 'acceptTcp'));

          // Load the worker addresses this gateway forwards to.
          $this->initWorkerAddresses();

          // Main loop; the process blocks here until the loop exits.
          $this->event->loop();
          $this->notice('worker loop exit');
          exit(0);
      }

      /**
       * Adds an internal address to the list kept in the shared store under
       * 'GLOBAL_GATEWAY_ADDRESS-<protocol>'. The read-modify-write is done
       * between Mutex::get() and Mutex::release().
       *
       * @param string $address  full address, e.g. "udp://ip:port"
       * @param string $protocol 'udp' or 'tcp'
       * @return void
       */
      protected function registerAddress($address, $protocol)
      {
          \Man\Core\Lib\Mutex::get();
          $key = 'GLOBAL_GATEWAY_ADDRESS-' . $protocol;
          $addresses_list = Store::get($key);
          if(empty($addresses_list))
          {
              $addresses_list = array();
          }
          // Keyed by the address itself, so registering twice is harmless.
          $addresses_list[$address] = $address;
          Store::set($key, $addresses_list);
          \Man\Core\Lib\Mutex::release();
      }

      /**
       * Event callback: reads one UDP datagram from the internal socket and
       * processes it as a worker packet. Data larger than one datagram is not
       * reassembled here.
       *
       * @param resource $socket   internal UDP socket
       * @param mixed    $null_one unused event argument
       * @param mixed    $null_two unused event argument
       * @return false|null false when nothing could be read
       */
      public function recvUdp($socket, $null_one = null, $null_two = null)
      {
          $data = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $address);
          // Nothing to read (e.g. thundering herd: the datagram was already consumed).
          if(false === $data || empty($address))
          {
              return false;
          }
          $this->currentClientAddress = $address;
          $this->innerDealProcess($data);
      }

      /**
       * Event callback: accepts a connection on the internal TCP socket and
       * registers it for reading via recvTcp().
       *
       * @param resource $socket   internal TCP listening socket
       * @param mixed    $null_one unused event argument
       * @param mixed    $null_two unused event argument
       * @return resource|false the accepted connection, false if none was accepted
       */
      public function acceptTcp($socket, $null_one = null, $null_two = null)
      {
          $new_connection = @stream_socket_accept($socket, 0);
          // Possibly thundering herd: the connection was already taken.
          if(false === $new_connection)
          {
              return false;
          }
          // The integer value of the stream resource is used as the connection id.
          $fd = (int) $new_connection;
          $this->connections[$fd] = $new_connection;
          // Start by expecting one protocol header.
          $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>GatewayProtocol::HEAD_LEN);
          stream_set_blocking($this->connections[$fd], 0);
          $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'recvTcp'), $fd);
          return $new_connection;
      }

      /**
       * Framing check for data received on an internal connection.
       *
       * @param string $buffer bytes received so far
       * @return mixed result of GatewayProtocol::input(); recvTcp() treats 0 as
       *               "packet complete", false as "bad packet" and anything
       *               else as the number of bytes still to read
       */
      public function dealInnerInput($buffer)
      {
          return GatewayProtocol::input($buffer);
      }

      /**
       * Event callback: reads from an internal TCP connection, accumulates the
       * bytes and processes the packet once it is complete.
       *
       * @param resource $connection internal connection
       * @param mixed    $flag       unused event argument
       * @param int      $fd         connection id
       * @return void
       */
      public function recvTcp($connection, $flag, $fd = null)
      {
          $this->currentDealFd = $fd;
          $buffer = stream_socket_recvfrom($connection, $this->recvBuffers[$fd]['remain_len']);
          // Nothing read: retry once with fread() before inspecting EOF.
          if('' == $buffer && '' == ($buffer = fread($connection, $this->recvBuffers[$fd]['remain_len'])))
          {
              if(!feof($connection))
              {
                  return;
              }
              // The peer closed while a partial packet was buffered: count it as an error.
              if(!empty($this->recvBuffers[$fd]['buf']))
              {
                  $this->statusInfo['send_fail']++;
                  $this->notice("INNER_CLIENT_CLOSE\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
              }
              $this->closeClient($fd);
              if($this->workerStatus == self::STATUS_SHUTDOWN)
              {
                  $this->stop();
              }
              return;
          }

          $this->recvBuffers[$fd]['buf'] .= $buffer;
          $remain_len = $this->dealInnerInput($this->recvBuffers[$fd]['buf']);

          if(0 === $remain_len)
          {
              // Packet complete: process it.
              try{
                  $this->innerDealProcess($this->recvBuffers[$fd]['buf']);
              }
              catch(\Exception $e)
              {
                  $this->notice('CODE:' . $e->getCode() . ' MESSAGE:' . $e->getMessage()."\n".$e->getTraceAsString()."\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
                  $this->statusInfo['throw_exception'] ++;
              }
              // Close the internal connection unless data is still queued for it.
              if(empty($this->sendBuffers[$fd]))
              {
                  $this->closeClient($fd);
              }
          }
          else if(false === $remain_len)
          {
              // Malformed packet.
              $this->statusInfo['packet_err']++;
              $this->notice("INNER_PACKET_ERROR\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
              $this->closeClient($fd);
          }
          else
          {
              // More bytes are needed before the packet is complete.
              $this->recvBuffers[$fd]['remain_len'] = $remain_len;
          }

          // When the worker is shutting down, stop serving and arm the exit alarm.
          if($this->workerStatus == self::STATUS_SHUTDOWN )
          {
              $this->stop();
              // The process exits EXIT_WAIT_TIME seconds from now.
              pcntl_alarm(self::EXIT_WAIT_TIME);
          }
      }

      /**
       * Loads the worker addresses from the '<workerName>.game_worker' config
       * entry and logs a notice when the entry is missing or empty.
       *
       * @return void
       */
      protected function initWorkerAddresses()
      {
          $this->workerAddresses = \Man\Core\Lib\Config::get($this->workerName.'.game_worker');
          if(!$this->workerAddresses)
          {
              $this->notice($this->workerName.'.game_worker not set');
          }
      }

      /**
       * Framing check for client data; always returns 0.
       * NOTE(review): presumably the parent treats 0 as "packet complete", so
       * every chunk read from a client is forwarded as-is - confirm in SocketWorker.
       *
       * @param string $recv_str bytes received from the client
       * @return int always 0
       */
      public function dealInput($recv_str)
      {
          return 0;
      }

      /**
       * Decodes a packet received from a worker and executes its command.
       *
       * @param string $recv_str raw GatewayProtocol packet
       * @return mixed result of the executed command; null for an unknown command
       */
      public function innerDealProcess($recv_str)
      {
          $pack = new GatewayProtocol($recv_str);
          switch($pack->header['cmd'])
          {
              // Send the body to one client connection.
              case GatewayProtocol::CMD_SEND_TO_ONE:
                  return $this->sendToSocketId($pack->header['socket_id'], $pack->body);
              // Optionally send a last message, then close the client connection.
              case GatewayProtocol::CMD_KICK:
                  if($pack->body)
                  {
                      $this->sendToSocketId($pack->header['socket_id'], $pack->body);
                  }
                  return $this->closeClientBySocketId($pack->header['socket_id']);
              // Send the body to every connection that has a uid bound.
              case GatewayProtocol::CMD_SEND_TO_ALL:
                  return $this->broadCast($pack->body);
              // Bind the uid carried in the header to the connection.
              case GatewayProtocol::CMD_CONNECT_SUCCESS:
                  return $this->connectSuccess($pack->header['socket_id'], $pack->header['uid']);
              default :
                  $this->notice('gateway inner pack cmd err data:' .$recv_str );
          }
      }

      /**
       * Sends data to every connection recorded in the uid map, i.e. only to
       * connections that have a uid bound.
       *
       * @param string $bin_data data to send
       * @return void
       */
      protected function broadCast($bin_data)
      {
          foreach($this->uidConnMap as $socket_id)
          {
              $this->sendToSocketId($socket_id, $bin_data);
          }
      }

      /**
       * Closes a client connection on a worker's request. Unlike closeClient()
       * it does not send CMD_ON_CLOSE back to the workers.
       *
       * @param int $socket_id connection id
       * @return void
       */
      protected function closeClientBySocketId($socket_id)
      {
          if($uid = $this->getUidByFd($socket_id))
          {
              unset($this->uidConnMap[$uid], $this->connUidMap[$socket_id]);
          }
          parent::closeClient($socket_id);
      }

      /**
       * Looks up the connection id bound to a uid.
       *
       * @param mixed $uid
       * @return int connection id, 0 when the uid is not bound
       */
      protected function getFdByUid($uid)
      {
          if(isset($this->uidConnMap[$uid]))
          {
              return $this->uidConnMap[$uid];
          }
          return 0;
      }

      /**
       * Looks up the uid bound to a connection id.
       *
       * @param int $fd connection id
       * @return mixed uid, 0 when the connection has no uid bound
       */
      protected function getUidByFd($fd)
      {
          if(isset($this->connUidMap[$fd]))
          {
              return $this->connUidMap[$fd];
          }
          return 0;
      }

      /**
       * Binds a uid to a connection in both lookup maps.
       *
       * Nothing is bound when the connection already has a uid, or when the
       * connection is no longer open. The latter avoids a stale mapping that a
       * later connection reusing the same id would inherit.
       *
       * @param int   $socket_id connection id
       * @param mixed $uid       uid to bind
       * @return void
       */
      protected function connectSuccess($socket_id, $uid)
      {
          $binded_uid = $this->getUidByFd($socket_id);
          if($binded_uid)
          {
              $this->notice('notify connection success fail ' . $socket_id . ' already binded uid:' . $binded_uid);
              return;
          }
          if(!isset($this->connections[$socket_id]))
          {
              $this->notice('notify connection success fail ' . $socket_id . ' connection not exist uid:' . $uid);
              return;
          }
          $this->uidConnMap[$uid] = $socket_id;
          $this->connUidMap[$socket_id] = $uid;
      }

      /**
       * Sends data to one connection.
       *
       * @param int    $socket_id connection id
       * @param string $bin_data  data to send
       * @return mixed false when the connection does not exist, otherwise the
       *               result of sendToClient()
       */
      public function sendToSocketId($socket_id, $bin_data)
      {
          if(!isset($this->connections[$socket_id]))
          {
              return false;
          }
          // sendToClient() is called without a target; the current fd is set first.
          $this->currentDealFd = $socket_id;
          return $this->sendToClient($bin_data);
      }

      /**
       * Closes a connection. When a uid is bound to it, CMD_ON_CLOSE is sent to
       * a worker first and the uid binding is removed.
       *
       * @param int $fd connection id
       * @return void
       */
      protected function closeClient($fd)
      {
          if($uid = $this->getUidByFd($fd))
          {
              // Notify before closing: sendToWorker() still reads the remote address.
              $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $fd);
              unset($this->uidConnMap[$uid], $this->connUidMap[$fd]);
          }
          parent::closeClient($fd);
      }

      /**
       * Forwards client data to a worker: as CMD_ON_CONNECTION while the
       * connection has no uid bound, as CMD_ON_MESSAGE afterwards.
       *
       * @param string $recv_str data received from the client
       * @return bool|null result of sendToWorker() for CMD_ON_CONNECTION,
       *                   null for CMD_ON_MESSAGE
       */
      public function dealProcess($recv_str)
      {
          $from_uid = $this->getUidByFd($this->currentDealFd);
          // No uid bound yet: trigger ON_CONNECTION.
          if(!$from_uid)
          {
              return $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECTION, $this->currentDealFd, $recv_str);
          }
          // Uid bound: trigger ON_MESSAGE.
          $this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $this->currentDealFd, $recv_str);
      }

      /**
       * Builds a GatewayProtocol packet describing a client event and sends it
       * to a worker.
       *
       * @param int    $cmd       GatewayProtocol::CMD_* value
       * @param int    $socket_id client connection id
       * @param string $body      payload
       * @return bool true when the whole packet was written
       */
      protected function sendToWorker($cmd, $socket_id, $body = '')
      {
          $address = $this->getRemoteAddress($socket_id);
          // Pad so an address without ':' yields port 0 instead of an undefined offset.
          list($client_ip, $client_port) = array_pad(explode(':', (string)$address, 2), 2, 0);
          $pack = new GatewayProtocol();
          $pack->header['cmd'] = $cmd;
          $pack->header['local_ip'] = $this->lanIp;
          $pack->header['local_port'] = $this->lanPort;
          $pack->header['socket_id'] = $socket_id;
          $pack->header['client_ip'] = $client_ip;
          $pack->header['client_port'] = $client_port;
          $pack->header['uid'] = $this->getUidByFd($socket_id);
          $pack->body = $body;
          return $this->sendBufferToWorker($pack->getBuffer());
      }

      /**
       * Sends a raw packet to one randomly chosen worker address over a new
       * connection, which is closed again afterwards.
       *
       * @param string $bin_data raw packet
       * @return bool true when the whole packet was written; false when no
       *              worker address is configured, the connection fails or
       *              the write is short
       */
      protected function sendBufferToWorker($bin_data)
      {
          if(empty($this->workerAddresses) || !is_array($this->workerAddresses))
          {
              $this->notice('send to worker fail, no worker address');
              return false;
          }
          $address = $this->workerAddresses[array_rand($this->workerAddresses)];
          $error_no = 0;
          $error_msg = '';
          $client = stream_socket_client($address, $error_no, $error_msg);
          if(!$client)
          {
              $this->notice('send to worker fail, connect ' . $address . ' fail ' . $error_msg);
              return false;
          }
          $len = fwrite($client, $bin_data);
          fclose($client);
          return $len === strlen($bin_data);
      }

      /**
       * Logs a message, prefixed with the worker class and suffixed with the
       * current remote ip; also echoes it when 'workerman.debug' is 1.
       *
       * @param string $str     message
       * @param bool   $display whether echoing is allowed
       * @return void
       */
      protected function notice($str, $display=true)
      {
          $str = 'Worker['.get_class($this).']:'."$str ip:".$this->getRemoteIp();
          \Man\Core\Lib\Log::add($str);
          if($display && \Man\Core\Lib\Config::get('workerman.debug') == 1)
          {
              echo $str."\n";
          }
      }

      /**
       * Stop hook: clears everything in the shared store.
       *
       * @return void
       */
      public function onStop()
      {
          Store::deleteAll();
      }
  }