BusinessWorker.php 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace GatewayWorker;
  15. use Workerman\Connection\TcpConnection;
  16. use \Workerman\Worker;
  17. use \Workerman\Connection\AsyncTcpConnection;
  18. use \Workerman\Protocols\GatewayProtocol;
  19. use \Workerman\Lib\Timer;
  20. use \GatewayWorker\Lib\Lock;
  21. use \GatewayWorker\Lib\Store;
  22. use \GatewayWorker\Lib\Context;
  23. use \Event;
  24. /**
  25. *
  26. * BusinessWorker 用于处理Gateway转发来的数据
  27. *
  28. * @author walkor<walkor@workerman.net>
  29. *
  30. */
  class BusinessWorker extends Worker
  {
      /**
       * How many times a failed connection to a gateway's internal port is
       * retried before the address is removed from the shared store.
       * @var int
       */
      const MAX_RETRY_COUNT = 5;

      /**
       * Established gateway connections, keyed by gateway address.
       * @var array
       */
      public $gatewayConnections = array();

      /**
       * Gateway addresses that currently have a connection attempt in flight
       * (address => 1).
       * @var array
       */
      protected $_connectingGatewayAddress = array();

      /**
       * Failure counters of gateway addresses (address => number of errors).
       * An entry is removed when the address connects successfully or when it
       * is no longer listed in the store.
       * @var array
       */
      protected $_badGatewayAddress = array();

      /**
       * The onWorkerStart callback configured by the user, saved by run()
       * before the property is replaced with this class's own handler.
       * @var callback
       */
      protected $_onWorkerStart = null;

      /**
       * Constructor.
       *
       * Besides delegating to the parent constructor, remembers the directory
       * of the file that instantiated this worker in $this->_appInitPath.
       *
       * @param string $socket_name
       * @param array  $context_option
       */
      public function __construct($socket_name = '', $context_option = array())
      {
          parent::__construct($socket_name, $context_option);
          // Frame 0 describes the call to this constructor; its 'file' entry is
          // the script in which the worker object was created.
          $trace = debug_backtrace();
          $this->_appInitPath = dirname($trace[0]['file']);
      }

      /**
       * Run the worker.
       *
       * Swaps the user's onWorkerStart callback for this class's own handler,
       * which performs the gateway setup and then invokes the user's callback.
       *
       * @see Workerman.Worker::run()
       */
      public function run()
      {
          $this->_onWorkerStart = $this->onWorkerStart;
          $this->onWorkerStart  = array($this, 'onWorkerStart');
          parent::run();
      }

      /**
       * Initialisation performed when the worker process starts.
       *
       * Registers checkGatewayConnections() with the timer (interval 1),
       * runs one check immediately, hands this worker to the Gateway helper
       * and finally calls the user's onWorkerStart callback, if any.
       *
       * @return void
       */
      protected function onWorkerStart()
      {
          Timer::add(1, array($this, 'checkGatewayConnections'));
          $this->checkGatewayConnections();
          \GatewayWorker\Lib\Gateway::setBusinessWorker($this);
          if($this->_onWorkerStart)
          {
              call_user_func($this->_onWorkerStart, $this);
          }
      }

      /**
       * Called when a gateway forwards a packet.
       *
       * Populates Context, $_SERVER and $_SESSION from the packet, dispatches
       * to Event::onConnect / Event::onMessage / Event::onClose according to
       * $data['cmd'], and pushes the session back to the gateway if the
       * handler changed it.
       *
       * @param TcpConnection $connection
       * @param mixed         $data decoded packet; the keys client_ip,
       *                            client_port, local_ip, local_port, client_id,
       *                            ext_data, cmd and (for messages) body are read
       */
      public function onGatewayMessage($connection, $data)
      {
          // Request context
          Context::$client_ip   = $data['client_ip'];
          Context::$client_port = $data['client_port'];
          Context::$local_ip    = $data['local_ip'];
          Context::$local_port  = $data['local_port'];
          Context::$client_id   = $data['client_id'];

          // $_SERVER is rebuilt from scratch for every packet
          $_SERVER = array(
              'REMOTE_ADDR'       => Context::$client_ip,
              'REMOTE_PORT'       => Context::$client_port,
              'GATEWAY_ADDR'      => Context::$local_ip,
              'GATEWAY_PORT'      => Context::$local_port,
              'GATEWAY_CLIENT_ID' => Context::$client_id,
          );

          // Restore the session carried in ext_data (null when there is none)
          if($data['ext_data'] != '')
          {
              $_SESSION = Context::sessionDecode($data['ext_data']);
          }
          else
          {
              $_SESSION = null;
          }

          // Keep the incoming encoded session so that a change made by the
          // handler can be detected afterwards.
          $session_str_copy = $data['ext_data'];
          $cmd = $data['cmd'];

          // Dispatch to Event::onConnect, Event::onMessage or Event::onClose
          try
          {
              switch($cmd)
              {
                  case GatewayProtocol::CMD_ON_CONNECTION:
                      Event::onConnect(Context::$client_id);
                      break;
                  case GatewayProtocol::CMD_ON_MESSAGE:
                      Event::onMessage(Context::$client_id, $data['body']);
                      break;
                  case GatewayProtocol::CMD_ON_CLOSE:
                      Event::onClose(Context::$client_id);
                      break;
              }
          }
          catch(\Exception $e)
          {
              // A failing handler is logged; it must not take the worker down.
              $msg = 'client_id:'.Context::$client_id."\tclient_ip:".Context::$client_ip."\n".$e->__toString();
              $this->log($msg);
          }

          // Send the session back to the gateway only if it was modified
          $session_str_now = $_SESSION !== null ? Context::sessionEncode($_SESSION) : '';
          if($session_str_copy != $session_str_now)
          {
              \GatewayWorker\Lib\Gateway::updateSocketSession(Context::$client_id, $session_str_now);
          }

          Context::clear();
      }

      /**
       * Called when the connection to a gateway is closed.
       *
       * Forgets the connection so that checkGatewayConnections() can open a
       * new one.
       *
       * @param TcpConnection $connection
       * @return void
       */
      public function onClose($connection)
      {
          unset($this->gatewayConnections[$connection->remoteAddress], $this->_connectingGatewayAddress[$connection->remoteAddress]);
      }

      /**
       * Make sure there is a connection (or a pending attempt) for every
       * gateway address listed in the store.
       *
       * An address that failed is retried on later calls until its failure
       * counter exceeds MAX_RETRY_COUNT. Counters of addresses that are no
       * longer listed are dropped, so a gateway that registers again is
       * connected to again.
       *
       * @return void
       */
      public function checkGatewayConnections()
      {
          $key = 'GLOBAL_GATEWAY_ADDRESS';
          $addresses_list = Store::instance('gateway')->get($key);
          if(empty($addresses_list) || !is_array($addresses_list))
          {
              // Nothing is registered, so no failure counter is still relevant.
              $this->_badGatewayAddress = array();
              return;
          }

          // Keep only the counters of addresses that are still registered.
          $this->_badGatewayAddress = array_intersect_key($this->_badGatewayAddress, array_flip($addresses_list));

          foreach($addresses_list as $addr)
          {
              // Already connected, or an attempt is in flight
              if(isset($this->gatewayConnections[$addr]) || isset($this->_connectingGatewayAddress[$addr]))
              {
                  continue;
              }
              // Retry budget used up; the address is waiting to leave the store
              if(isset($this->_badGatewayAddress[$addr]) && $this->_badGatewayAddress[$addr] > self::MAX_RETRY_COUNT)
              {
                  continue;
              }

              $gateway_connection = new AsyncTcpConnection("GatewayProtocol://$addr");
              $gateway_connection->remoteAddress = $addr;
              $gateway_connection->onConnect = array($this, 'onConnectGateway');
              $gateway_connection->onMessage = array($this, 'onGatewayMessage');
              $gateway_connection->onClose = array($this, 'onClose');
              $gateway_connection->onError = array($this, 'onError');
              // Enlarge the send buffer unless it was already customised
              if(TcpConnection::$defaultMaxSendBufferSize == $gateway_connection->maxSendBufferSize)
              {
                  $gateway_connection->maxSendBufferSize = 10*1024*1024;
              }
              // Flag the attempt before connecting: if the error callback fires
              // during connect(), onError() must be able to clear the flag again.
              $this->_connectingGatewayAddress[$addr] = 1;
              $gateway_connection->connect();
          }
      }

      /**
       * Called once the connection to a gateway's internal port is established.
       *
       * Stores the connection and resets the address's failure state.
       *
       * @param TcpConnection $connection
       * @return void
       */
      public function onConnectGateway($connection)
      {
          $this->gatewayConnections[$connection->remoteAddress] = $connection;
          unset($this->_badGatewayAddress[$connection->remoteAddress], $this->_connectingGatewayAddress[$connection->remoteAddress]);
      }

      /**
       * Called when a gateway connection reports an error.
       *
       * @param TcpConnection $connection
       * @param int           $error_no
       * @param string        $error_msg
       */
      public function onError($connection, $error_no, $error_msg)
      {
          $addr = $connection->remoteAddress;
          // An error on a connection that was never established means the
          // attempt is over. Clear the flag here instead of relying on an
          // onClose callback following the error, so the address can be retried.
          if(!isset($this->gatewayConnections[$addr]))
          {
              unset($this->_connectingGatewayAddress[$addr]);
          }
          $this->tryToDeleteGatewayAddress($addr, $error_msg);
      }

      /**
       * Count a failure for a gateway address and, once the first attempt plus
       * MAX_RETRY_COUNT retries have failed, remove the address from the store.
       *
       * @param string $addr   gateway address as listed in the store
       * @param string $errstr error text, used for the log entry
       */
      public function tryToDeleteGatewayAddress($addr, $errstr)
      {
          $key = 'GLOBAL_GATEWAY_ADDRESS';
          if(!isset($this->_badGatewayAddress[$addr]))
          {
              $this->_badGatewayAddress[$addr] = 0;
          }
          // Still within the retry budget: checkGatewayConnections() tries again.
          if(++$this->_badGatewayAddress[$addr] <= self::MAX_RETRY_COUNT)
          {
              return;
          }
          // Remove the unreachable address; the read-modify-write is done under
          // the lock.
          Lock::get();
          $addresses_list = Store::instance('gateway')->get($key);
          // Only write back a real list; never overwrite the key with the
          // non-array value of a failed read.
          if(is_array($addresses_list))
          {
              unset($addresses_list[$addr]);
              Store::instance('gateway')->set($key, $addresses_list);
          }
          Lock::release();
          $this->log("tcp://$addr ".$errstr." del $addr from store", false);
      }
  }