Gateway.php 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692
  1. <?php
  2. /**
  3. *
  4. * 暴露给客户端的连接网关 只负责网络io
  5. * 1、监听客户端连接
  6. * 2、监听后端回应并转发回应给前端
  7. *
  8. * @author walkor <workerman.net>
  9. *
  10. */
  11. require_once __DIR__ . '/../Lib/Autoloader.php';
  12. use \Protocols\GatewayProtocol;
  13. use \Lib\Store;
  14. use \Lib\StatisticClient;
  15. class Gateway extends Man\Core\SocketWorker
  16. {
  17. /**
  18. * 内部通信socket udp
  19. * @var resouce
  20. */
  21. protected $innerMainSocketUdp = null;
  22. /**
  23. * 内部通信socket tcp
  24. * @var resouce
  25. */
  26. protected $innerMainSocketTcp = null;
  27. /**
  28. * 内网ip
  29. * @var string
  30. */
  31. protected $lanIp = '127.0.0.1';
  32. /**
  33. * 内部通信端口
  34. * @var int
  35. */
  36. protected $lanPort = 0;
  37. /**
  38. * client_id到连接的映射
  39. * @var array
  40. */
  41. protected $clientConnMap = array();
  42. /**
  43. * 连接到client_id的映射
  44. * @var array
  45. */
  46. protected $connClientMap = array();
  47. /**
  48. * 客户端链接和客户端远程地址映射
  49. * @var array
  50. */
  51. protected $connRemoteAddressMap = array();
  52. /**
  53. * client_id到session的映射
  54. * @var array
  55. */
  56. protected $connSessionMap = array();
  57. /**
  58. * 与worker的连接
  59. * [fd=>fd, $fd=>fd, ..]
  60. * @var array
  61. */
  62. protected $workerConnections = array();
  63. /**
  64. * gateway 发送心跳时间间隔 单位:秒 ,0表示不发送心跳,在配置中设置
  65. * @var integer
  66. */
  67. protected $pingInterval = 0;
  68. /**
  69. * 心跳数据
  70. * 可以是二进制数据(二进制数据保存在文件中,在配置中设置ping数据文件路径 如 ping_data=/yourpath/ping.bin)
  71. * ping数据应该是客户端能够识别的数据格式,只是检测连接的连通性,客户端收到心跳数据可以选择忽略此数据包
  72. * @var string
  73. */
  74. protected $pingData = '';
  75. /**
  76. * 命令字,统计用到
  77. * @var array
  78. */
  79. protected static $interfaceMap = array(
  80. GatewayProtocol::CMD_SEND_TO_ONE => 'CMD_SEND_TO_ONE',
  81. GatewayProtocol::CMD_SEND_TO_ALL => 'CMD_SEND_TO_ALL',
  82. GatewayProtocol::CMD_KICK => 'CMD_KICK',
  83. GatewayProtocol::CMD_UPDATE_SESSION => 'CMD_UPDATE_SESSION',
  84. GatewayProtocol::CMD_GET_ONLINE_STATUS => 'CMD_GET_ONLINE_STATUS',
  85. GatewayProtocol::CMD_IS_ONLINE => 'CMD_IS_ONLINE',
  86. );
  87. /**
  88. * 由于网络延迟或者socket缓冲区大小的限制,客户端发来的数据可能不会都全部到达,需要根据协议判断数据是否完整
  89. * @see Man\Core.SocketWorker::dealInput()
  90. */
  91. public function dealInput($recv_buffer)
  92. {
  93. return call_user_func_array(array('Event', 'onGatewayMessage'), array($recv_buffer));
  94. }
  95. /**
  96. * 用户客户端发来消息时处理
  97. * @see Man\Core.SocketWorker::dealProcess()
  98. */
  99. public function dealProcess($recv_buffer)
  100. {
  101. // 统计打点
  102. StatisticClient::tick();
  103. $module = __CLASS__;
  104. $success = 1;
  105. $code = 0;
  106. $msg = '';
  107. $interface = 'CMD_ON_MESSAGE';
  108. // 触发ON_MESSAGE
  109. $ret =$this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $this->currentDealFd, $recv_buffer);
  110. if($ret === false)
  111. {
  112. $success = 0;
  113. $msg = 'sendToWorker(CMD_ON_MESSAGE, '.$this->currentDealFd.', strlen($recv_buffer) = '.strlen($recv_buffer).') fail ';
  114. $code = 102;
  115. }
  116. StatisticClient::report($module, $interface, $success, $code, $msg);
  117. }
  118. /**
  119. * 进程启动
  120. */
  121. public function start()
  122. {
  123. // 安装信号处理函数
  124. $this->installSignal();
  125. // 添加accept事件
  126. $ret = $this->event->add($this->mainSocket, Man\Core\Events\BaseEvent::EV_READ, array($this, 'accept'));
  127. // 创建内部通信套接字,用于与BusinessWorker通讯
  128. $start_port = Man\Core\Lib\Config::get($this->workerName.'.lan_port_start');
  129. // 计算本进程监听的ip端口
  130. $this->lanPort = $start_port - posix_getppid() + posix_getpid();
  131. $this->lanIp = Man\Core\Lib\Config::get($this->workerName.'.lan_ip');
  132. if(!$this->lanIp)
  133. {
  134. $this->notice($this->workerName.'.lan_ip not set');
  135. $this->lanIp = '127.0.0.1';
  136. }
  137. $error_no_udp = $error_no_tcp = 0;
  138. $error_msg_udp = $error_msg_tcp = '';
  139. // 执行监听
  140. $this->innerMainSocketUdp = stream_socket_server("udp://".$this->lanIp.':'.$this->lanPort, $error_no_udp, $error_msg_udp, STREAM_SERVER_BIND);
  141. $this->innerMainSocketTcp = stream_socket_server("tcp://".$this->lanIp.':'.$this->lanPort, $error_no_tcp, $error_msg_tcp, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
  142. // 出错,退出,下次会换个端口
  143. if(!$this->innerMainSocketUdp || !$this->innerMainSocketTcp)
  144. {
  145. $this->notice('create innerMainSocket udp or tcp fail and exit '.$error_msg_udp.$error_msg_tcp);
  146. $this->stop();
  147. }
  148. else
  149. {
  150. stream_set_blocking($this->innerMainSocketUdp , 0);
  151. stream_set_blocking($this->innerMainSocketTcp , 0);
  152. }
  153. // 注册套接字
  154. $this->registerAddress($this->lanIp.':'.$this->lanPort);
  155. // 添加读udp/tcp事件
  156. $this->event->add($this->innerMainSocketUdp, Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvInnerUdp'));
  157. $this->event->add($this->innerMainSocketTcp, Man\Core\Events\BaseEvent::EV_READ, array($this, 'acceptInnerTcp'));
  158. // 初始化心跳包时间间隔
  159. $ping_interval = \Man\Core\Lib\Config::get($this->workerName.'.ping_interval');
  160. if((int)$ping_interval > 0)
  161. {
  162. $this->pingInterval = (int)$ping_interval;
  163. }
  164. // 获取心跳包数据
  165. $ping_data_or_path = \Man\Core\Lib\Config::get($this->workerName.'.ping_data');
  166. if(is_file($ping_data_or_path))
  167. {
  168. $this->pingData = file_get_contents($ping_data_or_path);
  169. }
  170. else
  171. {
  172. $this->pingData = $ping_data_or_path;
  173. }
  174. // 设置定时任务,发送心跳
  175. if($this->pingInterval > 0 && $this->pingData)
  176. {
  177. \Man\Core\Lib\Task::init($this->event);
  178. \Man\Core\Lib\Task::add($this->pingInterval, array($this, 'ping'));
  179. }
  180. // 主体循环,整个子进程会阻塞在这个函数上
  181. $ret = $this->event->loop();
  182. // 下面正常不会执行到
  183. $this->notice('worker loop exit');
  184. // 执行到就退出
  185. exit(0);
  186. }
  187. /**
  188. * 接受一个链接
  189. * @param resource $socket
  190. * @param $null_one $flag
  191. * @param $null_two $base
  192. * @return void
  193. */
  194. public function accept($socket, $null_one = null, $null_two = null)
  195. {
  196. // 获得一个连接
  197. $new_connection = @stream_socket_accept($socket, 0);
  198. // 可能是惊群效应
  199. if(false === $new_connection)
  200. {
  201. $this->statusInfo['thunder_herd']++;
  202. return false;
  203. }
  204. // 连接的fd序号
  205. $fd = (int) $new_connection;
  206. $this->connections[$fd] = $new_connection;
  207. $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
  208. $this->connSessionMap[$fd] = '';
  209. // 非阻塞
  210. stream_set_blocking($this->connections[$fd], 0);
  211. $this->event->add($this->connections[$fd], Man\Core\Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
  212. // 全局唯一client_id
  213. $global_client_id = $this->createGlobalClientId();
  214. $this->clientConnMap[$global_client_id] = $fd;
  215. $this->connClientMap[$fd] = $global_client_id;
  216. $address = array('local_ip'=>$this->lanIp, 'local_port'=>$this->lanPort, 'socket_id'=>$fd);
  217. // 保存客户端内部通讯地址
  218. if(!Store::instance('gateway')->set($global_client_id, $address))
  219. {
  220. $this->notice("Store::instance('gateway')->set($global_client_id, ".json_encode($address).") fail");
  221. }
  222. // 客户端保存 ip:port
  223. $address= $this->getRemoteAddress($fd);
  224. if($address)
  225. {
  226. list($client_ip, $client_port) = explode(':', $address, 2);
  227. }
  228. else
  229. {
  230. $client_ip = 0;
  231. $client_port = 0;
  232. }
  233. $this->connRemoteAddressMap[$fd] = array('ip'=>$client_ip, 'port'=>$client_port);
  234. // 触发GatewayOnConnection事件
  235. if(method_exists('Event','onGatewayConnect'))
  236. {
  237. $this->sendToWorker(GatewayProtocol::CMD_ON_GATEWAY_CONNECTION, $fd);
  238. }
  239. return $new_connection;
  240. }
  241. /**
  242. * 存储全局的通信地址
  243. * @param string $address
  244. */
  245. protected function registerAddress($address)
  246. {
  247. // 这里使用了信号量只能实现单机互斥,分布式互斥需要借助于memcache incr 或者其他分布式存储
  248. \Man\Core\Lib\Mutex::get();
  249. $key = 'GLOBAL_GATEWAY_ADDRESS';
  250. $addresses_list = Store::instance('gateway')->get($key);
  251. if(empty($addresses_list))
  252. {
  253. $addresses_list = array();
  254. }
  255. $addresses_list[$address] = $address;
  256. Store::instance('gateway')->set($key, $addresses_list);
  257. \Man\Core\Lib\Mutex::release();
  258. }
  259. /**
  260. * 删除全局的通信地址
  261. * @param string $address
  262. */
  263. protected function unregisterAddress($address)
  264. {
  265. // 这里使用了信号量只能实现单机互斥,分布式互斥需要借助于memcache incr 或者其他分布式存储
  266. \Man\Core\Lib\Mutex::get();
  267. $key = 'GLOBAL_GATEWAY_ADDRESS';
  268. $addresses_list = Store::instance('gateway')->get($key);
  269. if(empty($addresses_list))
  270. {
  271. $addresses_list = array();
  272. }
  273. unset($addresses_list[$address]);
  274. Store::instance('gateway')->set($key, $addresses_list);
  275. \Man\Core\Lib\Mutex::release();
  276. }
  277. /**
  278. * 接收Udp数据
  279. * 如果数据超过一个udp包长,需要业务自己解析包体,判断数据是否全部到达
  280. * @param resource $socket
  281. * @param $null_one $flag
  282. * @param $null_two $base
  283. * @return void
  284. */
  285. public function recvInnerUdp($socket, $null_one = null, $null_two = null)
  286. {
  287. $data = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $address);
  288. // 惊群效应
  289. if(false === $data || empty($address))
  290. {
  291. return false;
  292. }
  293. $this->currentClientAddress = $address;
  294. $this->innerDealProcess($data);
  295. }
  296. /**
  297. * 内部通讯端口接受BusinessWorker连接请求,以便建立起长连接
  298. * @param resouce $socket
  299. * @param null $null_one
  300. * @param null $null_two
  301. */
  302. public function acceptInnerTcp($socket, $null_one = null, $null_two = null)
  303. {
  304. // 获得一个连接
  305. $new_connection = @stream_socket_accept($socket, 0);
  306. if(false === $new_connection)
  307. {
  308. return false;
  309. }
  310. // 连接的fd序号
  311. $fd = (int) $new_connection;
  312. $this->connections[$fd] = $new_connection;
  313. $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>GatewayProtocol::HEAD_LEN);
  314. // 非阻塞
  315. stream_set_blocking($this->connections[$fd], 0);
  316. $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'recvInnerTcp'), $fd);
  317. // 标记这个连接是内部通讯长连接,区别于客户端连接
  318. $this->workerConnections[$fd] = $fd;
  319. return $new_connection;
  320. }
  321. /**
  322. * 内部通讯判断数据是否全部到达
  323. * @param string $buffer
  324. */
  325. public function dealInnerInput($buffer)
  326. {
  327. return GatewayProtocol::input($buffer);
  328. }
  329. /**
  330. * 处理内部通讯收到的数据
  331. * @param event_buffer $event_buffer
  332. * @param int $fd
  333. * @return void
  334. */
  335. public function recvInnerTcp($connection, $flag, $fd = null)
  336. {
  337. $this->currentDealFd = $fd;
  338. $buffer = stream_socket_recvfrom($connection, $this->recvBuffers[$fd]['remain_len']);
  339. // 出错了
  340. if('' == $buffer && '' == ($buffer = fread($connection, $this->recvBuffers[$fd]['remain_len'])))
  341. {
  342. // 判断是否是链接断开
  343. if(!feof($connection))
  344. {
  345. return;
  346. }
  347. // 如果该链接对应的buffer有数据,说明发生错误
  348. if(!empty($this->recvBuffers[$fd]['buf']))
  349. {
  350. $this->statusInfo['send_fail']++;
  351. $this->notice("INNER_CLIENT_CLOSE\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".bin2hex($this->recvBuffers[$fd]['buf'])."]\n");
  352. }
  353. // 关闭链接
  354. $this->closeInnerClient($fd);
  355. if($this->workerStatus == self::STATUS_SHUTDOWN)
  356. {
  357. $this->stop();
  358. }
  359. return;
  360. }
  361. $this->recvBuffers[$fd]['buf'] .= $buffer;
  362. $remain_len = $this->dealInnerInput($this->recvBuffers[$fd]['buf']);
  363. // 包接收完毕
  364. if(0 === $remain_len)
  365. {
  366. // 执行处理
  367. try{
  368. // 内部通讯业务处理
  369. $this->innerDealProcess($this->recvBuffers[$fd]['buf']);
  370. }
  371. catch(\Exception $e)
  372. {
  373. $this->notice('CODE:' . $e->getCode() . ' MESSAGE:' . $e->getMessage()."\n".$e->getTraceAsString()."\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
  374. $this->statusInfo['throw_exception'] ++;
  375. }
  376. $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>GatewayProtocol::HEAD_LEN);
  377. }
  378. // 出错
  379. else if(false === $remain_len)
  380. {
  381. // 出错
  382. $this->statusInfo['packet_err']++;
  383. $this->notice("INNER_PACKET_ERROR\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
  384. $this->closeInnerClient($fd);
  385. }
  386. else
  387. {
  388. $this->recvBuffers[$fd]['remain_len'] = $remain_len;
  389. }
  390. // 检查是否是关闭状态或者是否到达请求上限
  391. if($this->workerStatus == self::STATUS_SHUTDOWN )
  392. {
  393. // 停止服务
  394. $this->stop();
  395. // EXIT_WAIT_TIME秒后退出进程
  396. pcntl_alarm(self::EXIT_WAIT_TIME);
  397. }
  398. }
  399. /**
  400. * 内部通讯处理
  401. * @param string $recv_buffer
  402. */
  403. public function innerDealProcess($recv_buffer)
  404. {
  405. $pack = new GatewayProtocol($recv_buffer);
  406. $cmd = $pack->header['cmd'];
  407. StatisticClient::tick();
  408. $module = __CLASS__;
  409. $interface = isset(self::$interfaceMap[$cmd]) ? self::$interfaceMap[$cmd] : $cmd;
  410. $success = 1;
  411. $code = 0;
  412. $msg = '';
  413. try
  414. {
  415. switch($cmd)
  416. {
  417. // 向某客户端发送数据
  418. case GatewayProtocol::CMD_SEND_TO_ONE:
  419. $this->sendToSocketId($pack->header['socket_id'], $pack->body);
  420. break;
  421. // 踢掉客户端
  422. case GatewayProtocol::CMD_KICK:
  423. $this->closeClient($pack->header['socket_id']);
  424. break;
  425. // 向所客户端发送数据
  426. case GatewayProtocol::CMD_SEND_TO_ALL:
  427. if($pack->ext_data)
  428. {
  429. $client_id_array = unpack('N*', $pack->ext_data);
  430. foreach($client_id_array as $client_id)
  431. {
  432. if(isset($this->clientConnMap[$client_id]))
  433. {
  434. $this->sendToSocketId($this->clientConnMap[$client_id], $pack->body);
  435. }
  436. }
  437. }
  438. else
  439. {
  440. $this->broadCast($pack->body);
  441. }
  442. break;
  443. // 更新某个客户端的session
  444. case GatewayProtocol::CMD_UPDATE_SESSION:
  445. if(isset($this->connSessionMap[$pack->header['socket_id']]))
  446. {
  447. $this->connSessionMap[$pack->header['socket_id']] = $pack->ext_data;
  448. }
  449. break;
  450. // 获得在线状态
  451. case GatewayProtocol::CMD_GET_ONLINE_STATUS:
  452. $online_status = json_encode(array_values($this->connClientMap));
  453. stream_socket_sendto($this->innerMainSocketUdp, $online_status, 0, $this->currentClientAddress);
  454. break;
  455. // 判断某个客户端id是否在线
  456. case GatewayProtocol::CMD_IS_ONLINE:
  457. stream_socket_sendto($this->innerMainSocketUdp, (int)isset($this->clientConnMap[$pack->header['client_id']]), 0, $this->currentClientAddress);
  458. break;
  459. // 未知的命令
  460. default :
  461. $err_msg = "gateway inner pack err cmd=$cmd";
  462. $this->notice($err_msg);
  463. throw new \Exception($err_msg, 501);
  464. }
  465. }
  466. catch(\Exception $e)
  467. {
  468. $success = 0;
  469. $code = $e->getCode() > 0 ? $e->getCode() : 500;
  470. $msg = $e->__toString();
  471. }
  472. StatisticClient::report($module, $interface, $success, $code, $msg);
  473. }
  474. /**
  475. * 广播数据
  476. * @param string $bin_data
  477. */
  478. protected function broadCast($bin_data)
  479. {
  480. foreach($this->clientConnMap as $client_id=>$conn)
  481. {
  482. $this->sendToSocketId($conn, $bin_data);
  483. }
  484. }
  485. /**
  486. * 根据client_id获取client_id对应连接的socket_id
  487. * @param int $client_id
  488. */
  489. protected function getFdByClientId($client_id)
  490. {
  491. if(isset($this->clientConnMap[$client_id]))
  492. {
  493. return $this->clientConnMap[$client_id];
  494. }
  495. return 0;
  496. }
  497. /**
  498. * 根据连接socket_id获取client_id
  499. * @param int $fd
  500. */
  501. protected function getClientIdByFd($fd)
  502. {
  503. if(isset($this->connClientMap[$fd]))
  504. {
  505. return $this->connClientMap[$fd];
  506. }
  507. return 0;
  508. }
  509. /**
  510. * 向某个socket_id的连接发送消息
  511. * @param int $socket_id
  512. * @param string $bin_data
  513. */
  514. public function sendToSocketId($socket_id, $bin_data)
  515. {
  516. if(!isset($this->connections[$socket_id]))
  517. {
  518. return false;
  519. }
  520. $this->currentDealFd = $socket_id;
  521. return $this->sendToClient($bin_data);
  522. }
  523. /**
  524. * 用户客户端关闭连接时触发
  525. * @see Man\Core.SocketWorker::closeClient()
  526. */
  527. protected function closeClient($fd)
  528. {
  529. StatisticClient::tick();
  530. if($client_id = $this->getClientIdByFd($fd))
  531. {
  532. $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $fd);
  533. Store::instance('gateway')->delete($client_id);
  534. unset($this->clientConnMap[$client_id]);
  535. }
  536. unset($this->connClientMap[$fd], $this->connSessionMap[$fd], $this->connRemoteAddressMap[$fd]);
  537. parent::closeClient($fd);
  538. StatisticClient::report(__CLASS__, 'CMD_ON_CLOSE', 1, 0, '');
  539. }
  540. /**
  541. * 内部通讯socket在BusinessWorker主动关闭连接时触发
  542. * @param int $fd
  543. */
  544. protected function closeInnerClient($fd)
  545. {
  546. unset($this->workerConnections[$fd]);
  547. parent::closeClient($fd);
  548. }
  549. /**
  550. * 随机抽取一个与BusinessWorker的长连接,将数据发给一个BusinessWorker
  551. * @param int $cmd
  552. * @param int $socket_id
  553. * @param string $body
  554. */
  555. protected function sendToWorker($cmd, $socket_id, $body = '')
  556. {
  557. $pack = new GatewayProtocol();
  558. $pack->header['cmd'] = $cmd;
  559. $pack->header['local_ip'] = $this->lanIp;
  560. $pack->header['local_port'] = $this->lanPort;
  561. $pack->header['socket_id'] = $socket_id;
  562. $pack->header['client_ip'] = $this->connRemoteAddressMap[$socket_id]['ip'];
  563. $pack->header['client_port'] = $this->connRemoteAddressMap[$socket_id]['port'];
  564. $pack->header['client_id'] = $this->getClientIdByFd($socket_id);
  565. $pack->body = $body;
  566. $pack->ext_data = $this->connSessionMap[$pack->header['socket_id']];
  567. return $this->sendBufferToWorker($pack->getBuffer());
  568. }
  569. /**
  570. * 随机抽取一个与BusinessWorker的长连接,将数据发给一个BusinessWorker
  571. * @param string $bin_data
  572. */
  573. protected function sendBufferToWorker($bin_data)
  574. {
  575. if($this->currentDealFd = array_rand($this->workerConnections))
  576. {
  577. return $this->sendToClient($bin_data);
  578. }
  579. else
  580. {
  581. $this->notice("sendBufferToWorker fail \$this->workerConnections=".var_export($this->workerConnections,true));
  582. }
  583. }
  584. /**
  585. * 打印日志
  586. * @see Man\Core.AbstractWorker::notice()
  587. */
  588. protected function notice($str, $display=true)
  589. {
  590. $str = 'Worker['.get_class($this).']:'."$str ip:".$this->getRemoteIp();
  591. Man\Core\Lib\Log::add($str);
  592. if($display && Man\Core\Lib\Config::get('workerman.debug') == 1)
  593. {
  594. echo $str."\n";
  595. }
  596. }
  597. /**
  598. * 进程停止时,清除一些数据
  599. * @see Man\Core.SocketWorker::onStop()
  600. */
  601. public function onStop()
  602. {
  603. $this->unregisterAddress($this->lanIp.':'.$this->lanPort);
  604. foreach($this->connClientMap as $client_id)
  605. {
  606. Store::instance('gateway')->delete($client_id);
  607. }
  608. }
  609. /**
  610. * 创建全局唯一的id
  611. */
  612. protected function createGlobalClientId()
  613. {
  614. $global_socket_key = 'GLOBAL_SOCKET_ID_KEY';
  615. $global_client_id = Store::instance('gateway')->increment($global_socket_key);
  616. if(!$global_client_id || $global_client_id > 2147483646)
  617. {
  618. Store::instance('gateway')->set($global_socket_key, 1);
  619. }
  620. else
  621. {
  622. return $global_client_id;
  623. }
  624. return Store::instance('gateway')->increment($global_socket_key);
  625. }
  626. /**
  627. * 向客户端发送心跳数据
  628. */
  629. public function ping()
  630. {
  631. $this->broadCast($this->pingData);
  632. }
  633. }