Gateway.php 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. <?php
  2. namespace GatewayWorker;
  3. use \Workerman\Worker;
  4. use \Workerman\Lib\Timer;
  5. use \Workerman\Protocols\GatewayProtocol;
  6. use \GatewayWorker\Lib\Lock;
  7. use \GatewayWorker\Lib\Store;
  8. class Gateway extends Worker
  9. {
  10. public $lanIp = '127.0.0.1';
  11. public $startPort = 2000;
  12. public $reloadable = false;
  13. public $pingInterval = 0;
  14. public $pingNotResponseLimit = 0;
  15. public $pingData = '';
  16. protected $_clientConnections = array();
  17. protected $_workerConnections = array();
  18. protected $_innerTcpWorker = null;
  19. protected $_innerUdpWorker = null;
  20. public function __construct($socket_name, $context_option = array())
  21. {
  22. $this->onWorkerStart = array($this, 'onWorkerStart');
  23. $this->onConnect = array($this, 'onClientConnect');
  24. $this->onMessage = array($this, 'onClientMessage');
  25. $this->onClose = array($this, 'onClientClose');
  26. $this->onWorkerStop = array($this, 'onWorkerStop');
  27. parent::__construct($socket_name, $context_option);
  28. }
  29. public function onClientMessage($connection, $data)
  30. {
  31. $connection->pingNotResponseCount = 0;
  32. $this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $connection, $data);
  33. }
  34. public function onClientConnect($connection)
  35. {
  36. $connection->globalClientId = $this->createGlobalClientId();
  37. $connection->gatewayHeader = array(
  38. 'local_ip' => $this->lanIp,
  39. 'local_port' => $this->lanPort,
  40. 'client_ip'=>$connection->getRemoteIp(),
  41. 'client_port'=>$connection->getRemotePort(),
  42. 'client_id'=>$connection->globalClientId,
  43. );
  44. $connection->session = '';
  45. $connection->pingNotResponseCount = 0;
  46. $this->_clientConnections[$connection->globalClientId] = $connection;
  47. $address = $this->lanIp.':'.$this->lanPort;
  48. $this->storeClientAddress($connection->globalClientId, $address);
  49. if(method_exists('Event','onConnect'))
  50. {
  51. $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECTION, $connection);
  52. }
  53. }
  54. protected function sendToWorker($cmd, $connection, $body = '')
  55. {
  56. $gateway_data = $connection->gatewayHeader;
  57. $gateway_data['cmd'] = $cmd;
  58. $gateway_data['body'] = $body;
  59. $gateway_data['ext_data'] = $connection->session;
  60. $key = array_rand($this->_workerConnections);
  61. if($key)
  62. {
  63. if(false === $this->_workerConnections[$key]->send($gateway_data))
  64. {
  65. $msg = "sendBufferToWorker fail. May be the send buffer are overflow";
  66. $this->log($msg);
  67. return false;
  68. }
  69. }
  70. else
  71. {
  72. $msg = "endBufferToWorker fail. the connections between Gateway and BusinessWorker are not ready";
  73. $this->log($msg);
  74. return false;
  75. }
  76. return true;
  77. }
  78. /**
  79. * @param int $global_client_id
  80. * @param string $address
  81. */
  82. protected function storeClientAddress($global_client_id, $address)
  83. {
  84. if(!Store::instance('gateway')->set('gateway-'.$global_client_id, $address))
  85. {
  86. $msg = 'storeClientAddress fail.';
  87. if(get_class(Store::instance('gateway')) == 'Memcached')
  88. {
  89. $msg .= " reason :".Store::instance('gateway')->getResultMessage();
  90. }
  91. $this->log($msg);
  92. return false;
  93. }
  94. return true;
  95. }
  96. protected function delClientAddress($global_client_id)
  97. {
  98. Store::instance('gateway')->delete('gateway-'.$global_client_id);
  99. }
  100. public function onClientClose($connection)
  101. {
  102. $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $connection);
  103. $this->delClientAddress($connection->globalClientId);
  104. unset($this->_clientConnections[$connection->globalClientId]);
  105. }
  106. protected function createGlobalClientId()
  107. {
  108. $global_socket_key = 'GLOBAL_SOCKET_ID_KEY';
  109. $store = Store::instance('gateway');
  110. $global_client_id = $store->increment($global_socket_key);
  111. if(!$global_client_id || $global_client_id > 2147483646)
  112. {
  113. $store->set($global_socket_key, 0);
  114. $global_client_id = $store->increment($global_socket_key);
  115. }
  116. if(!$global_client_id)
  117. {
  118. $msg .= "createGlobalClientId fail :";
  119. if(get_class($store) == 'Memcached')
  120. {
  121. $msg .= $store->getResultMessage();
  122. }
  123. $this->log($msg);
  124. }
  125. return $global_client_id;
  126. }
  127. public function onWorkerStart()
  128. {
  129. $this->lanPort = $this->startPort - posix_getppid() + posix_getpid();
  130. if($this->lanPort<0 || $this->lanPort >=65535)
  131. {
  132. $this->lanPort = rand($this->startPort, 65535);
  133. }
  134. if($this->pingInterval > 0)
  135. {
  136. Timer::add($this->pingInterval, array($this, 'ping'));
  137. }
  138. $this->_innerTcpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}");
  139. $this->_innerTcpWorker->listen();
  140. $this->_innerUdpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}");
  141. $this->_innerUdpWorker->transport = 'udp';
  142. $this->_innerUdpWorker->listen();
  143. $this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage');
  144. $this->_innerUdpWorker->onMessage = array($this, 'onWorkerMessage');
  145. $this->_innerTcpWorker->onConnect = array($this, 'onWorkerConnect');
  146. $this->_innerTcpWorker->onClose = array($this, 'onWorkerClose');
  147. if(!$this->registerAddress())
  148. {
  149. $this->log('registerAddress fail and exit');
  150. Worker::stopAll();
  151. }
  152. }
  153. public function onWorkerConnect($connection)
  154. {
  155. $connection->remoteAddress = $connection->getRemoteIp().':'.$connection->getRemotePort();
  156. $this->_workerConnections[$connection->remoteAddress] = $connection;
  157. }
  158. public function onWorkerMessage($connection, $data)
  159. {
  160. $cmd = $data['cmd'];
  161. switch($cmd)
  162. {
  163. // 向某客户端发送数据
  164. case GatewayProtocol::CMD_SEND_TO_ONE:
  165. if(isset($this->_clientConnections[$data['client_id']]))
  166. {
  167. $this->_clientConnections[$data['client_id']]->send($data['body']);
  168. }
  169. break;
  170. case GatewayProtocol::CMD_KICK:
  171. if(isset($this->_clientConnections[$data['client_id']]))
  172. {
  173. $this->_clientConnections[$data['client_id']]->close();
  174. }
  175. break;
  176. case GatewayProtocol::CMD_SEND_TO_ALL:
  177. if($data['ext_data'])
  178. {
  179. $client_id_array = unpack('N*', $data['ext_data']);
  180. foreach($client_id_array as $client_id)
  181. {
  182. if(isset($this->_clientConnections[$client_id]))
  183. {
  184. $this->_clientConnections[$client_id]->send($data['body']);
  185. }
  186. }
  187. }
  188. else
  189. {
  190. foreach($this->_clientConnections as $client_connection)
  191. {
  192. $client_connection->send($data['body']);
  193. }
  194. }
  195. break;
  196. case GatewayProtocol::CMD_UPDATE_SESSION:
  197. if(isset($this->_clientConnections[$data['client_id']]))
  198. {
  199. $this->_clientConnections[$data['client_id']]->session = $data['ext_data'];
  200. }
  201. break;
  202. case GatewayProtocol::CMD_GET_ONLINE_STATUS:
  203. $online_status = json_encode(array_keys($this->_clientConnections));
  204. $connection->send($online_status);
  205. break;
  206. case GatewayProtocol::CMD_IS_ONLINE:
  207. $connection->send((int)isset($this->_clientConnections[$data['client_id']]));
  208. break;
  209. default :
  210. $err_msg = "gateway inner pack err cmd=$cmd";
  211. throw new \Exception($err_msg);
  212. }
  213. }
  214. public function onWorkerClose($connection)
  215. {
  216. $this->log("{$connection->remoteAddress} CLOSE INNER_CONNECTION\n");
  217. unset($this->_workerConnections[$connection->remoteAddress]);
  218. }
  219. /**
  220. * 存储全局的通信地址
  221. * @param string $address
  222. */
  223. protected function registerAddress()
  224. {
  225. $address = $this->lanIp.':'.$this->lanPort;
  226. // key
  227. $key = 'GLOBAL_GATEWAY_ADDRESS';
  228. try
  229. {
  230. $store = Store::instance('gateway');
  231. }
  232. catch(\Exception $msg)
  233. {
  234. $this->log($msg);
  235. return false;
  236. }
  237. Lock::get();
  238. $addresses_list = $store->get($key);
  239. if(empty($addresses_list))
  240. {
  241. $addresses_list = array();
  242. }
  243. $addresses_list[$address] = $address;
  244. if(!$store->set($key, $addresses_list))
  245. {
  246. Lock::release();
  247. if(get_class($store) == 'Memcached')
  248. {
  249. $msg = " registerAddress fail : " . $store->getResultMessage();
  250. }
  251. $this->log($msg);
  252. return false;
  253. }
  254. Lock::release();
  255. return true;
  256. }
  257. /**
  258. * 删除全局的通信地址
  259. * @param string $address
  260. */
  261. protected function unregisterAddress()
  262. {
  263. $address = $this->lanIp.':'.$this->lanPort;
  264. $key = 'GLOBAL_GATEWAY_ADDRESS';
  265. try
  266. {
  267. $store = Store::instance('gateway');
  268. }
  269. catch (\Exception $msg)
  270. {
  271. $this->log($msg);
  272. return false;
  273. }
  274. Lock::get();
  275. $addresses_list = $store->get($key);
  276. if(empty($addresses_list))
  277. {
  278. $addresses_list = array();
  279. }
  280. unset($addresses_list[$address]);
  281. if(!$store->set($key, $addresses_list))
  282. {
  283. Lock::release();
  284. $msg = "unregisterAddress fail";
  285. if(get_class($store) == 'Memcached')
  286. {
  287. $msg .= " reason:".$store->getResultMessage();
  288. }
  289. $this->log($msg);
  290. return;
  291. }
  292. Lock::release();
  293. return true;
  294. }
  295. public function ping()
  296. {
  297. // 关闭未回复心跳的连接
  298. foreach($this->_clientConnections as $connection)
  299. {
  300. // 上次发送的心跳还没有回复次数大于限定值就断开
  301. if($this->pingNotResponseLimit > 0 && $connection->pingNotResponseCount >= $this->pingNotResponseLimit)
  302. {
  303. $connection->close();
  304. continue;
  305. }
  306. $connection->pingNotResponseCount++;
  307. $connection->send($this->pingData);
  308. }
  309. }
  310. public function onWorkerStop()
  311. {
  312. $this->unregisterAddress();
  313. foreach($this->_clientConnections as $connection)
  314. {
  315. $this->delClientAddress($connection->globalClientId);
  316. }
  317. }
  318. }