BusinessWorker.php 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. <?php
  2. /**
  3. *
  4. * 处理具体逻辑
  5. *
  6. * @author walkor <workerman.net>
  7. *
  8. */
  9. define('ROOT_DIR', realpath(__DIR__.'/../'));
  10. require_once ROOT_DIR . '/Lib/Gateway.php';
  11. require_once ROOT_DIR . '/Lib/StatisticClient.php';
  12. require_once ROOT_DIR . '/Event.php';
  13. class BusinessWorker extends Man\Core\SocketWorker
  14. {
  15. /**
  16. * 与gateway的连接
  17. * ['ip:port' => conn, 'ip:port' => conn, ...]
  18. * @var array
  19. */
  20. protected $gatewayConnections = array();
  21. /**
  22. * 连不上的gateway地址
  23. * ['ip:port' => retry_count, 'ip:port' => retry_count, ...]
  24. * @var array
  25. */
  26. protected $badGatewayAddress = array();
  27. /**
  28. * 连接gateway失败重试次数
  29. * @var int
  30. */
  31. const MAX_RETRY_COUNT = 5;
  32. /**
  33. * 进程启动时初始化
  34. * @see Man\Core.SocketWorker::onStart()
  35. */
  36. protected function onStart()
  37. {
  38. // 定时检查与gateway进程的连接
  39. \Man\Core\Lib\Task::init($this->event);
  40. \Man\Core\Lib\Task::add(1, array($this, 'checkGatewayConnections'));
  41. $this->checkGatewayConnections();
  42. GateWay::setBusinessWorker($this);
  43. }
  44. /**
  45. * 获取与gateway的连接
  46. */
  47. public function getGatewayConnections()
  48. {
  49. return $this->gatewayConnections;
  50. }
  51. /**
  52. * 检查gateway转发来的用户请求是否完整
  53. * @see Man\Core.SocketWorker::dealInput()
  54. */
  55. public function dealInput($recv_str)
  56. {
  57. return GatewayProtocol::input($recv_str);
  58. }
  59. /**
  60. * 处理请求
  61. * @see Man\Core.SocketWorker::dealProcess()
  62. */
  63. public function dealProcess($recv_str)
  64. {
  65. $pack = new GatewayProtocol($recv_str);
  66. Context::$client_ip = $pack->header['client_ip'];
  67. Context::$client_port = $pack->header['client_port'];
  68. Context::$local_ip = $pack->header['local_ip'];
  69. Context::$local_port = $pack->header['local_port'];
  70. Context::$socket_id = $pack->header['socket_id'];
  71. Context::$uid = $pack->header['uid'];
  72. $cmd = $pack->header['cmd'];
  73. $interface_map = array(
  74. GatewayProtocol::CMD_ON_CONNECTION => 'CMD_SEND_TO_ONE',
  75. GatewayProtocol::CMD_ON_MESSAGE => 'CMD_KICK',
  76. GatewayProtocol::CMD_ON_CLOSE => 'CMD_SEND_TO_ALL',
  77. );
  78. $cmd = $pack->header['cmd'];
  79. StatisticClient::tick();
  80. $module = __CLASS__;
  81. $interface = isset($interface_map[$cmd]) ? $interface_map[$cmd] : 'null';
  82. $success = 1;
  83. $code = 0;
  84. $msg = '';
  85. try{
  86. switch($cmd)
  87. {
  88. case GatewayProtocol::CMD_ON_CONNECTION:
  89. call_user_func_array(array('Event', 'onConnect'), array($pack->body));
  90. break;
  91. case GatewayProtocol::CMD_ON_MESSAGE:
  92. call_user_func_array(array('Event', 'onMessage'), array(Context::$uid, $pack->body));
  93. break;
  94. case GatewayProtocol::CMD_ON_CLOSE:
  95. call_user_func_array(array('Event', 'onClose'), array(Context::$uid));
  96. break;
  97. }
  98. }
  99. catch(\Exception $e)
  100. {
  101. $success = 0;
  102. $code = $e->getCode() > 0 ? $e->getCode() : 500;
  103. $msg = 'uid:'.Context::$uid."\tclient_ip:".Context::$client_ip."\n".$e->__toString();
  104. }
  105. Context::clear();
  106. StatisticClient::report($module, $interface, $success, $code, $msg);
  107. }
  108. /**
  109. * 定时检查gateway通信端口,如果有新的gateway则去建立长连接
  110. */
  111. public function checkGatewayConnections()
  112. {
  113. $key = 'GLOBAL_GATEWAY_ADDRESS';
  114. $addresses_list = Store::get($key);
  115. if(empty($addresses_list))
  116. {
  117. return;
  118. }
  119. // 循环遍历,查找未连接的gateway ip 端口
  120. foreach($addresses_list as $addr)
  121. {
  122. if(!isset($this->gatewayConnections[$addr]))
  123. {
  124. // 执行连接
  125. $conn = @stream_socket_client("tcp://$addr", $errno, $errstr, 10);
  126. if(!$conn)
  127. {
  128. if(!isset($this->badGatewayAddress[$addr]))
  129. {
  130. $this->badGatewayAddress[$addr] = 0;
  131. }
  132. // 删除连不上的端口
  133. if($this->badGatewayAddress[$addr]++ > self::MAX_RETRY_COUNT)
  134. {
  135. $addresses_list = Store::get($key);
  136. unset($addresses_list[$addr]);
  137. Store::set($key, $addresses_list);
  138. $this->notice("tcp://$addr ".$errstr." del $addr from store");
  139. continue;
  140. }
  141. $this->notice("tcp://$addr ".$errstr);
  142. continue;
  143. }
  144. unset($this->badGatewayAddress[$addr]);
  145. $this->gatewayConnections[$addr] = $conn;
  146. stream_set_blocking($this->gatewayConnections[$addr], 0);
  147. // 初始化一些值
  148. $fd = (int) $this->gatewayConnections[$addr];
  149. $this->connections[$fd] = $this->gatewayConnections[$addr];
  150. $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
  151. // 添加数据可读事件
  152. $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
  153. }
  154. }
  155. }
  156. /**
  157. * 发送数据给客户端
  158. * @see Man\Core.SocketWorker::sendToClient()
  159. */
  160. public function sendToClient($buffer, $con = null)
  161. {
  162. if($con)
  163. {
  164. $this->currentDealFd = (int) $con;
  165. }
  166. return parent::sendToClient($buffer);
  167. }
  168. /**
  169. * 关闭连接
  170. * @see Man\Core.SocketWorker::closeClient()
  171. */
  172. protected function closeClient($fd)
  173. {
  174. // 清理$this->gatewayConnections对应项
  175. foreach($this->gatewayConnections as $addr => $con)
  176. {
  177. $the_fd = (int) $con;
  178. if($the_fd == $fd)
  179. {
  180. unset($this->gatewayConnections[$addr]);
  181. }
  182. }
  183. parent::closeClient($fd);
  184. }
  185. }