BusinessWorker.php 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. <?php
  2. /**
  3. *
  4. * 处理具体逻辑
  5. *
  6. * @author walkor <workerman.net>
  7. *
  8. */
  9. require_once __DIR__ . '/../Lib/Autoloader.php';
  10. use \Protocols\GatewayProtocol;
  11. use \Lib\Store;
  12. use \Lib\Gateway;
  13. use \Lib\StatisticClient;
  14. use \Lib\Context;
  15. class BusinessWorker extends Man\Core\SocketWorker
  16. {
  17. /**
  18. * 与gateway的连接
  19. * ['ip:port' => conn, 'ip:port' => conn, ...]
  20. * @var array
  21. */
  22. protected $gatewayConnections = array();
  23. /**
  24. * 连不上的gateway地址
  25. * ['ip:port' => retry_count, 'ip:port' => retry_count, ...]
  26. * @var array
  27. */
  28. protected $badGatewayAddress = array();
  29. /**
  30. * 连接gateway失败重试次数
  31. * @var int
  32. */
  33. const MAX_RETRY_COUNT = 5;
  34. /**
  35. * 命令字映射 统计用到
  36. * @var array
  37. */
  38. protected static $interfaceMap = array(
  39. GatewayProtocol::CMD_ON_GATEWAY_CONNECTION => 'CMD_ON_GATEWAY_CONNECTION',
  40. GatewayProtocol::CMD_ON_CONNECTION => 'CMD_ON_CONNECTION',
  41. GatewayProtocol::CMD_ON_MESSAGE => 'CMD_ON_MESSAGE',
  42. GatewayProtocol::CMD_ON_CLOSE => 'CMD_ON_CLOSE',
  43. );
  44. /**
  45. * 进程启动时初始化
  46. * @see Man\Core.SocketWorker::onStart()
  47. */
  48. protected function onStart()
  49. {
  50. // 定时检查与gateway进程的连接
  51. \Man\Core\Lib\Task::init($this->event);
  52. \Man\Core\Lib\Task::add(1, array($this, 'checkGatewayConnections'));
  53. $this->checkGatewayConnections();
  54. Gateway::setBusinessWorker($this);
  55. }
  56. /**
  57. * 获取与gateway的连接
  58. */
  59. public function getGatewayConnections()
  60. {
  61. return $this->gatewayConnections;
  62. }
  63. /**
  64. * 检查gateway转发来的用户请求是否完整
  65. * @see Man\Core.SocketWorker::dealInput()
  66. */
  67. public function dealInput($recv_str)
  68. {
  69. return GatewayProtocol::input($recv_str);
  70. }
  71. /**
  72. * 处理请求
  73. * @see Man\Core.SocketWorker::dealProcess()
  74. */
  75. public function dealProcess($recv_str)
  76. {
  77. $pack = new GatewayProtocol($recv_str);
  78. Context::$client_ip = $pack->header['client_ip'];
  79. Context::$client_port = $pack->header['client_port'];
  80. Context::$local_ip = $pack->header['local_ip'];
  81. Context::$local_port = $pack->header['local_port'];
  82. Context::$socket_id = $pack->header['socket_id'];
  83. Context::$uid = $pack->header['uid'];
  84. $_SERVER = array(
  85. 'REMOTE_ADDR' => Context::$client_ip,
  86. 'REMOTE_PORT' => Context::$client_port,
  87. 'GATEWAY_ADDR' => Context::$local_ip,
  88. 'GATEWAY_PORT' => Context::$local_port,
  89. 'GATEWAY_SOCKET_ID' => Context::$socket_id,
  90. );
  91. if($pack->ext_data != '')
  92. {
  93. $_SESSION = Context::sessionDecode($pack->ext_data);
  94. }
  95. else
  96. {
  97. $_SESSION = null;
  98. }
  99. // 备份一次$pack->ext_data,请求处理完毕后判断session是否和备份相等
  100. $session_str_copy = $pack->ext_data;
  101. $cmd = $pack->header['cmd'];
  102. StatisticClient::tick();
  103. $module = __CLASS__;
  104. $interface = isset(self::$interfaceMap[$cmd]) ? self::$interfaceMap[$cmd] : 'null';
  105. $success = 1;
  106. $code = 0;
  107. $msg = '';
  108. try{
  109. switch($cmd)
  110. {
  111. case GatewayProtocol::CMD_ON_GATEWAY_CONNECTION:
  112. call_user_func_array(array('Event', 'onGatewayConnect'), array());
  113. break;
  114. case GatewayProtocol::CMD_ON_CONNECTION:
  115. call_user_func_array(array('Event', 'onConnect'), array($pack->body));
  116. break;
  117. case GatewayProtocol::CMD_ON_MESSAGE:
  118. call_user_func_array(array('Event', 'onMessage'), array(Context::$uid, $pack->body));
  119. break;
  120. case GatewayProtocol::CMD_ON_CLOSE:
  121. call_user_func_array(array('Event', 'onClose'), array(Context::$uid));
  122. break;
  123. }
  124. }
  125. catch(\Exception $e)
  126. {
  127. $success = 0;
  128. $code = $e->getCode() > 0 ? $e->getCode() : 500;
  129. $msg = 'uid:'.Context::$uid."\tclient_ip:".Context::$client_ip."\n".$e->__toString();
  130. }
  131. $session_str_now = $_SESSION !== null ? Context::sessionEncode($_SESSION) : '';
  132. if($session_str_copy != $session_str_now)
  133. {
  134. Gateway::updateSocketSession(Context::$socket_id, $session_str_now);
  135. }
  136. Context::clear();
  137. StatisticClient::report($module, $interface, $success, $code, $msg);
  138. }
  139. /**
  140. * 定时检查gateway通信端口,如果有新的gateway则去建立长连接
  141. */
  142. public function checkGatewayConnections()
  143. {
  144. $key = 'GLOBAL_GATEWAY_ADDRESS';
  145. $addresses_list = Store::instance('gateway')->get($key);
  146. if(empty($addresses_list))
  147. {
  148. return;
  149. }
  150. // 循环遍历,查找未连接的gateway ip 端口
  151. foreach($addresses_list as $addr)
  152. {
  153. if(!isset($this->gatewayConnections[$addr]))
  154. {
  155. // 执行连接
  156. $conn = @stream_socket_client("tcp://$addr", $errno, $errstr, 1);
  157. if(!$conn)
  158. {
  159. if(!isset($this->badGatewayAddress[$addr]))
  160. {
  161. $this->badGatewayAddress[$addr] = 0;
  162. }
  163. // 删除连不上的端口
  164. if($this->badGatewayAddress[$addr]++ > self::MAX_RETRY_COUNT)
  165. {
  166. $addresses_list = Store::instance('gateway')->get($key);
  167. unset($addresses_list[$addr]);
  168. Store::instance('gateway')->set($key, $addresses_list);
  169. $this->notice("tcp://$addr ".$errstr." del $addr from store", false);
  170. }
  171. continue;
  172. }
  173. unset($this->badGatewayAddress[$addr]);
  174. $this->gatewayConnections[$addr] = $conn;
  175. stream_set_blocking($this->gatewayConnections[$addr], 0);
  176. // 初始化一些值
  177. $fd = (int) $this->gatewayConnections[$addr];
  178. $this->connections[$fd] = $this->gatewayConnections[$addr];
  179. $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
  180. // 添加数据可读事件
  181. $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
  182. }
  183. }
  184. }
  185. /**
  186. * 发送数据给客户端
  187. * @see Man\Core.SocketWorker::sendToClient()
  188. */
  189. public function sendToClient($buffer, $con = null)
  190. {
  191. if($con)
  192. {
  193. $this->currentDealFd = (int) $con;
  194. }
  195. return parent::sendToClient($buffer);
  196. }
  197. /**
  198. * 关闭连接
  199. * @see Man\Core.SocketWorker::closeClient()
  200. */
  201. protected function closeClient($fd)
  202. {
  203. // 清理$this->gatewayConnections对应项
  204. foreach($this->gatewayConnections as $addr => $con)
  205. {
  206. $the_fd = (int) $con;
  207. if($the_fd == $fd)
  208. {
  209. unset($this->gatewayConnections[$addr]);
  210. }
  211. }
  212. parent::closeClient($fd);
  213. }
  214. }