BusinessWorker.php 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. <?php
  2. namespace GatewayWorker;
  3. use \Workerman\Worker;
  4. use \Workerman\Connection\AsyncTcpConnection;
  5. use \Workerman\Protocols\GatewayProtocol;
  6. use \Workerman\Lib\Timer;
  7. use \GatewayWorker\Lib\Lock;
  8. use \GatewayWorker\Lib\Store;
  9. use \GatewayWorker\Lib\Context;
  10. use \Event;
  11. /**
  12. *
  13. * BusinessWorker 用于处理Gateway转发来的数据
  14. *
  15. * @author walkor<walkor@workerman.net>
  16. *
  17. */
  18. class BusinessWorker extends Worker
  19. {
  20. /**
  21. * 如果连接gateway通讯端口失败,尝试重试多少次
  22. * @var int
  23. */
  24. const MAX_RETRY_COUNT = 5;
  25. /**
  26. * 保存与gateway的连接connection对象
  27. * @var array
  28. */
  29. public $gatewayConnections = array();
  30. /**
  31. * 正在连接的gateway内部通讯地址
  32. * @var array
  33. */
  34. protected $_connectingGatewayAddress = array();
  35. /**
  36. * 连接失败gateway内部通讯地址
  37. * @var array
  38. */
  39. protected $_badGatewayAddress = array();
  40. /**
  41. * 保存用户设置的worker启动回调
  42. * @var callback
  43. */
  44. protected $_onWorkerStart = null;
  45. /**
  46. * 构造函数
  47. * @param string $socket_name
  48. * @param array $context_option
  49. */
  50. public function __construct($socket_name = '', $context_option = array())
  51. {
  52. parent::__construct($socket_name, $context_option);
  53. $backrace = debug_backtrace();
  54. $this->_appInitPath = dirname($backrace[0]['file']);
  55. }
  56. /**
  57. * 运行
  58. * @see Workerman.Worker::run()
  59. */
  60. public function run()
  61. {
  62. $this->_onWorkerStart = $this->onWorkerStart;
  63. $this->onWorkerStart = array($this, 'onWorkerStart');
  64. parent::run();
  65. }
  66. /**
  67. * 当进程启动时一些初始化工作
  68. * @return void
  69. */
  70. protected function onWorkerStart()
  71. {
  72. Timer::add(1, array($this, 'checkGatewayConnections'));
  73. $this->checkGatewayConnections();
  74. \GatewayWorker\Lib\Gateway::setBusinessWorker($this);
  75. if($this->_onWorkerStart)
  76. {
  77. call_user_func($this->_onWorkerStart, $this);
  78. }
  79. }
  80. /**
  81. * 当gateway转发来数据时
  82. * @param TcpConnection $connection
  83. * @param mixed $data
  84. */
  85. public function onGatewayMessage($connection, $data)
  86. {
  87. // 上下文数据
  88. Context::$client_ip = $data['client_ip'];
  89. Context::$client_port = $data['client_port'];
  90. Context::$local_ip = $data['local_ip'];
  91. Context::$local_port = $data['local_port'];
  92. Context::$client_id = $data['client_id'];
  93. // $_SERVER变量
  94. $_SERVER = array(
  95. 'REMOTE_ADDR' => Context::$client_ip,
  96. 'REMOTE_PORT' => Context::$client_port,
  97. 'GATEWAY_ADDR' => Context::$local_ip,
  98. 'GATEWAY_PORT' => Context::$local_port,
  99. 'GATEWAY_CLIENT_ID' => Context::$client_id,
  100. );
  101. // 尝试解析session
  102. if($data['ext_data'] != '')
  103. {
  104. $_SESSION = Context::sessionDecode($data['ext_data']);
  105. }
  106. else
  107. {
  108. $_SESSION = null;
  109. }
  110. // 备份一次$data['ext_data'],请求处理完毕后判断session是否和备份相等,不相等就更新session
  111. $session_str_copy = $data['ext_data'];
  112. $cmd = $data['cmd'];
  113. // 尝试执行Event::onConnection、Event::onMessage、Event::onClose
  114. try{
  115. switch($cmd)
  116. {
  117. case GatewayProtocol::CMD_ON_CONNECTION:
  118. Event::onConnect(Context::$client_id);
  119. break;
  120. case GatewayProtocol::CMD_ON_MESSAGE:
  121. Event::onMessage(Context::$client_id, $data['body']);
  122. break;
  123. case GatewayProtocol::CMD_ON_CLOSE:
  124. Event::onClose(Context::$client_id);
  125. break;
  126. }
  127. }
  128. catch(\Exception $e)
  129. {
  130. $msg = 'client_id:'.Context::$client_id."\tclient_ip:".Context::$client_ip."\n".$e->__toString();
  131. $this->log($msg);
  132. }
  133. // 判断session是否被更改
  134. $session_str_now = $_SESSION !== null ? Context::sessionEncode($_SESSION) : '';
  135. if($session_str_copy != $session_str_now)
  136. {
  137. \GatewayWorker\Lib\Gateway::updateSocketSession(Context::$client_id, $session_str_now);
  138. }
  139. Context::clear();
  140. }
  141. /**
  142. * 当与Gateway的连接断开时触发
  143. * @param TcpConnection $connection
  144. * @return void
  145. */
  146. public function onClose($connection)
  147. {
  148. unset($this->gatewayConnections[$connection->remoteAddress], $this->_connectingGatewayAddress[$connection->remoteAddress]);
  149. }
  150. /**
  151. * 检查gateway的通信端口是否都已经连
  152. * 如果有未连接的端口,则尝试连接
  153. * @return void
  154. */
  155. public function checkGatewayConnections()
  156. {
  157. $key = 'GLOBAL_GATEWAY_ADDRESS';
  158. $addresses_list = Store::instance('gateway')->get($key);
  159. if(empty($addresses_list))
  160. {
  161. return;
  162. }
  163. foreach($addresses_list as $addr)
  164. {
  165. if(!isset($this->gatewayConnections[$addr]) && !isset($this->_connectingGatewayAddress[$addr]) && !isset($this->_badGatewayAddress[$addr]))
  166. {
  167. $gateway_connection = new AsyncTcpConnection("GatewayProtocol://$addr");
  168. $gateway_connection->remoteAddress = $addr;
  169. $gateway_connection->onConnect = array($this, 'onConnectGateway');
  170. $gateway_connection->onMessage = array($this, 'onGatewayMessage');
  171. $gateway_connection->onClose = array($this, 'onClose');
  172. $gateway_connection->onError = array($this, 'onError');
  173. $gateway_connection->connect();
  174. $this->_connectingGatewayAddress[$addr] = 1;
  175. }
  176. }
  177. }
  178. /**
  179. * 当连接上gateway的通讯端口时触发
  180. * 将连接connection对象保存起来
  181. * @param TcpConnection $connection
  182. * @return void
  183. */
  184. public function onConnectGateway($connection)
  185. {
  186. $this->gatewayConnections[$connection->remoteAddress] = $connection;
  187. unset($this->_badGatewayAddress[$connection->remoteAddress], $this->_connectingGatewayAddress[$connection->remoteAddress]);
  188. }
  189. /**
  190. * 当与gateway的连接出现错误时触发
  191. * @param TcpConnection $connection
  192. * @param int $error_no
  193. * @param string $error_msg
  194. */
  195. public function onError($connection, $error_no, $error_msg)
  196. {
  197. $this->tryToDeleteGatewayAddress($connection->remoteAddress, $error_msg);
  198. }
  199. /**
  200. * 从存储中删除删除连不上的gateway通讯端口
  201. * @param string $addr
  202. * @param string $errstr
  203. */
  204. public function tryToDeleteGatewayAddress($addr, $errstr)
  205. {
  206. $key = 'GLOBAL_GATEWAY_ADDRESS';
  207. if(!isset($this->_badGatewayAddress[$addr]))
  208. {
  209. $this->_badGatewayAddress[$addr] = 0;
  210. }
  211. // 删除连不上的端口
  212. if($this->_badGatewayAddress[$addr]++ > self::MAX_RETRY_COUNT)
  213. {
  214. Lock::get();
  215. $addresses_list = Store::instance('gateway')->get($key);
  216. unset($addresses_list[$addr]);
  217. Store::instance('gateway')->set($key, $addresses_list);
  218. Lock::release();
  219. $this->log("tcp://$addr ".$errstr." del $addr from store", false);
  220. }
  221. }
  222. }