BusinessWorker.php 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. <?php
  2. /**
  3. *
  4. * 处理具体逻辑
  5. *
  6. * @author walkor <workerman.net>
  7. *
  8. */
  9. define('ROOT_DIR', realpath(__DIR__.'/../'));
  10. require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
  11. require_once ROOT_DIR . '/Event.php';
  12. require_once ROOT_DIR . '/Lib/APLog.php';
  13. class BusinessWorker extends Man\Core\SocketWorker
  14. {
  15. /**
  16. * BusinessWorker 实例
  17. * @var BusinessWorker
  18. */
  19. protected static $instance = null;
  20. /**
  21. * 与gateway的连接
  22. * ['ip:port' => conn, 'ip:port' => conn, ...]
  23. * @var array
  24. */
  25. protected static $gatewayConnections = array();
  26. /**
  27. * 进程启动时初始化
  28. * @see Man\Core.SocketWorker::onStart()
  29. */
  30. protected function onStart()
  31. {
  32. // 定时检查与gateway进程的连接
  33. \Man\Core\Lib\Task::init($this->event);
  34. \Man\Core\Lib\Task::add(1, array($this, 'checkGatewayConnections'));
  35. self::$instance = $this;
  36. }
  37. /**
  38. * 获取实例
  39. */
  40. public static function getInstance()
  41. {
  42. return self::$instance;
  43. }
  44. /**
  45. * 获取与网关的连接
  46. */
  47. public static function getGatewayConnections()
  48. {
  49. return self::$gatewayConnections;
  50. }
  51. /**
  52. * 检查请求是否完整
  53. * @see Man\Core.SocketWorker::dealInput()
  54. */
  55. public function dealInput($recv_str)
  56. {
  57. return GatewayProtocol::input($recv_str);
  58. }
  59. /**
  60. * 处理请求
  61. * @see Man\Core.SocketWorker::dealProcess()
  62. */
  63. public function dealProcess($recv_str)
  64. {
  65. $pack = new GatewayProtocol($recv_str);
  66. Context::$client_ip = $pack->header['client_ip'];
  67. Context::$client_port = $pack->header['client_port'];
  68. Context::$local_ip = $pack->header['local_ip'];
  69. Context::$local_port = $pack->header['local_port'];
  70. Context::$socket_id = $pack->header['socket_id'];
  71. Context::$uid = $pack->header['uid'];
  72. switch($pack->header['cmd'])
  73. {
  74. case GatewayProtocol::CMD_ON_CONNECTION:
  75. $ret = call_user_func_array(array('Event', 'onConnect'), array($pack->body));
  76. break;
  77. case GatewayProtocol::CMD_ON_MESSAGE:
  78. $ret = call_user_func_array(array('Event', 'onMessage'), array(Context::$uid, $pack->body));
  79. break;
  80. case GatewayProtocol::CMD_ON_CLOSE:
  81. $ret = call_user_func_array(array('Event', 'onClose'), array(Context::$uid));
  82. break;
  83. }
  84. Context::clear();
  85. return $ret;
  86. }
  87. /**
  88. * 定时检查gateway通信端口
  89. */
  90. public function checkGatewayConnections()
  91. {
  92. $key = 'GLOBAL_GATEWAY_ADDRESS';
  93. $addresses_list = Store::get($key);
  94. if(empty($addresses_list))
  95. {
  96. return;
  97. }
  98. foreach($addresses_list as $addr)
  99. {
  100. if(!isset(self::$gatewayConnections[$addr]))
  101. {
  102. $conn = stream_socket_client("tcp://$addr", $errno, $errstr, 1);
  103. if(!$conn)
  104. {
  105. $this->notice($errstr);
  106. continue;
  107. }
  108. self::$gatewayConnections[$addr] = $conn;
  109. stream_set_blocking(self::$gatewayConnections[$addr], 0);
  110. $fd = (int) self::$gatewayConnections[$addr];
  111. $this->connections[$fd] = self::$gatewayConnections[$addr];
  112. $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
  113. $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
  114. }
  115. }
  116. }
  117. /**
  118. * 发送数据给客户端
  119. * @see Man\Core.SocketWorker::sendToClient()
  120. */
  121. public function sendToClient($buffer, $con = null)
  122. {
  123. if($con)
  124. {
  125. $this->currentDealFd = (int) $con;
  126. }
  127. return parent::sendToClient($buffer);
  128. }
  129. /**
  130. * 关闭连接
  131. * @see Man\Core.SocketWorker::closeClient()
  132. */
  133. protected function closeClient($fd)
  134. {
  135. foreach(self::$gatewayConnections as $addr => $con)
  136. {
  137. $the_fd = (int) $con;
  138. if($the_fd == $fd)
  139. {
  140. unset(self::$gatewayConnections[$addr]);
  141. }
  142. }
  143. parent::closeClient($fd);
  144. }
  145. }
  146. /**
  147. * 上下文 包含当前用户uid, 内部通信local_ip local_port socket_id ,以及客户端client_ip client_port
  148. * @author walkor
  149. *
  150. */
  151. class Context
  152. {
  153. public static $series_id;
  154. public static $local_ip;
  155. public static $local_port;
  156. public static $socket_id;
  157. public static $client_ip;
  158. public static $client_port;
  159. public static $uid;
  160. public static function clear()
  161. {
  162. self::$series_id = self::$local_ip = self::$local_port = self::$socket_id = self::$client_ip = self::$client_port = self::$uid = null;
  163. }
  164. }