BusinessWorker.php 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. <?php
  2. /**
  3. *
  4. * 处理具体逻辑
  5. *
  6. * @author walkor <workerman.net>
  7. *
  8. */
  9. define('ROOT_DIR', realpath(__DIR__.'/../'));
  10. require_once ROOT_DIR . '/Lib/Gateway.php';
  11. require_once ROOT_DIR . '/Lib/StatisticsClient.php';
  12. require_once ROOT_DIR . '/Event.php';
  /**
   * Worker process that runs the business logic for requests forwarded by
   * the gateway processes.
   *
   * It keeps one long-lived TCP connection per known gateway address, parses
   * every packet received on those connections with GatewayProtocol, and
   * dispatches it to the matching static handler on the Event class.
   */
  class BusinessWorker extends Man\Core\SocketWorker
  {
      /**
       * Connections to the gateway processes, keyed by gateway address.
       * ['ip:port' => conn, 'ip:port' => conn, ...]
       * @var array
       */
      protected $gatewayConnections = array();

      /**
       * Initialisation performed when the process starts.
       *
       * Registers checkGatewayConnections() as a recurring task, runs it once
       * immediately, and hands this worker to GateWay::setBusinessWorker().
       *
       * @see Man\Core.SocketWorker::onStart()
       */
      protected function onStart()
      {
          // Periodically check the connections to the gateway processes
          // (interval argument is 1 -- presumably seconds; confirm against Task::add).
          \Man\Core\Lib\Task::init($this->event);
          \Man\Core\Lib\Task::add(1, array($this, 'checkGatewayConnections'));

          // Do not wait for the first timer tick: connect right away.
          $this->checkGatewayConnections();

          GateWay::setBusinessWorker($this);
      }

      /**
       * Get the connections to the gateways.
       *
       * @return array map of 'ip:port' => connection resource
       */
      public function getGatewayConnections()
      {
          return $this->gatewayConnections;
      }

      /**
       * Check whether a user request forwarded by a gateway is complete.
       *
       * @param string $recv_str data received so far
       * @return mixed the result of GatewayProtocol::input($recv_str), unchanged
       * @see Man\Core.SocketWorker::dealInput()
       */
      public function dealInput($recv_str)
      {
          return GatewayProtocol::input($recv_str);
      }

      /**
       * Process one complete request.
       *
       * Parses the packet, copies its header fields into Context, calls the
       * Event handler that matches the packet command, then clears Context and
       * reports the outcome through StatisticClient.
       *
       * An \Exception thrown by a handler is not propagated: it is turned
       * into a failure report (success 0, the exception code or 500, and a
       * message holding uid, client ip and the exception text).
       *
       * @param string $recv_str one complete packet
       * @return void
       * @see Man\Core.SocketWorker::dealProcess()
       */
      public function dealProcess($recv_str)
      {
          $pack = new GatewayProtocol($recv_str);

          // Expose the request metadata to the handlers for the duration of the call.
          Context::$client_ip   = $pack->header['client_ip'];
          Context::$client_port = $pack->header['client_port'];
          Context::$local_ip    = $pack->header['local_ip'];
          Context::$local_port  = $pack->header['local_port'];
          Context::$socket_id   = $pack->header['socket_id'];
          Context::$uid         = $pack->header['uid'];

          $cmd = $pack->header['cmd'];

          // Interface name reported to the statistics service for each handled
          // command. Each command is reported under its own name; unknown
          // commands are reported as 'null'.
          $interface_map = array(
              GatewayProtocol::CMD_ON_CONNECTION => 'CMD_ON_CONNECTION',
              GatewayProtocol::CMD_ON_MESSAGE    => 'CMD_ON_MESSAGE',
              GatewayProtocol::CMD_ON_CLOSE      => 'CMD_ON_CLOSE',
          );

          StatisticClient::tick();
          $module    = __CLASS__;
          $interface = isset($interface_map[$cmd]) ? $interface_map[$cmd] : 'null';
          $success   = 1;
          $code      = 0;
          $msg       = '';

          try
          {
              switch($cmd)
              {
                  case GatewayProtocol::CMD_ON_CONNECTION:
                      call_user_func_array(array('Event', 'onConnect'), array($pack->body));
                      break;
                  case GatewayProtocol::CMD_ON_MESSAGE:
                      call_user_func_array(array('Event', 'onMessage'), array(Context::$uid, $pack->body));
                      break;
                  case GatewayProtocol::CMD_ON_CLOSE:
                      call_user_func_array(array('Event', 'onClose'), array(Context::$uid));
                      break;
              }
          }
          catch(\Exception $e)
          {
              $success = 0;
              // Exceptions without a positive code are reported as 500.
              $code = $e->getCode() > 0 ? $e->getCode() : 500;
              // Built before Context::clear() so uid and client ip are still available.
              $msg = 'uid:'.Context::$uid."\tclient_ip:".Context::$client_ip."\n".$e->__toString();
          }

          Context::clear();
          StatisticClient::report($module, $interface, $success, $code, $msg);
      }

      /**
       * Periodically check the gateway addresses and open a long-lived
       * connection to every gateway that is not connected yet.
       *
       * The address list is read from Store under the key
       * 'GLOBAL_GATEWAY_ADDRESS'. Nothing happens when the list is empty or is
       * not iterable. A failed connection attempt is logged through notice()
       * and skipped, so it is attempted again on the next check.
       *
       * @return void
       */
      public function checkGatewayConnections()
      {
          $addresses_list = Store::get('GLOBAL_GATEWAY_ADDRESS');
          if(empty($addresses_list))
          {
              return;
          }
          // Guard against a malformed store value: foreach over a scalar
          // would raise a warning on every timer tick.
          if(!is_array($addresses_list) && !($addresses_list instanceof \Traversable))
          {
              return;
          }

          // Walk the list looking for gateway ip:port pairs without a connection.
          foreach($addresses_list as $addr)
          {
              if(isset($this->gatewayConnections[$addr]))
              {
                  continue;
              }

              // Connect, waiting at most 1 second.
              $conn = stream_socket_client("tcp://$addr", $errno, $errstr, 1);
              if(!$conn)
              {
                  $this->notice($errstr);
                  continue;
              }

              stream_set_blocking($conn, 0);
              $this->gatewayConnections[$addr] = $conn;

              // Register the connection in the worker's per-fd bookkeeping.
              $fd = (int) $conn;
              $this->connections[$fd] = $conn;
              $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);

              // Listen for readable data on the new connection.
              $this->event->add($conn, \Man\Core\Events\BaseEvent::EV_READ, array($this, 'dealInputBase'), $fd);
          }
      }

      /**
       * Send data to the client.
       *
       * When $con is given, its integer id is stored in $this->currentDealFd
       * before delegating to parent::sendToClient($buffer). The previous value
       * of currentDealFd is not restored afterwards.
       *
       * @param string        $buffer data to send
       * @param resource|null $con    connection to send on, or null to leave currentDealFd untouched
       * @return mixed the result of parent::sendToClient($buffer)
       * @see Man\Core.SocketWorker::sendToClient()
       */
      public function sendToClient($buffer, $con = null)
      {
          if($con)
          {
              $this->currentDealFd = (int) $con;
          }
          return parent::sendToClient($buffer);
      }

      /**
       * Close a connection.
       *
       * Removes the matching entry from $this->gatewayConnections first, so
       * that checkGatewayConnections() sees the address as unconnected again,
       * then delegates to parent::closeClient($fd).
       *
       * @param int $fd integer id of the connection being closed
       * @return void
       * @see Man\Core.SocketWorker::closeClient()
       */
      protected function closeClient($fd)
      {
          // Normalise once so the comparison below is a strict int comparison.
          $closing_fd = (int) $fd;
          foreach($this->gatewayConnections as $addr => $con)
          {
              if((int) $con === $closing_fd)
              {
                  unset($this->gatewayConnections[$addr]);
              }
          }
          parent::closeClient($fd);
      }
  }