|
|
@@ -21,6 +21,19 @@ class BusinessWorker extends Man\Core\SocketWorker
|
|
|
protected $gatewayConnections = array();
|
|
|
|
|
|
/**
|
|
|
+ * 连不上的gateway地址
|
|
|
+ * ['ip:port' => retry_count, 'ip:port' => retry_count, ...]
|
|
|
+ * @var array
|
|
|
+ */
|
|
|
+ protected $badGatewayAddress = array();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 连接gateway失败重试次数
|
|
|
+ * @var int
|
|
|
+ */
|
|
|
+ const MAX_RETRY_COUNT = 5;
|
|
|
+
|
|
|
+ /**
|
|
|
* 进程启动时初始化
|
|
|
* @see Man\Core.SocketWorker::onStart()
|
|
|
*/
|
|
|
@@ -120,12 +133,26 @@ class BusinessWorker extends Man\Core\SocketWorker
|
|
|
if(!isset($this->gatewayConnections[$addr]))
|
|
|
{
|
|
|
// 执行连接
|
|
|
- $conn = stream_socket_client("tcp://$addr", $errno, $errstr, 1);
|
|
|
+ $conn = @stream_socket_client("tcp://$addr", $errno, $errstr, 10);
|
|
|
if(!$conn)
|
|
|
{
|
|
|
- $this->notice($errstr);
|
|
|
+ if(!isset($this->badGatewayAddress[$addr]))
|
|
|
+ {
|
|
|
+ $this->badGatewayAddress[$addr] = 0;
|
|
|
+ }
|
|
|
+ // 删除连不上的端口
|
|
|
+ if($this->badGatewayAddress[$addr]++ > self::MAX_RETRY_COUNT)
|
|
|
+ {
|
|
|
+ $addresses_list = Store::get($key);
|
|
|
+ unset($addresses_list[$addr]);
|
|
|
+ Store::set($key, $addresses_list);
|
|
|
+ $this->notice("tcp://$addr ".$errstr." del $addr from store");
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ $this->notice("tcp://$addr ".$errstr);
|
|
|
continue;
|
|
|
}
|
|
|
+ unset($this->badGatewayAddress[$addr]);
|
|
|
$this->gatewayConnections[$addr] = $conn;
|
|
|
stream_set_blocking($this->gatewayConnections[$addr], 0);
|
|
|
|