BusinessWorker.php 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. <?php
  2. namespace GatewayWorker;
  3. use \Workerman\Worker;
  4. use \Workerman\Connection\AsyncTcpConnection;
  5. use \Workerman\Protocols\GatewayProtocol;
  6. use \Workerman\Lib\Timer;
  7. use \GatewayWorker\Lib\Lock;
  8. use \GatewayWorker\Lib\Store;
  9. use \GatewayWorker\Lib\Context;
  10. use \Event;
  /**
   * BusinessWorker handles the data that a Gateway forwards to it.
   *
   * Each worker process keeps one connection per gateway internal address
   * (read from the shared store), decodes the per-client context and session
   * from every forwarded packet and dispatches it to the global Event class.
   *
   * @author walkor<walkor@workerman.net>
   */
  class BusinessWorker extends Worker
  {
      /**
       * How many times a failed connection to a gateway internal port is
       * retried before the address is removed from the store.
       * @var int
       */
      const MAX_RETRY_COUNT = 5;

      /**
       * Established gateway connections, keyed by gateway address.
       * @var array
       */
      public $gatewayConnections = array();

      /**
       * Consecutive connection-failure counters, keyed by gateway address.
       * @var array
       */
      protected $_badGatewayAddress = array();

      /**
       * Gateway addresses for which a connection attempt has been started but
       * has not yet connected, failed or closed. Keyed by gateway address.
       * @var array
       */
      protected $_connectingGatewayAddresses = array();

      /**
       * The worker-start callback supplied by the user, saved by run() before
       * the internal start handler is installed in its place.
       * @var callback
       */
      protected $_onWorkerStart = null;

      /**
       * Constructor.
       *
       * @param string $socket_name    passed through to the parent constructor
       * @param array  $context_option passed through to the parent constructor
       */
      public function __construct($socket_name = '', $context_option = array())
      {
          parent::__construct($socket_name, $context_option);
          // Frame 0 of the backtrace is this constructor call; its 'file' is the
          // script that instantiated the worker.
          // NOTE(review): _appInitPath is not declared in this class; presumably
          // it is inherited from Worker - confirm.
          $backtrace = debug_backtrace();
          $this->_appInitPath = dirname($backtrace[0]['file']);
      }

      /**
       * Run the worker.
       *
       * Saves the user's onWorkerStart callback and installs this class's own
       * start handler, which invokes the saved callback after initialisation.
       *
       * @see Workerman.Worker::run()
       */
      public function run()
      {
          $this->_onWorkerStart = $this->onWorkerStart;
          $this->onWorkerStart = array($this, 'onWorkerStart');
          parent::run();
      }

      /**
       * Initialisation performed when the worker process starts.
       *
       * Schedules the gateway connection check every second, runs it once
       * immediately, registers this worker with the Gateway library and finally
       * calls the user's saved onWorkerStart callback, if any.
       *
       * @return void
       */
      protected function onWorkerStart()
      {
          Timer::add(1, array($this, 'checkGatewayConnections'));
          $this->checkGatewayConnections();
          \GatewayWorker\Lib\Gateway::setBusinessWorker($this);
          if($this->_onWorkerStart)
          {
              call_user_func($this->_onWorkerStart, $this);
          }
      }

      /**
       * Called when a gateway forwards a packet.
       *
       * @param TcpConnection $connection the gateway connection
       * @param mixed         $data       decoded packet; read as an array with
       *                                  the keys client_ip, client_port,
       *                                  local_ip, local_port, client_id,
       *                                  ext_data, cmd and (for messages) body
       */
      public function onGatewayMessage($connection, $data)
      {
          // Per-request context.
          Context::$client_ip = $data['client_ip'];
          Context::$client_port = $data['client_port'];
          Context::$local_ip = $data['local_ip'];
          Context::$local_port = $data['local_port'];
          Context::$client_id = $data['client_id'];

          // Replace $_SERVER with values describing this client.
          $_SERVER = array(
              'REMOTE_ADDR' => Context::$client_ip,
              'REMOTE_PORT' => Context::$client_port,
              'GATEWAY_ADDR' => Context::$local_ip,
              'GATEWAY_PORT' => Context::$local_port,
              'GATEWAY_CLIENT_ID' => Context::$client_id,
          );

          // Decode the session carried in ext_data; empty means no session.
          if($data['ext_data'] != '')
          {
              $_SESSION = Context::sessionDecode($data['ext_data']);
          }
          else
          {
              $_SESSION = null;
          }

          // Keep the encoded session as received so it can be compared with the
          // session after the handler has run.
          $session_str_copy = $data['ext_data'];
          $cmd = $data['cmd'];

          // Dispatch to Event::onConnect, Event::onMessage or Event::onClose.
          try{
              switch($cmd)
              {
                  case GatewayProtocol::CMD_ON_CONNECTION:
                      Event::onConnect(Context::$client_id);
                      break;
                  case GatewayProtocol::CMD_ON_MESSAGE:
                      Event::onMessage(Context::$client_id, $data['body']);
                      break;
                  case GatewayProtocol::CMD_ON_CLOSE:
                      Event::onClose(Context::$client_id);
                      break;
              }
          }
          catch(\Exception $e)
          {
              // A failing handler is logged; the session check below still runs.
              $msg = 'client_id:'.Context::$client_id."\tclient_ip:".Context::$client_ip."\n".$e->__toString();
              $this->log($msg);
          }

          // Push the session back to the gateway only if the handler changed it.
          $session_str_now = $_SESSION !== null ? Context::sessionEncode($_SESSION) : '';
          if($session_str_copy != $session_str_now)
          {
              \GatewayWorker\Lib\Gateway::updateSocketSession(Context::$client_id, $session_str_now);
          }

          Context::clear();
      }

      /**
       * Called when a connection to a gateway is closed.
       *
       * Forgets the connection, and any pending-connect marker for the same
       * address, so the next periodic check may reconnect.
       *
       * @param TcpConnection $connection
       * @return void
       */
      public function onClose($connection)
      {
          unset(
              $this->gatewayConnections[$connection->remoteAddress],
              $this->_connectingGatewayAddresses[$connection->remoteAddress]
          );
      }

      /**
       * Check that every gateway internal address listed in the store has a
       * connection, and start one for each address that has neither an
       * established connection nor an attempt still in progress.
       *
       * @return void
       */
      public function checkGatewayConnections()
      {
          $key = 'GLOBAL_GATEWAY_ADDRESS';
          $addresses_list = Store::instance('gateway')->get($key);
          if(empty($addresses_list))
          {
              return;
          }
          foreach($addresses_list as $addr)
          {
              // An address only enters gatewayConnections once onConnectGateway
              // fires, so pending attempts are tracked separately; otherwise
              // every tick would open another connection to the same address.
              if(isset($this->gatewayConnections[$addr]) || isset($this->_connectingGatewayAddresses[$addr]))
              {
                  continue;
              }
              // NOTE(review): no explicit connect() call is made here; presumably
              // this Workerman version starts connecting in the constructor - confirm.
              $gateway_connection = new AsyncTcpConnection("GatewayProtocol://$addr");
              $gateway_connection->remoteAddress = $addr;
              $gateway_connection->onConnect = array($this, 'onConnectGateway');
              $gateway_connection->onMessage = array($this, 'onGatewayMessage');
              $gateway_connection->onClose = array($this, 'onClose');
              $gateway_connection->onError = array($this, 'onError');
              $this->_connectingGatewayAddresses[$addr] = true;
          }
      }

      /**
       * Called when a connection to a gateway internal port is established.
       *
       * Stores the connection and clears the failure counter and the
       * pending-connect marker for its address.
       *
       * @param TcpConnection $connection
       * @return void
       */
      public function onConnectGateway($connection)
      {
          $this->gatewayConnections[$connection->remoteAddress] = $connection;
          unset(
              $this->_badGatewayAddress[$connection->remoteAddress],
              $this->_connectingGatewayAddresses[$connection->remoteAddress]
          );
      }

      /**
       * Called when a gateway connection reports an error.
       *
       * @param TcpConnection $connection
       * @param int           $error_no
       * @param string        $error_msg
       */
      public function onError($connection, $error_no, $error_msg)
      {
          // The attempt is over; allow the periodic check to try again.
          unset($this->_connectingGatewayAddresses[$connection->remoteAddress]);
          $this->tryToDeleteGatewayAddress($connection->remoteAddress, $error_msg);
      }

      /**
       * Count a failure for a gateway address and, once the initial attempt
       * plus MAX_RETRY_COUNT retries have all failed, remove the address from
       * the store.
       *
       * @param string $addr   gateway internal address
       * @param string $errstr error message to include in the log line
       */
      public function tryToDeleteGatewayAddress($addr, $errstr)
      {
          $key = 'GLOBAL_GATEWAY_ADDRESS';
          if(!isset($this->_badGatewayAddress[$addr]))
          {
              $this->_badGatewayAddress[$addr] = 0;
          }
          // Pre-increment so the counter already includes the current failure:
          // removal happens on failure number MAX_RETRY_COUNT + 1.
          if(++$this->_badGatewayAddress[$addr] <= self::MAX_RETRY_COUNT)
          {
              return;
          }

          // Remove the unreachable address. The lock must be released even if
          // the store throws; catch-and-rethrow is used instead of "finally"
          // to stay compatible with PHP versions older than 5.5.
          Lock::get();
          try
          {
              $addresses_list = Store::instance('gateway')->get($key);
              // assumes the list is keyed by address, as the original unset()
              // implies - TODO confirm against the code that registers addresses
              if(is_array($addresses_list) && array_key_exists($addr, $addresses_list))
              {
                  unset($addresses_list[$addr]);
                  Store::instance('gateway')->set($key, $addresses_list);
              }
          }
          catch(\Exception $e)
          {
              Lock::release();
              throw $e;
          }
          Lock::release();

          // Start from zero if the address is registered again later.
          unset($this->_badGatewayAddress[$addr]);
          $this->log("tcp://$addr ".$errstr." del $addr from store", false);
      }
  }