Преглед на файлове

利用BusinessWorker向gateway发数据

walkor преди 11 години
родител
ревизия
47adce4527
променени са 2 файла, в които са добавени 54 реда и са изтрити 4 реда
  1. 36 0
      applications/ChatDemo/Bootstrap/BusinessWorker.php
  2. 18 4
      applications/ChatDemo/Lib/Gateway.php

+ 36 - 0
applications/ChatDemo/Bootstrap/BusinessWorker.php

@@ -14,6 +14,12 @@ require_once ROOT_DIR . '/Lib/APLog.php';
 class BusinessWorker extends Man\Core\SocketWorker
 {
     /**
+     * BusinessWorker 实例
+     * @var BusinessWorker
+     */
+    protected static $instance = null;
+    
+    /**
      * 与gateway的连接
      * ['ip:port' => conn, 'ip:port' => conn, ...]
      * @var array
@@ -29,6 +35,23 @@ class BusinessWorker extends Man\Core\SocketWorker
         // 定时检查与gateway进程的连接
         \Man\Core\Lib\Task::init($this->event);
         \Man\Core\Lib\Task::add(1, array($this, 'checkGatewayConnections'));
+        self::$instance = $this;
+    }
+    
+    /**
+     * 获取实例
+     */
+    public static function getInstance()
+    {
+        return self::$instance;
+    }
+    
+    /**
+     * 获取与网关的连接
+     */
+    public static function getGatewayConnections()
+    {
+        return self::$gatewayConnections;
     }
     
     /**
@@ -103,6 +126,19 @@ class BusinessWorker extends Man\Core\SocketWorker
     }
     
     /**
+     * 发送数据给客户端
+     * @see Man\Core.SocketWorker::sendToClient()
+     */
+    public function sendToClient($buffer, $con = null)
+    {
+        if($con)
+        {
+            $this->currentDealFd = (int) $con;
+        }
+        return parent::sendToClient($buffer);
+    }
+    
+    /**
      * 关闭连接
      * @see Man\Core.SocketWorker::closeClient()
      */

+ 18 - 4
applications/ChatDemo/Lib/Gateway.php

@@ -28,10 +28,15 @@ class GateWay
        $pack->header['uid'] = Context::$uid;
        $pack->body = (string)$message;
        $buffer = $pack->getBuffer();
-       $all_addresses = Store::get('GLOBAL_GATEWAY_ADDRESS');
+       /* $all_addresses = Store::get('GLOBAL_GATEWAY_ADDRESS');
        foreach($all_addresses as $address)
        {
            self::sendToGateway($address, $buffer);
+       } */
+       $worker_instance = BusinessWorker::getInstance();
+       foreach(BusinessWorker::getGatewayConnections() as $con)
+       {
+           $worker_instance->sendToClient($buffer, $con);
        }
    }
    
@@ -198,8 +203,17 @@ class GateWay
     */
    public static function sendToGateway($address, $buffer)
    {
-       $client = stream_socket_client($address, $errno, $errmsg, 1, STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT);
-       $len = stream_socket_sendto($client, $buffer);
-       return $len == strlen($buffer);
+       $connections = BusinessWorker::getGatewayConnections();
+       if(!isset($connections[$address]))
+       {
+           $e = new \Exception("sendToGateway($address, $buffer) fail \$connections:".json_encode($connections));
+           APLog::add($e->__toString());
+           return false;
+       }
+       return BusinessWorker::getInstance()->sendToClient($buffer, $connections[$address]);
+       
+       //$client = stream_socket_client($address, $errno, $errmsg, 1, STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT);
+       //$len = stream_socket_sendto($client, $buffer);
+       //return $len == strlen($buffer);
    }
 }