BusinessWorker.php 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. <?php
  2. namespace GatewayWorker;
  3. use \Workerman\Worker;
  4. use \Workerman\Connection\AsyncTcpConnection;
  5. use \Workerman\Protocols\GatewayProtocol;
  6. use \Workerman\Lib\Timer;
  7. use \GatewayWorker\Lib\Lock;
  8. use \GatewayWorker\Lib\Store;
  9. use \GatewayWorker\Lib\Context;
  10. use \GatewayWorker\Lib\Autoloader;
  11. use \Event;
  /**
   * Worker that receives client events forwarded by gateway processes and
   * dispatches them to the application's Event class.
   *
   * It keeps one connection per gateway address found in the shared store,
   * re-checks that list every second, and removes addresses from the store
   * after repeated connection errors.
   */
  class BusinessWorker extends Worker
  {
      /**
       * Error threshold for a gateway address. Because the counter is compared
       * before it is incremented (and with ">"), the address is removed on the
       * 7th consecutive recorded error.
       */
      const MAX_RETRY_COUNT = 5;

      /**
       * Established gateway connections, keyed by the gateway address.
       *
       * @var array
       */
      public $gatewayConnections = array();

      /**
       * Number of recorded errors per gateway address; an entry is dropped
       * again as soon as a connection to that address succeeds.
       *
       * @var array
       */
      public $badGatewayAddress = array();

      /**
       * Directory of the file that instantiated this worker.
       *
       * @var string
       */
      protected $_rootPath = '';

      /**
       * Registers the start callback and remembers the caller's directory.
       *
       * @param string $socket_name    passed through to the parent constructor
       * @param array  $context_option passed through to the parent constructor
       */
      public function __construct($socket_name = '', $context_option = array())
      {
          $this->onWorkerStart = array($this, 'onWorkerStart');
          // Frame 0 of the backtrace is the call to this constructor, so its
          // 'file' entry is the script that created the worker.
          $backtrace = debug_backtrace();
          $this->_rootPath = dirname($backtrace[0]['file']);
          parent::__construct($socket_name, $context_option);
      }

      /**
       * Start callback: configures the autoloader root, schedules the
       * once-per-second gateway check, runs that check immediately, and hands
       * this worker to the Gateway helper.
       *
       * @return void
       */
      protected function onWorkerStart()
      {
          Autoloader::setRootPath($this->_rootPath);
          Timer::add(1, array($this, 'checkGatewayConnections'));
          $this->checkGatewayConnections();
          \GatewayWorker\Lib\Gateway::setBusinessWorker($this);
      }

      /**
       * Handles one decoded packet coming from a gateway.
       *
       * Fills Context, $_SERVER and $_SESSION from the packet, dispatches to
       * Event::onConnect / onMessage / onClose according to $data['cmd'], then
       * pushes the session back to the gateway if it changed.
       *
       * @param mixed $connection the gateway connection (not used here)
       * @param array $data       packet with the keys client_ip, client_port,
       *                          local_ip, local_port, client_id, ext_data, cmd
       *                          and (for messages) body
       * @return void
       */
      public function onGatewayMessage($connection, $data)
      {
          Context::$client_ip   = $data['client_ip'];
          Context::$client_port = $data['client_port'];
          Context::$local_ip    = $data['local_ip'];
          Context::$local_port  = $data['local_port'];
          Context::$client_id   = $data['client_id'];

          // $_SERVER is replaced wholesale with the per-client values.
          $_SERVER = array(
              'REMOTE_ADDR'       => Context::$client_ip,
              'REMOTE_PORT'       => Context::$client_port,
              'GATEWAY_ADDR'      => Context::$local_ip,
              'GATEWAY_PORT'      => Context::$local_port,
              'GATEWAY_CLIENT_ID' => Context::$client_id,
          );

          if($data['ext_data'] != '')
          {
              $_SESSION = Context::sessionDecode($data['ext_data']);
          }
          else
          {
              $_SESSION = null;
          }

          // Keep a copy of the incoming session string; once the event has been
          // handled the session is re-encoded and compared against this copy,
          // and the gateway is only updated when the two differ.
          $session_str_copy = $data['ext_data'];
          $cmd = $data['cmd'];

          try
          {
              switch($cmd)
              {
                  case GatewayProtocol::CMD_ON_CONNECTION:
                      Event::onConnect(Context::$client_id);
                      break;
                  case GatewayProtocol::CMD_ON_MESSAGE:
                      Event::onMessage(Context::$client_id, $data['body']);
                      break;
                  case GatewayProtocol::CMD_ON_CLOSE:
                      Event::onClose(Context::$client_id);
                      break;
              }
          }
          catch(\Exception $e)
          {
              // An exception in application code is logged, not propagated, so
              // the session sync and context cleanup below still run.
              $msg = 'client_id:'.Context::$client_id."\tclient_ip:".Context::$client_ip."\n".$e->__toString();
              $this->log($msg);
          }

          $session_str_now = $_SESSION !== null ? Context::sessionEncode($_SESSION) : '';
          if($session_str_copy != $session_str_now)
          {
              \GatewayWorker\Lib\Gateway::updateSocketSession(Context::$client_id, $session_str_now);
          }

          Context::clear();
      }

      /**
       * Close callback of a gateway connection: forgets the connection so the
       * next periodic check creates a new one for that address.
       *
       * @param mixed $connection connection carrying a remoteAddress property
       * @return void
       */
      public function onClose($connection)
      {
          unset($this->gatewayConnections[$connection->remoteAddress]);
      }

      /**
       * Reads the gateway address list from the store and creates a connection
       * object, with all callbacks attached, for every address that has no
       * entry in $gatewayConnections yet.
       *
       * @return void
       */
      public function checkGatewayConnections()
      {
          $key = 'GLOBAL_GATEWAY_ADDRESS';
          $addresses_list = Store::instance('gateway')->get($key);
          if(empty($addresses_list))
          {
              return;
          }

          foreach($addresses_list as $addr)
          {
              if(!isset($this->gatewayConnections[$addr]))
              {
                  // NOTE(review): no explicit connect call follows; presumably
                  // this AsyncTcpConnection version connects on construction
                  // when given the event loop - confirm.
                  $gateway_connection = new AsyncTcpConnection("GatewayProtocol://$addr", self::$_globalEvent);
                  $gateway_connection->remoteAddress = $addr;
                  $gateway_connection->onConnect = array($this, 'onConnectGateway');
                  $gateway_connection->onMessage = array($this, 'onGatewayMessage');
                  $gateway_connection->onClose = array($this, 'onClose');
                  $gateway_connection->onError = array($this, 'onError');
              }
          }
      }

      /**
       * Connect callback: records the connection under its address and resets
       * the error counter for that address.
       *
       * @param mixed $connection connection carrying a remoteAddress property
       * @return void
       */
      public function onConnectGateway($connection)
      {
          $this->gatewayConnections[$connection->remoteAddress] = $connection;
          unset($this->badGatewayAddress[$connection->remoteAddress]);
      }

      /**
       * Error callback of a gateway connection: counts the failure and may
       * remove the address from the store.
       *
       * @param mixed  $connection connection carrying a remoteAddress property
       * @param mixed  $error_no   error code (not used here)
       * @param string $error_msg  error text, forwarded to the log message
       * @return void
       */
      public function onError($connection, $error_no, $error_msg)
      {
          $this->tryToDeleteGatewayAddress($connection->remoteAddress, $error_msg);
      }

      /**
       * Counts one error for $addr and, once the counter has passed
       * MAX_RETRY_COUNT, removes the address from the stored gateway list.
       *
       * The lock taken around the store update is released even when the store
       * throws; the exception is then rethrown to the caller.
       *
       * @param string $addr   gateway address
       * @param string $errstr error text included in the log entry
       * @return void
       * @throws \Exception whatever the store access throws
       */
      public function tryToDeleteGatewayAddress($addr, $errstr)
      {
          $key = 'GLOBAL_GATEWAY_ADDRESS';
          if(!isset($this->badGatewayAddress[$addr]))
          {
              $this->badGatewayAddress[$addr] = 0;
          }

          // Remove addresses that cannot be connected to. The comparison uses
          // the value before the increment.
          if($this->badGatewayAddress[$addr]++ > self::MAX_RETRY_COUNT)
          {
              Lock::get();
              try
              {
                  $addresses_list = Store::instance('gateway')->get($key);
                  // presumably the list is keyed by address - verify against
                  // the code that writes GLOBAL_GATEWAY_ADDRESS
                  unset($addresses_list[$addr]);
                  Store::instance('gateway')->set($key, $addresses_list);
              }
              catch(\Exception $e)
              {
                  // Do not leave the lock held when the store fails.
                  Lock::release();
                  throw $e;
              }
              Lock::release();
              $this->log("tcp://$addr ".$errstr." del $addr from store", false);
          }
      }
  }