BusinessWorker.php 7.6 KB


  1. <?php
  2. namespace GatewayWorker;
  3. use Workerman\Connection\TcpConnection;
  4. use \Workerman\Worker;
  5. use \Workerman\Connection\AsyncTcpConnection;
  6. use \Workerman\Protocols\GatewayProtocol;
  7. use \Workerman\Lib\Timer;
  8. use \GatewayWorker\Lib\Lock;
  9. use \GatewayWorker\Lib\Store;
  10. use \GatewayWorker\Lib\Context;
  11. use \Event;
  /**
   * BusinessWorker handles the data that a Gateway forwards to it.
   *
   * It keeps one asynchronous connection per gateway address found in the
   * shared store under the key GLOBAL_GATEWAY_ADDRESS, and dispatches every
   * forwarded packet to the application's Event class.
   *
   * @author walkor<walkor@workerman.net>
   */
  class BusinessWorker extends Worker
  {
      /**
       * How many times a failed connection to a gateway's internal port is
       * retried before the address is removed from the store.
       * @var int
       */
      const MAX_RETRY_COUNT = 5;

      /**
       * Established gateway connections, indexed by gateway address.
       * @var array
       */
      public $gatewayConnections = array();

      /**
       * Gateway addresses with a connection attempt currently in flight
       * (address => 1).
       * @var array
       */
      protected $_connectingGatewayAddress = array();

      /**
       * Consecutive failure count per gateway address (address => count).
       * An entry is cleared when the address connects successfully or when it
       * is removed from the store.
       * @var array
       */
      protected $_badGatewayAddress = array();

      /**
       * The user supplied onWorkerStart callback, saved by run() before the
       * property is redirected to this class's own onWorkerStart().
       * @var callback
       */
      protected $_onWorkerStart = null;

      /**
       * Constructor.
       *
       * @param string $socket_name
       * @param array  $context_option
       */
      public function __construct($socket_name = '', $context_option = array())
      {
          parent::__construct($socket_name, $context_option);
          // Frame 0 of the backtrace carries the file that called this
          // constructor; its directory is recorded as the init path.
          // NOTE(review): $_appInitPath is not declared in this class -
          // presumably it is inherited from Worker; confirm.
          $backtrace = debug_backtrace();
          $this->_appInitPath = dirname($backtrace[0]['file']);
      }

      /**
       * Run the worker.
       *
       * Saves the user's onWorkerStart callback and installs this class's own
       * onWorkerStart() in its place before delegating to the parent.
       *
       * @see Workerman.Worker::run()
       */
      public function run()
      {
          $this->_onWorkerStart = $this->onWorkerStart;
          $this->onWorkerStart  = array($this, 'onWorkerStart');
          parent::run();
      }

      /**
       * Initialisation performed when the worker process starts.
       *
       * Schedules checkGatewayConnections() on a 1 second timer, runs it once
       * immediately, registers this worker with the Gateway library and then
       * invokes the user's saved onWorkerStart callback, if any.
       *
       * @return void
       */
      protected function onWorkerStart()
      {
          Timer::add(1, array($this, 'checkGatewayConnections'));
          $this->checkGatewayConnections();
          \GatewayWorker\Lib\Gateway::setBusinessWorker($this);
          if ($this->_onWorkerStart) {
              call_user_func($this->_onWorkerStart, $this);
          }
      }

      /**
       * Called when a gateway forwards a packet.
       *
       * Fills the Context and $_SERVER with the client information from
       * $data, restores $_SESSION from $data['ext_data'], dispatches to the
       * Event handler selected by $data['cmd'], and pushes the session back to
       * the gateway when the handler changed it.
       *
       * @param TcpConnection $connection
       * @param mixed         $data
       */
      public function onGatewayMessage($connection, $data)
      {
          // Per-request context.
          Context::$client_ip   = $data['client_ip'];
          Context::$client_port = $data['client_port'];
          Context::$local_ip    = $data['local_ip'];
          Context::$local_port  = $data['local_port'];
          Context::$client_id   = $data['client_id'];

          // $_SERVER is replaced entirely with the values for this request.
          $_SERVER = array(
              'REMOTE_ADDR'       => Context::$client_ip,
              'REMOTE_PORT'       => Context::$client_port,
              'GATEWAY_ADDR'      => Context::$local_ip,
              'GATEWAY_PORT'      => Context::$local_port,
              'GATEWAY_CLIENT_ID' => Context::$client_id,
          );

          // Restore the session carried in ext_data, if there is one.
          if ($data['ext_data'] != '') {
              $_SESSION = Context::sessionDecode($data['ext_data']);
          } else {
              $_SESSION = null;
          }

          // Keep the incoming ext_data so that the session can be compared
          // against it once the request has been handled.
          $session_str_copy = $data['ext_data'];
          $cmd = $data['cmd'];

          // Dispatch to Event::onConnect, Event::onMessage or Event::onClose.
          // Exceptions thrown by the handler are logged and not re-thrown.
          try {
              switch ($cmd) {
                  case GatewayProtocol::CMD_ON_CONNECTION:
                      Event::onConnect(Context::$client_id);
                      break;
                  case GatewayProtocol::CMD_ON_MESSAGE:
                      Event::onMessage(Context::$client_id, $data['body']);
                      break;
                  case GatewayProtocol::CMD_ON_CLOSE:
                      Event::onClose(Context::$client_id);
                      break;
              }
          } catch (\Exception $e) {
              $msg = 'client_id:'.Context::$client_id."\tclient_ip:".Context::$client_ip."\n".$e->__toString();
              $this->log($msg);
          }

          // Send the session to the gateway only if the handler changed it.
          $session_str_now = $_SESSION !== null ? Context::sessionEncode($_SESSION) : '';
          if ($session_str_copy != $session_str_now) {
              \GatewayWorker\Lib\Gateway::updateSocketSession(Context::$client_id, $session_str_now);
          }

          Context::clear();
      }

      /**
       * Called when the connection to a gateway is closed.
       *
       * Forgets both the established connection and any in-flight marker for
       * that address so that checkGatewayConnections() can reconnect.
       *
       * @param TcpConnection $connection
       * @return void
       */
      public function onClose($connection)
      {
          unset($this->gatewayConnections[$connection->remoteAddress], $this->_connectingGatewayAddress[$connection->remoteAddress]);
      }

      /**
       * Make sure every gateway address listed in the store is connected.
       *
       * An address that is neither connected nor currently connecting gets a
       * new connection attempt. Addresses that failed before are attempted
       * again here; tryToDeleteGatewayAddress() removes an address from the
       * store once it has failed more than MAX_RETRY_COUNT times, which is
       * what ends the retries.
       *
       * @return void
       */
      public function checkGatewayConnections()
      {
          $key = 'GLOBAL_GATEWAY_ADDRESS';
          $addresses_list = Store::instance('gateway')->get($key);
          if (empty($addresses_list)) {
              return;
          }

          foreach ($addresses_list as $addr) {
              if (isset($this->gatewayConnections[$addr]) || isset($this->_connectingGatewayAddress[$addr])) {
                  continue;
              }

              $gateway_connection = new AsyncTcpConnection("GatewayProtocol://$addr");
              $gateway_connection->remoteAddress = $addr;
              $gateway_connection->onConnect = array($this, 'onConnectGateway');
              $gateway_connection->onMessage = array($this, 'onGatewayMessage');
              $gateway_connection->onClose   = array($this, 'onClose');
              $gateway_connection->onError   = array($this, 'onError');
              // Raise the send buffer to 10 MB unless it was already changed
              // from the default.
              if (TcpConnection::$defaultMaxSendBufferSize == $gateway_connection->maxSendBufferSize) {
                  $gateway_connection->maxSendBufferSize = 10*1024*1024;
              }

              // Mark the address before connecting, so that a failure reported
              // before connect() returns cannot leave a stale marker behind.
              $this->_connectingGatewayAddress[$addr] = 1;
              $gateway_connection->connect();
          }
      }

      /**
       * Called when a connection to a gateway's internal port is established.
       *
       * Stores the connection and resets the failure bookkeeping of the
       * address.
       *
       * @param TcpConnection $connection
       * @return void
       */
      public function onConnectGateway($connection)
      {
          $this->gatewayConnections[$connection->remoteAddress] = $connection;
          unset($this->_badGatewayAddress[$connection->remoteAddress], $this->_connectingGatewayAddress[$connection->remoteAddress]);
      }

      /**
       * Called when a gateway connection reports an error.
       *
       * Clears the in-flight marker so that the next
       * checkGatewayConnections() run can retry the address, then records the
       * failure.
       * NOTE(review): this callback is registered for the whole lifetime of
       * the connection, so errors other than connect failures are presumably
       * counted as well - confirm against the framework's error codes.
       *
       * @param TcpConnection $connection
       * @param int           $error_no
       * @param string        $error_msg
       */
      public function onError($connection, $error_no, $error_msg)
      {
          unset($this->_connectingGatewayAddress[$connection->remoteAddress]);
          $this->tryToDeleteGatewayAddress($connection->remoteAddress, $error_msg);
      }

      /**
       * Record a failure for a gateway address and, once the initial attempt
       * plus MAX_RETRY_COUNT retries have all failed, remove the address from
       * the store.
       *
       * @param string $addr
       * @param string $errstr
       */
      public function tryToDeleteGatewayAddress($addr, $errstr)
      {
          $key = 'GLOBAL_GATEWAY_ADDRESS';
          if (!isset($this->_badGatewayAddress[$addr])) {
              $this->_badGatewayAddress[$addr] = 0;
          }

          // Pre-increment: the address is given up on the failure that
          // follows the last allowed retry.
          if (++$this->_badGatewayAddress[$addr] <= self::MAX_RETRY_COUNT) {
              return;
          }

          // Remove the unreachable address from the shared list. The lock is
          // released even when the store throws.
          Lock::get();
          try {
              $store = Store::instance('gateway');
              $addresses_list = $store->get($key);
              if (is_array($addresses_list)) {
                  unset($addresses_list[$addr]);
                  // checkGatewayConnections() reads the addresses from the
                  // values of this list, so drop matching values as well.
                  foreach (array_keys($addresses_list, $addr, true) as $index) {
                      unset($addresses_list[$index]);
                  }
                  $store->set($key, $addresses_list);
              }
          } catch (\Exception $e) {
              Lock::release();
              throw $e;
          }
          Lock::release();

          // Start from a clean slate if the same address is registered again.
          unset($this->_badGatewayAddress[$addr]);

          $this->log("tcp://$addr ".$errstr." del $addr from store", false);
      }
  }