BusinessWorker.php 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. <?php
  2. namespace GatewayWorker;
  3. use \Workerman\Worker;
  4. use \Workerman\Connection\AsyncTcpConnection;
  5. use \Workerman\Protocols\GatewayProtocol;
  6. use \Workerman\Lib\Timer;
  7. use \GatewayWorker\Lib\Lock;
  8. use \GatewayWorker\Lib\Store;
  9. use \GatewayWorker\Lib\Context;
  10. use \Event;
  /**
   * BusinessWorker handles the data that gateway processes forward to it.
   *
   * For every gateway internal address listed in the shared store it opens an
   * asynchronous connection, and it dispatches each packet received on those
   * connections to the application's Event::onConnect, Event::onMessage or
   * Event::onClose handler.
   *
   * @author walkor<walkor@workerman.net>
   */
  class BusinessWorker extends Worker
  {
      /**
       * Failure threshold for a gateway internal address.
       * An address is removed from the store once its failure counter, read
       * before it is incremented, is greater than this value.
       * @var int
       */
      const MAX_RETRY_COUNT = 5;

      /**
       * Established connections to gateways, indexed by gateway address.
       * @var array
       */
      public $gatewayConnections = array();

      /**
       * Failure counters for gateway internal addresses, indexed by address.
       * @var array
       */
      public $badGatewayAddress = array();

      /**
       * Constructor.
       * @param string $socket_name    passed through to the parent Worker
       * @param array  $context_option passed through to the parent Worker
       */
      public function __construct($socket_name = '', $context_option = array())
      {
          $this->onWorkerStart = array($this, 'onWorkerStart');
          parent::__construct($socket_name, $context_option);
          // Frame 0 of the trace is the call to this constructor, so 'file' is
          // the script that instantiated the worker.
          $backtrace = debug_backtrace();
          $this->_appInitPath = dirname($backtrace[0]['file']);
      }

      /**
       * Initialisation performed when the worker process starts: schedules the
       * gateway connection check every second, runs it once immediately and
       * registers this worker with the Gateway library.
       * @return void
       */
      protected function onWorkerStart()
      {
          Timer::add(1, array($this, 'checkGatewayConnections'));
          $this->checkGatewayConnections();
          \GatewayWorker\Lib\Gateway::setBusinessWorker($this);
      }

      /**
       * Called when a gateway forwards a packet.
       *
       * Reads the keys client_ip, client_port, local_ip, local_port, client_id,
       * ext_data, cmd and body (for messages) from $data.
       *
       * @param TcpConnection $connection
       * @param mixed $data decoded packet
       * @return void
       */
      public function onGatewayMessage($connection, $data)
      {
          // Request context
          Context::$client_ip   = $data['client_ip'];
          Context::$client_port = $data['client_port'];
          Context::$local_ip    = $data['local_ip'];
          Context::$local_port  = $data['local_port'];
          Context::$client_id   = $data['client_id'];

          // $_SERVER is replaced wholesale for the duration of the request
          $_SERVER = array(
              'REMOTE_ADDR'       => Context::$client_ip,
              'REMOTE_PORT'       => Context::$client_port,
              'GATEWAY_ADDR'      => Context::$local_ip,
              'GATEWAY_PORT'      => Context::$local_port,
              'GATEWAY_CLIENT_ID' => Context::$client_id,
          );

          // Restore the session carried in the packet, if any
          if($data['ext_data'] != '')
          {
              $_SESSION = Context::sessionDecode($data['ext_data']);
          }
          else
          {
              $_SESSION = null;
          }

          // Keep the incoming session string so that a change made by the
          // handler can be detected once it has run
          $session_str_copy = $data['ext_data'];
          $cmd = $data['cmd'];

          // Dispatch to the application's event handlers
          try
          {
              switch($cmd)
              {
                  case GatewayProtocol::CMD_ON_CONNECTION:
                      Event::onConnect(Context::$client_id);
                      break;
                  case GatewayProtocol::CMD_ON_MESSAGE:
                      Event::onMessage(Context::$client_id, $data['body']);
                      break;
                  case GatewayProtocol::CMD_ON_CLOSE:
                      Event::onClose(Context::$client_id);
                      break;
              }
          }
          catch(\Exception $e)
          {
              // A failing handler is logged; the session check below still runs
              $msg = 'client_id:'.Context::$client_id."\tclient_ip:".Context::$client_ip."\n".$e->__toString();
              $this->log($msg);
          }

          // Push the session back to the gateway only when it has changed.
          // The comparison is strict on string casts: a loose comparison treats
          // numeric-looking strings as numbers, so two different payloads could
          // compare equal and the update would be skipped.
          $session_str_now = $_SESSION !== null ? Context::sessionEncode($_SESSION) : '';
          if((string)$session_str_copy !== (string)$session_str_now)
          {
              \GatewayWorker\Lib\Gateway::updateSocketSession(Context::$client_id, $session_str_now);
          }

          Context::clear();
      }

      /**
       * Called when a connection to a gateway is closed; forgets the
       * connection stored for that address.
       * @param TcpConnection $connection
       * @return void
       */
      public function onClose($connection)
      {
          unset($this->gatewayConnections[$connection->remoteAddress]);
      }

      /**
       * Checks that every gateway internal address found in the store has a
       * connection in $gatewayConnections and starts a connection for each
       * address that does not.
       *
       * NOTE(review): a connection is only recorded in $gatewayConnections by
       * onConnectGateway(), so an attempt that is still pending when this
       * method runs again would be started a second time - confirm whether
       * AsyncTcpConnection connects quickly enough for this not to matter.
       *
       * @return void
       */
      public function checkGatewayConnections()
      {
          $key = 'GLOBAL_GATEWAY_ADDRESS';
          $addresses_list = Store::instance('gateway')->get($key);
          if(empty($addresses_list))
          {
              return;
          }
          foreach($addresses_list as $addr)
          {
              if(isset($this->gatewayConnections[$addr]))
              {
                  continue;
              }
              $gateway_connection = new AsyncTcpConnection("GatewayProtocol://$addr");
              // Remember the store address on the connection; the callbacks
              // below use it as the index into the worker's arrays
              $gateway_connection->remoteAddress = $addr;
              $gateway_connection->onConnect = array($this, 'onConnectGateway');
              $gateway_connection->onMessage = array($this, 'onGatewayMessage');
              $gateway_connection->onClose = array($this, 'onClose');
              $gateway_connection->onError = array($this, 'onError');
          }
      }

      /**
       * Called when a connection to a gateway has been established; stores
       * the connection and clears the failure counter of its address.
       * @param TcpConnection $connection
       * @return void
       */
      public function onConnectGateway($connection)
      {
          $this->gatewayConnections[$connection->remoteAddress] = $connection;
          unset($this->badGatewayAddress[$connection->remoteAddress]);
      }

      /**
       * Called when a connection to a gateway reports an error; counts the
       * failure against the gateway address.
       * @param TcpConnection $connection
       * @param int $error_no
       * @param string $error_msg
       * @return void
       */
      public function onError($connection, $error_no, $error_msg)
      {
          $this->tryToDeleteGatewayAddress($connection->remoteAddress, $error_msg);
      }

      /**
       * Records one failure for a gateway internal address and removes the
       * address from the store once the failure threshold is passed.
       *
       * The counter starts at 0 and is compared before being incremented, so
       * the removal happens on the seventh failure recorded since the counter
       * was last cleared by onConnectGateway().
       *
       * @param string $addr   gateway internal address
       * @param string $errstr error text included in the log line
       * @return void
       * @throws \Exception rethrown from the store, after the lock is released
       */
      public function tryToDeleteGatewayAddress($addr, $errstr)
      {
          $key = 'GLOBAL_GATEWAY_ADDRESS';
          if(!isset($this->badGatewayAddress[$addr]))
          {
              $this->badGatewayAddress[$addr] = 0;
          }
          // Drop the address once it has failed too often
          if($this->badGatewayAddress[$addr]++ > self::MAX_RETRY_COUNT)
          {
              Lock::get();
              try
              {
                  $addresses_list = Store::instance('gateway')->get($key);
                  // Anything other than an array holds no address to remove,
                  // and unset() on a string offset would be a fatal error
                  if(is_array($addresses_list))
                  {
                      unset($addresses_list[$addr]);
                      // checkGatewayConnections() reads the addresses as array
                      // values, so also drop entries stored under another key
                      foreach(array_keys($addresses_list, $addr, true) as $stale_key)
                      {
                          unset($addresses_list[$stale_key]);
                      }
                      Store::instance('gateway')->set($key, $addresses_list);
                  }
              }
              catch(\Exception $e)
              {
                  // Never leave the lock held when the store fails
                  Lock::release();
                  throw $e;
              }
              Lock::release();
              $this->log("tcp://$addr ".$errstr." del $addr from store", false);
          }
      }
  }