Gateway.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484
  1. <?php
  2. /**
  3. *
  4. * 暴露给客户端的连接网关 只负责网络io
  5. * 1、监听客户端连接
  6. * 2、监听后端回应并转发回应给前端
  7. *
  8. * @author walkor <workerman.net>
  9. *
  10. */
  11. define('ROOT_DIR', realpath(__DIR__.'/../'));
  12. require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
  13. require_once ROOT_DIR . '/Lib/Store.php';
  14. class Gateway extends Man\Core\SocketWorker
  15. {
  16. /**
  17. * 内部通信socket udp
  18. * @var resouce
  19. */
  20. protected $innerMainSocketUdp = null;
  21. /**
  22. * 内部通信socket tcp
  23. * @var resouce
  24. */
  25. protected $innerMainSocketTcp = null;
  26. /**
  27. * 内网ip
  28. * @var string
  29. */
  30. protected $lanIp = '127.0.0.1';
  31. /**
  32. * 内部通信端口
  33. * @var int
  34. */
  35. protected $lanPort = 0;
  36. /**
  37. * uid到连接的映射
  38. * @var array
  39. */
  40. protected $uidConnMap = array();
  41. /**
  42. * 连接到uid的映射
  43. * @var array
  44. */
  45. protected $connUidMap = array();
  46. /**
  47. * 到Worker的通信地址
  48. * @var array
  49. */
  50. protected $workerAddresses = array();
  51. /**
  52. * gateway 发送心跳时间间隔 单位:秒 ,0表示不发送心跳,在配置中设置
  53. * @var integer
  54. */
  55. protected $pingInterval = 0;
  56. /**
  57. * 心跳数据
  58. * 可以是字符串(在配置中直接设置字符串如 ping_data=ping),
  59. * 可以是二进制数据(二进制数据保存在文件中,在配置中设置ping数据文件路径 如 ping_data=/yourpath/ping.bin)
  60. * ping数据应该是客户端能够识别的数据格式,只是检测连接的连通性,客户端收到心跳数据可以选择忽略此数据包
  61. * @var string
  62. */
  63. protected $pingData = '';
  64. /**
  65. * 进程启动
  66. */
  67. public function start()
  68. {
  69. // 安装信号处理函数
  70. $this->installSignal();
  71. // 添加accept事件
  72. $ret = $this->event->add($this->mainSocket, Man\Core\Events\BaseEvent::EV_READ, array($this, 'accept'));
  73. // 创建内部通信套接字
  74. $start_port = Man\Core\Lib\Config::get($this->workerName.'.lan_port_start');
  75. $this->lanPort = $start_port - posix_getppid() + posix_getpid();
  76. $this->lanIp = Man\Core\Lib\Config::get($this->workerName.'.lan_ip');
  77. if(!$this->lanIp)
  78. {
  79. $this->notice($this->workerName.'.lan_ip not set');
  80. $this->lanIp = '127.0.0.1';
  81. }
  82. $error_no_udp = $error_no_tcp = 0;
  83. $error_msg_udp = $error_msg_tcp = '';
  84. $this->innerMainSocketUdp = stream_socket_server("udp://".$this->lanIp.':'.$this->lanPort, $error_no_udp, $error_msg_udp, STREAM_SERVER_BIND);
  85. $this->innerMainSocketTcp = stream_socket_server("tcp://".$this->lanIp.':'.$this->lanPort, $error_no_tcp, $error_msg_tcp, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
  86. if(!$this->innerMainSocketUdp || !$this->innerMainSocketTcp)
  87. {
  88. $this->notice('create innerMainSocket udp or tcp fail and exit '.$error_msg_udp.$error_msg_tcp);
  89. sleep(1);
  90. exit(0);
  91. }
  92. else
  93. {
  94. stream_set_blocking($this->innerMainSocketUdp , 0);
  95. stream_set_blocking($this->innerMainSocketTcp , 0);
  96. }
  97. // 注册套接字
  98. $this->registerAddress("udp://".$this->lanIp.':'.$this->lanPort, 'udp');
  99. $this->registerAddress("tcp://".$this->lanIp.':'.$this->lanPort, 'tcp');
  100. // 添加读udp事件
  101. $this->event->add($this->innerMainSocketUdp, Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
  102. $this->event->add($this->innerMainSocketTcp, Man\Core\Events\BaseEvent::EV_READ, array($this, 'acceptTCP'));
  103. // 初始化到worker的通信地址
  104. $this->initWorkerAddresses();
  105. // 初始化心跳包时间间隔
  106. $ping_interval = \Man\Core\Lib\Config::get($this->workerName.'.ping_interval');
  107. if((int)$ping_interval > 0)
  108. {
  109. $this->pingInterval = (int)$ping_interval;
  110. }
  111. // 获取心跳包数据
  112. $ping_data_or_path = \Man\Core\Lib\Config::get($this->workerName.'.ping_data');
  113. if(is_file($ping_data_or_path))
  114. {
  115. $this->pingData = file_get_contents($ping_data_or_path);
  116. }
  117. else
  118. {
  119. $this->pingData = $ping_data_or_path;
  120. }
  121. // 设置定时任务,发送心跳
  122. if($this->pingInterval > 0 && $this->pingData)
  123. {
  124. \Man\Core\Lib\Task::init($this->event);
  125. \Man\Core\Lib\Task::add($this->pingInterval, array($this, 'ping'));
  126. }
  127. // 主体循环,整个子进程会阻塞在这个函数上
  128. $ret = $this->event->loop();
  129. $this->notice('worker loop exit');
  130. exit(0);
  131. }
  132. /**
  133. * 存储全局的通信地址
  134. * @param string $address
  135. */
  136. protected function registerAddress($address, $protocol)
  137. {
  138. \Man\Core\Lib\Mutex::get();
  139. $key = 'GLOBAL_GATEWAY_ADDRESS-' . $protocol;
  140. $addresses_list = Store::get($key);
  141. if(empty($addresses_list))
  142. {
  143. $addresses_list = array();
  144. }
  145. $addresses_list[$address] = $address;
  146. Store::set($key, $addresses_list);
  147. \Man\Core\Lib\Mutex::release();
  148. }
  149. /**
  150. * 删除全局的通信地址
  151. * @param string $address
  152. */
  153. protected function unregisterAddress($address, $protocol)
  154. {
  155. \Man\Core\Lib\Mutex::get();
  156. $key = 'GLOBAL_GATEWAY_ADDRESS-' . $protocol;
  157. $addresses_list = Store::get($key);
  158. if(empty($addresses_list))
  159. {
  160. $addresses_list = array();
  161. }
  162. unset($addresses_list[$address]);
  163. Store::set($key, $addresses_list);
  164. \Man\Core\Lib\Mutex::release();
  165. }
  166. /**
  167. * 接收Udp数据
  168. * 如果数据超过一个udp包长,需要业务自己解析包体,判断数据是否全部到达
  169. * @param resource $socket
  170. * @param $null_one $flag
  171. * @param $null_two $base
  172. * @return void
  173. */
  174. public function recvUdp($socket, $null_one = null, $null_two = null)
  175. {
  176. $data = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $address);
  177. // 惊群效应
  178. if(false === $data || empty($address))
  179. {
  180. return false;
  181. }
  182. $this->currentClientAddress = $address;
  183. $this->innerDealProcess($data);
  184. }
  185. public function acceptTcp($socket, $null_one = null, $null_two = null)
  186. {
  187. // 获得一个连接
  188. $new_connection = @stream_socket_accept($socket, 0);
  189. // 可能是惊群效应
  190. if(false === $new_connection)
  191. {
  192. return false;
  193. }
  194. // 连接的fd序号
  195. $fd = (int) $new_connection;
  196. $this->connections[$fd] = $new_connection;
  197. $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>GatewayProtocol::HEAD_LEN);
  198. // 非阻塞
  199. stream_set_blocking($this->connections[$fd], 0);
  200. $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'recvTcp'), $fd);
  201. return $new_connection;
  202. }
  203. public function dealInnerInput($buffer)
  204. {
  205. return GatewayProtocol::input($buffer);
  206. }
  207. /**
  208. * 处理受到的数据
  209. * @param event_buffer $event_buffer
  210. * @param int $fd
  211. * @return void
  212. */
  213. public function recvTcp($connection, $flag, $fd = null)
  214. {
  215. $this->currentDealFd = $fd;
  216. $buffer = stream_socket_recvfrom($connection, $this->recvBuffers[$fd]['remain_len']);
  217. // 出错了
  218. if('' == $buffer && '' == ($buffer = fread($connection, $this->recvBuffers[$fd]['remain_len'])))
  219. {
  220. if(!feof($connection))
  221. {
  222. return;
  223. }
  224. // 如果该链接对应的buffer有数据,说明发生错误
  225. if(!empty($this->recvBuffers[$fd]['buf']))
  226. {
  227. $this->statusInfo['send_fail']++;
  228. $this->notice("INNER_CLIENT_CLOSE\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
  229. }
  230. // 关闭链接
  231. $this->closeClient($fd);
  232. if($this->workerStatus == self::STATUS_SHUTDOWN)
  233. {
  234. $this->stop();
  235. }
  236. return;
  237. }
  238. $this->recvBuffers[$fd]['buf'] .= $buffer;
  239. $remain_len = $this->dealInnerInput($this->recvBuffers[$fd]['buf']);
  240. // 包接收完毕
  241. if(0 === $remain_len)
  242. {
  243. // 执行处理
  244. try{
  245. // 业务处理
  246. $this->innerDealProcess($this->recvBuffers[$fd]['buf']);
  247. }
  248. catch(\Exception $e)
  249. {
  250. $this->notice('CODE:' . $e->getCode() . ' MESSAGE:' . $e->getMessage()."\n".$e->getTraceAsString()."\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
  251. $this->statusInfo['throw_exception'] ++;
  252. }
  253. // 关闭链接
  254. if(empty($this->sendBuffers[$fd]))
  255. {
  256. $this->closeClient($fd);
  257. }
  258. }
  259. // 出错
  260. else if(false === $remain_len)
  261. {
  262. // 出错
  263. $this->statusInfo['packet_err']++;
  264. $this->notice("INNER_PACKET_ERROR\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
  265. $this->closeClient($fd);
  266. }
  267. else
  268. {
  269. $this->recvBuffers[$fd]['remain_len'] = $remain_len;
  270. }
  271. // 检查是否是关闭状态或者是否到达请求上限
  272. if($this->workerStatus == self::STATUS_SHUTDOWN )
  273. {
  274. // 停止服务
  275. $this->stop();
  276. // EXIT_WAIT_TIME秒后退出进程
  277. pcntl_alarm(self::EXIT_WAIT_TIME);
  278. }
  279. }
  280. protected function initWorkerAddresses()
  281. {
  282. $this->workerAddresses = Man\Core\Lib\Config::get($this->workerName.'.business_worker');
  283. if(!$this->workerAddresses)
  284. {
  285. $this->notice($this->workerName.'business_worker not set');
  286. }
  287. }
  288. public function dealInput($recv_str)
  289. {
  290. return 0;
  291. }
  292. public function innerDealProcess($recv_str)
  293. {
  294. $pack = new GatewayProtocol($recv_str);
  295. switch($pack->header['cmd'])
  296. {
  297. case GatewayProtocol::CMD_SEND_TO_ONE:
  298. return $this->sendToSocketId($pack->header['socket_id'], $pack->body);
  299. case GatewayProtocol::CMD_KICK:
  300. if($pack->body)
  301. {
  302. $this->sendToSocketId($pack->header['socket_id'], $pack->body);
  303. }
  304. return $this->closeClientBySocketId($pack->header['socket_id']);
  305. case GatewayProtocol::CMD_SEND_TO_ALL:
  306. return $this->broadCast($pack->body);
  307. case GatewayProtocol::CMD_CONNECT_SUCCESS:
  308. return $this->connectSuccess($pack->header['socket_id'], $pack->header['uid']);
  309. default :
  310. $this->notice('gateway inner pack cmd err data:' .$recv_str );
  311. }
  312. }
  313. protected function broadCast($bin_data)
  314. {
  315. foreach($this->uidConnMap as $uid=>$conn)
  316. {
  317. $this->sendToSocketId($conn, $bin_data);
  318. }
  319. }
  320. protected function closeClientBySocketId($socket_id)
  321. {
  322. if($uid = $this->getUidByFd($socket_id))
  323. {
  324. unset($this->uidConnMap[$uid], $this->connUidMap[$socket_id]);
  325. }
  326. parent::closeClient($socket_id);
  327. }
  328. protected function getFdByUid($uid)
  329. {
  330. if(isset($this->uidConnMap[$uid]))
  331. {
  332. return $this->uidConnMap[$uid];
  333. }
  334. return 0;
  335. }
  336. protected function getUidByFd($fd)
  337. {
  338. if(isset($this->connUidMap[$fd]))
  339. {
  340. return $this->connUidMap[$fd];
  341. }
  342. return 0;
  343. }
  344. protected function connectSuccess($socket_id, $uid)
  345. {
  346. $binded_uid = $this->getUidByFd($socket_id);
  347. if($binded_uid)
  348. {
  349. $this->notice('notify connection success fail ' . $socket_id . ' already binded data:'.serialize($data));
  350. return;
  351. }
  352. $this->uidConnMap[$uid] = $socket_id;
  353. $this->connUidMap[$socket_id] = $uid;
  354. }
  355. public function sendToSocketId($socket_id, $bin_data)
  356. {
  357. if(!isset($this->connections[$socket_id]))
  358. {
  359. return false;
  360. }
  361. $this->currentDealFd = $socket_id;
  362. return $this->sendToClient($bin_data);
  363. }
  364. protected function closeClient($fd)
  365. {
  366. if($uid = $this->getUidByFd($fd))
  367. {
  368. $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $fd);
  369. unset($this->uidConnMap[$uid], $this->connUidMap[$fd]);
  370. }
  371. parent::closeClient($fd);
  372. }
  373. public function dealProcess($recv_str)
  374. {
  375. // 判断用户是否认证过
  376. $from_uid = $this->getUidByFd($this->currentDealFd);
  377. // 触发ON_CONNECTION
  378. if(!$from_uid)
  379. {
  380. return $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECTION, $this->currentDealFd, $recv_str);
  381. }
  382. // 认证过, 触发ON_MESSAGE
  383. $this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $this->currentDealFd, $recv_str);
  384. }
  385. protected function sendToWorker($cmd, $socket_id, $body = '')
  386. {
  387. $address= $this->getRemoteAddress($socket_id);
  388. list($client_ip, $client_port) = explode(':', $address, 2);
  389. $pack = new GatewayProtocol();
  390. $pack->header['cmd'] = $cmd;
  391. $pack->header['local_ip'] = $this->lanIp;
  392. $pack->header['local_port'] = $this->lanPort;
  393. $pack->header['socket_id'] = $socket_id;
  394. $pack->header['client_ip'] = $client_ip;
  395. $pack->header['client_port'] = $client_ip;
  396. $pack->header['uid'] = $this->getUidByFd($socket_id);
  397. $pack->body = $body;
  398. return $this->sendBufferToWorker($pack->getBuffer());
  399. }
  400. protected function sendBufferToWorker($bin_data)
  401. {
  402. $client = stream_socket_client($this->workerAddresses[array_rand($this->workerAddresses)]);
  403. $len = fwrite($client, $bin_data);
  404. return $len == strlen($bin_data);
  405. }
  406. protected function notice($str, $display=true)
  407. {
  408. $str = 'Worker['.get_class($this).']:'."$str ip:".$this->getRemoteIp();
  409. Man\Core\Lib\Log::add($str);
  410. if($display && Man\Core\Lib\Config::get('workerman.debug') == 1)
  411. {
  412. echo $str."\n";
  413. }
  414. }
  415. public function onStop()
  416. {
  417. $this->unregisterAddress("udp://".$this->lanIp.':'.$this->lanPort, 'udp');
  418. $this->unregisterAddress("tcp://".$this->lanIp.':'.$this->lanPort, 'tcp');
  419. foreach($this->connUidMap as $uid)
  420. {
  421. Store::delete($uid);
  422. }
  423. }
  424. public function ping()
  425. {
  426. $this->broadCast($this->pingData);
  427. }
  428. }