Kaynağa Gözat

文件调整 注释 优化

walkor 11 yıl önce
ebeveyn
işleme
b0d85a9171

+ 19 - 50
applications/ChatDemo/Bootstrap/BusinessWorker.php

@@ -7,24 +7,17 @@
  * 
  */
 define('ROOT_DIR', realpath(__DIR__.'/../'));
-require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
+require_once ROOT_DIR . '/Lib/Gateway.php';
 require_once ROOT_DIR . '/Event.php';
-require_once ROOT_DIR . '/Lib/APLog.php';
 
 class BusinessWorker extends Man\Core\SocketWorker
 {
     /**
-     * BusinessWorker 实例
-     * @var BusinessWorker
-     */
-    protected static $instance = null;
-    
-    /**
      * 与gateway的连接
      * ['ip:port' => conn, 'ip:port' => conn, ...]
      * @var array
      */
-    protected static $gatewayConnections = array();
+    protected $gatewayConnections = array();
     
     /**
      * 进程启动时初始化
@@ -35,27 +28,19 @@ class BusinessWorker extends Man\Core\SocketWorker
         // 定时检查与gateway进程的连接
         \Man\Core\Lib\Task::init($this->event);
         \Man\Core\Lib\Task::add(1, array($this, 'checkGatewayConnections'));
-        self::$instance = $this;
-    }
-    
-    /**
-     * 获取实例
-     */
-    public static function getInstance()
-    {
-        return self::$instance;
+        GateWay::setBusinessWorker($this);
     }
     
     /**
-     * 获取与网关的连接
+     * 获取与gateway的连接
      */
     public static function getGatewayConnections()
     {
-        return self::$gatewayConnections;
+        return $this->gatewayConnections;
     }
     
     /**
-     * 检查请求是否完整
+     * 检查gateway转发来的用户请求是否完整
      * @see Man\Core.SocketWorker::dealInput()
      */
     public function dealInput($recv_str)
@@ -93,7 +78,7 @@ class BusinessWorker extends Man\Core\SocketWorker
     }
     
     /**
-     * 定时检查gateway通信端口
+     * 定时检查gateway通信端口,如果有新的gateway则去建立长连接
      */
     public function checkGatewayConnections()
     {
@@ -104,22 +89,26 @@ class BusinessWorker extends Man\Core\SocketWorker
             return;
         }
        
+        // 循环遍历,查找未连接的gateway ip 端口
         foreach($addresses_list as $addr)
         {
-            if(!isset(self::$gatewayConnections[$addr]))
+            if(!isset($this->gatewayConnections[$addr]))
             {
+                // 执行连接
                 $conn = stream_socket_client("tcp://$addr", $errno, $errstr, 1);
                 if(!$conn)
                 {
                     $this->notice($errstr);
                     continue;
                 }
-                self::$gatewayConnections[$addr] = $conn;
-                stream_set_blocking(self::$gatewayConnections[$addr], 0);
+                $this->gatewayConnections[$addr] = $conn;
+                stream_set_blocking($this->gatewayConnections[$addr], 0);
                 
-                $fd = (int) self::$gatewayConnections[$addr];
-                $this->connections[$fd] = self::$gatewayConnections[$addr];
+                // 初始化一些值
+                $fd = (int) $this->gatewayConnections[$addr];
+                $this->connections[$fd] = $this->gatewayConnections[$addr];
                 $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
+                // 添加数据可读事件
                 $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
             }
         }
@@ -144,36 +133,16 @@ class BusinessWorker extends Man\Core\SocketWorker
      */
     protected function closeClient($fd)
     {
-        foreach(self::$gatewayConnections as $addr => $con)
+        // 清理$this->gatewayConnections对应项
+        foreach($this->gatewayConnections as $addr => $con)
         {
             $the_fd = (int) $con;
             if($the_fd == $fd)
             {
-                unset(self::$gatewayConnections[$addr]);
+                unset($this->gatewayConnections[$addr]);
             }
         }
         parent::closeClient($fd);
     }
     
 }
-
-
-/**
- * 上下文 包含当前用户uid, 内部通信local_ip local_port socket_id ,以及客户端client_ip client_port
- * @author walkor
- *
- */
-class Context
-{
-    public static $series_id;
-    public static $local_ip;
-    public static $local_port;
-    public static $socket_id;
-    public static $client_ip;
-    public static $client_port;
-    public static $uid;
-    public static function clear()
-    {
-        self::$series_id = self::$local_ip = self::$local_port = self::$socket_id = self::$client_ip = self::$client_port = self::$uid = null;
-    }
-}

+ 110 - 45
applications/ChatDemo/Bootstrap/Gateway.php

@@ -32,7 +32,6 @@ class Gateway extends Man\Core\SocketWorker
      */
     protected $lanIp = '127.0.0.1';
     
-    
     /**
      * 内部通信端口
      * @var int
@@ -51,13 +50,6 @@ class Gateway extends Man\Core\SocketWorker
      */
     protected $connUidMap = array();
     
-    
-    /**
-     * 到Worker的通信地址
-     * @var array
-     */ 
-    protected $workerAddresses = array();
-    
     /**
      * 与worker的连接
      * [fd=>fd, $fd=>fd, ..]
@@ -81,6 +73,35 @@ class Gateway extends Man\Core\SocketWorker
     protected $pingData = '';
     
     /**
+     * 由于网络延迟或者socket缓冲区大小的限制,客户端发来的数据可能不会都全部到达,需要根据协议判断数据是否完整
+     * @see Man\Core.SocketWorker::dealInput()
+     */
+    public function dealInput($recv_str)
+    {
+        // 这个聊天demo发送数据量都很小,一般都小于一个ip数据包,所以没有判断长度,直接返回了0,表示数据全部到达
+        // 其它应用应该根据客户端协议来判断数据是否完整
+        return 0;
+    }
+    
+    /**
+     * 用户客户端发来消息时处理
+     * @see Man\Core.SocketWorker::dealProcess()
+     */
+    public function dealProcess($recv_str)
+    {
+        // 判断用户是否认证过
+        $from_uid = $this->getUidByFd($this->currentDealFd);
+        // 触发ON_CONNECTION
+        if(!$from_uid)
+        {
+            return $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECTION, $this->currentDealFd, $recv_str);
+        }
+    
+        // 认证过, 触发ON_MESSAGE
+        $this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $this->currentDealFd, $recv_str);
+    }
+    
+    /**
      * 进程启动
      */
     public function start()
@@ -91,8 +112,9 @@ class Gateway extends Man\Core\SocketWorker
         // 添加accept事件
         $ret = $this->event->add($this->mainSocket,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'accept'));
         
-        // 创建内部通信套接字
+        // 创建内部通信套接字,用于与BusinessWorker通讯
         $start_port = Man\Core\Lib\Config::get($this->workerName.'.lan_port_start');
+        // 计算本进程监听的ip端口
         $this->lanPort = $start_port - posix_getppid() + posix_getpid();
         $this->lanIp = Man\Core\Lib\Config::get($this->workerName.'.lan_ip');
         if(!$this->lanIp)
@@ -102,13 +124,14 @@ class Gateway extends Man\Core\SocketWorker
         }
         $error_no_udp = $error_no_tcp = 0;
         $error_msg_udp = $error_msg_tcp = '';
+        // 执行监听
         $this->innerMainSocketUdp = stream_socket_server("udp://".$this->lanIp.':'.$this->lanPort, $error_no_udp, $error_msg_udp, STREAM_SERVER_BIND);
         $this->innerMainSocketTcp = stream_socket_server("tcp://".$this->lanIp.':'.$this->lanPort, $error_no_tcp, $error_msg_tcp, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
+        // 出错,退出,下次会换个端口
         if(!$this->innerMainSocketUdp || !$this->innerMainSocketTcp)
         {
             $this->notice('create innerMainSocket udp or tcp fail and exit '.$error_msg_udp.$error_msg_tcp);
-            sleep(1);
-            exit(0);
+            $this->stop();
         }
         else
         {
@@ -123,9 +146,6 @@ class Gateway extends Man\Core\SocketWorker
         $this->event->add($this->innerMainSocketUdp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
         $this->event->add($this->innerMainSocketTcp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'acceptTcp'));
         
-        // 初始化到worker的通信地址
-        $this->initWorkerAddresses();
-        
         // 初始化心跳包时间间隔
         $ping_interval = \Man\Core\Lib\Config::get($this->workerName.'.ping_interval');
         if((int)$ping_interval > 0)
@@ -153,7 +173,9 @@ class Gateway extends Man\Core\SocketWorker
         
         // 主体循环,整个子进程会阻塞在这个函数上
         $ret = $this->event->loop();
+        // 下面正常不会执行到
         $this->notice('worker loop exit');
+        // 执行到就退出
         exit(0);
     }
     
@@ -163,6 +185,7 @@ class Gateway extends Man\Core\SocketWorker
      */
     protected function registerAddress($address)
     {
+        // 这里使用了信号量只能实现单机互斥,分布式互斥需要借助于memcache incr 或者其他分布式存储
         \Man\Core\Lib\Mutex::get();
         $key = 'GLOBAL_GATEWAY_ADDRESS';
         $addresses_list = Store::get($key);
@@ -181,6 +204,7 @@ class Gateway extends Man\Core\SocketWorker
      */
     protected function unregisterAddress($address)
     {
+        // 这里使用了信号量只能实现单机互斥,分布式互斥需要借助于memcache incr 或者其他分布式存储
         \Man\Core\Lib\Mutex::get();
         $key = 'GLOBAL_GATEWAY_ADDRESS';
         $addresses_list = Store::get($key);
@@ -215,12 +239,16 @@ class Gateway extends Man\Core\SocketWorker
         $this->innerDealProcess($data);
     }
     
-    
+    /**
+     * 内部通讯端口接受BusinessWorker连接请求,以便建立起长连接
+     * @param resouce $socket
+     * @param null $null_one
+     * @param null $null_two
+     */
     public function acceptTcp($socket, $null_one = null, $null_two = null)
     {
         // 获得一个连接
         $new_connection = @stream_socket_accept($socket, 0);
-        // 可能是惊群效应
         if(false === $new_connection)
         {
             return false;
@@ -234,10 +262,15 @@ class Gateway extends Man\Core\SocketWorker
         // 非阻塞
         stream_set_blocking($this->connections[$fd], 0);
         $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'recvTcp'), $fd);
+        // 标记这个连接是内部通讯长连接,区别于客户端连接
         $this->workerConnections[$fd] = $fd;
         return $new_connection;
     }
     
+    /**
+     * 内部通讯判断数据是否全部到达
+     * @param string $buffer
+     */
     public function dealInnerInput($buffer)
     {
         return GatewayProtocol::input($buffer);
@@ -285,7 +318,7 @@ class Gateway extends Man\Core\SocketWorker
         {
             // 执行处理
             try{
-                // 业务处理
+                // 内部通讯业务处理
                 $this->innerDealProcess($this->recvBuffers[$fd]['buf']);
             }
             catch(\Exception $e)
@@ -317,21 +350,11 @@ class Gateway extends Man\Core\SocketWorker
             pcntl_alarm(self::EXIT_WAIT_TIME);
         }
     }
-    
-    protected function initWorkerAddresses()
-    {
-        $this->workerAddresses = Man\Core\Lib\Config::get($this->workerName.'.business_worker');
-        if(!$this->workerAddresses)
-        {
-            $this->notice($this->workerName.'business_worker not set');
-        }
-    }
-    
-    public function dealInput($recv_str)
-    {
-        return 0;
-    }
 
+    /**
+     * 内部通讯处理
+     * @param string $recv_str
+     */
     public function innerDealProcess($recv_str)
     {
         $pack = new GatewayProtocol($recv_str);
@@ -355,6 +378,10 @@ class Gateway extends Man\Core\SocketWorker
         }
     }
     
+    /**
+     * 广播数据
+     * @param string $bin_data
+     */
     protected function broadCast($bin_data)
     {
         foreach($this->uidConnMap as $uid=>$conn)
@@ -363,6 +390,10 @@ class Gateway extends Man\Core\SocketWorker
         }
     }
     
+    /**
+     * 根据socket_id关闭与客户端的连接,实际上是踢人操作
+     * @param int $socket_id
+     */
     protected function closeClientBySocketId($socket_id)
     {
         if($uid = $this->getUidByFd($socket_id))
@@ -372,6 +403,10 @@ class Gateway extends Man\Core\SocketWorker
         parent::closeClient($socket_id);
     }
     
+    /**
+     * 根据uid获取uid对应连接的id
+     * @param int $uid
+     */
     protected function getFdByUid($uid)
     {
         if(isset($this->uidConnMap[$uid]))
@@ -381,6 +416,10 @@ class Gateway extends Man\Core\SocketWorker
         return 0;
     }
     
+    /**
+     * 根据连接id获取用户uid
+     * @param int $fd
+     */
     protected function getUidByFd($fd)
     {
         if(isset($this->connUidMap[$fd]))
@@ -390,6 +429,12 @@ class Gateway extends Man\Core\SocketWorker
         return 0;
     }
     
+    /**
+     * BusinessWorker通知本Gateway进程$uid用户合法,绑定到$socket_id
+     * 后面这个socketid再有消息传来,会自动带上uid传递给BusinessWorker
+     * @param int $socket_id
+     * @param int $uid
+     */
     protected function connectSuccess($socket_id, $uid)
     {
         $binded_uid = $this->getUidByFd($socket_id);
@@ -402,6 +447,11 @@ class Gateway extends Man\Core\SocketWorker
         $this->connUidMap[$socket_id] = $uid;
     }
     
+    /**
+     * 向某个socketId的连接发送消息
+     * @param int $socket_id
+     * @param string $bin_data
+     */
     public function sendToSocketId($socket_id, $bin_data)
     {
         if(!isset($this->connections[$socket_id]))
@@ -412,6 +462,10 @@ class Gateway extends Man\Core\SocketWorker
         return $this->sendToClient($bin_data);
     }
 
+    /**
+     * 用户客户端主动关闭连接触发
+     * @see Man\Core.SocketWorker::closeClient()
+     */
     protected function closeClient($fd)
     {
         if($uid = $this->getUidByFd($fd))
@@ -422,26 +476,22 @@ class Gateway extends Man\Core\SocketWorker
         parent::closeClient($fd);
     }
     
+    /**
+     * 内部通讯socket在BusinessWorker主动关闭连接时触发
+     * @param int $fd
+     */
     protected function closeInnerClient($fd)
     {
         unset($this->workerConnections[$fd]);
         parent::closeClient($fd);
     }
     
-    public function dealProcess($recv_str)
-    {
-        // 判断用户是否认证过
-        $from_uid = $this->getUidByFd($this->currentDealFd);
-        // 触发ON_CONNECTION
-        if(!$from_uid)
-        {
-            return $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECTION, $this->currentDealFd, $recv_str);
-        }
-        
-        // 认证过, 触发ON_MESSAGE
-        $this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $this->currentDealFd, $recv_str);
-    }
-    
+    /**
+     * 随机抽取一个与BusinessWorker的长连接,将数据发给一个BusinessWorker
+     * @param int $cmd
+     * @param int $socket_id
+     * @param string $body
+     */
     protected function sendToWorker($cmd, $socket_id, $body = '')
     {
         $address= $this->getRemoteAddress($socket_id);
@@ -458,12 +508,20 @@ class Gateway extends Man\Core\SocketWorker
         return $this->sendBufferToWorker($pack->getBuffer());
     }
     
+    /**
+     * 随机抽取一个与BusinessWorker的长连接,将数据发给一个BusinessWorker
+     * @param string $bin_data
+     */
     protected function sendBufferToWorker($bin_data)
     {
         $this->currentDealFd = array_rand($this->workerConnections);
         return $this->sendToClient($bin_data);
     }
     
+    /**
+     * 打印日志
+     * @see Man\Core.AbstractWorker::notice()
+     */
     protected function notice($str, $display=true)
     {
         $str = 'Worker['.get_class($this).']:'."$str ip:".$this->getRemoteIp();
@@ -474,6 +532,10 @@ class Gateway extends Man\Core\SocketWorker
         }
     }
     
+    /**
+     * 进程停止时,清除一些数据
+     * @see Man\Core.SocketWorker::onStop()
+     */
     public function onStop()
     {
         $this->unregisterAddress($this->lanIp.':'.$this->lanPort);
@@ -483,6 +545,9 @@ class Gateway extends Man\Core\SocketWorker
         }
     }
     
+    /**
+     * 向认证的用户发送心跳数据
+     */
     public function ping()
     {
         $this->broadCast($this->pingData);

+ 19 - 0
applications/ChatDemo/Lib/Context.php

@@ -0,0 +1,19 @@
+<?php
+/**
+ * 上下文 包含当前用户uid, 内部通信local_ip local_port socket_id ,以及客户端client_ip client_port
+ * @author walkor
+ */
+class Context
+{
+    public static $series_id;
+    public static $local_ip;
+    public static $local_port;
+    public static $socket_id;
+    public static $client_ip;
+    public static $client_port;
+    public static $uid;
+    public static function clear()
+    {
+        self::$series_id = self::$local_ip = self::$local_port = self::$socket_id = self::$client_ip = self::$client_port = self::$uid = null;
+    }
+}

+ 51 - 17
applications/ChatDemo/Lib/Gateway.php

@@ -7,10 +7,34 @@
  * 
  */
 
+if(!defined('ROOT_DIR'))
+{
+    define('ROOT_DIR', __DIR__."/../");
+}
+
 require_once ROOT_DIR . '/Lib/Store.php';
+require_once ROOT_DIR . '/Lib/Context.php';
+require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
+require_once ROOT_DIR . '/Protocols/APLog.php';
 
 class GateWay
 {
+    
+    /**
+     * gateway实例
+     * @var object
+     */
+    protected static  $businessWorker = null;
+    
+    /**
+     * 设置gateway实例,用于与
+     * @param unknown_type $gateway_instance
+     */
+    public static function setBusinessWorker($business_worker_instance)
+    {
+        self::$businessWorker = $business_worker_instance;
+    }
+    
    /**
     * 向所有客户端广播消息
     * @param string $message
@@ -28,15 +52,22 @@ class GateWay
        $pack->header['uid'] = Context::$uid;
        $pack->body = (string)$message;
        $buffer = $pack->getBuffer();
-       /* $all_addresses = Store::get('GLOBAL_GATEWAY_ADDRESS');
-       foreach($all_addresses as $address)
+       // 如果有businessWorker实例,说明运行在workerman环境中,通过businessWorker中的长连接发送数据
+       if(self::$businessWorker)
        {
-           self::sendToGateway($address, $buffer);
-       } */
-       $worker_instance = BusinessWorker::getInstance();
-       foreach(BusinessWorker::getGatewayConnections() as $con)
+           foreach(self::$businessWorker->getGatewayConnections() as $con)
+           {
+               self::$businessWorker->sendToClient($buffer, $con);
+           }
+       }
+       // 运行在其它环境中,使用udp向worker发送数据
+       else
        {
-           $worker_instance->sendToClient($buffer, $con);
+           $all_addresses = Store::get('GLOBAL_GATEWAY_ADDRESS');
+           foreach($all_addresses as $address)
+           {
+               self::sendToGateway($address, $buffer);
+           }
        }
    }
    
@@ -203,17 +234,20 @@ class GateWay
     */
    public static function sendToGateway($address, $buffer)
    {
-       $connections = BusinessWorker::getGatewayConnections();
-       if(!isset($connections[$address]))
+       // 有$businessWorker说明是workerman环境,使用$businessWorker发送数据
+       if(self::$businessWorker)
        {
-           $e = new \Exception("sendToGateway($address, $buffer) fail \$connections:".json_encode($connections));
-           APLog::add($e->__toString());
-           return false;
+           $connections = self::$businessWorker->getGatewayConnections();
+           if(!isset($connections[$address]))
+           {
+               $e = new \Exception("sendToGateway($address, $buffer) fail \$connections:".json_encode($connections));
+               APLog::add($e->__toString());
+               return false;
+           }
+           return BusinessWorker::getInstance()->sendToClient($buffer, $connections[$address]);
        }
-       return BusinessWorker::getInstance()->sendToClient($buffer, $connections[$address]);
-       
-       //$client = stream_socket_client($address, $errno, $errmsg, 1, STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT);
-       //$len = stream_socket_sendto($client, $buffer);
-       //return $len == strlen($buffer);
+       // 非workerman环境,使用udp发送数据
+       $client = stream_socket_client("udp://$address", $errno, $errmsg);
+       return strlen($buffer) == stream_socket_sendto($client, $buffer);
    }
 }

+ 1 - 2
workerman/Core/AbstractWorker.php

@@ -204,8 +204,7 @@ abstract class AbstractWorker
             {
                 $error_msg .= $this->getErrorType($errors['type']) . " {$errors['message']} in {$errors['file']} on line {$errors['line']}";
             }
-            $e = new \Exception();
-            $this->notice($error_msg."\n" . $e->getTraceAsString());
+            $this->notice($error_msg);
         }
     }
     

+ 0 - 4
workerman/conf/conf.d/Gateway.conf

@@ -34,10 +34,6 @@ lan_ip = 127.0.0.1
 ;内部通讯端口起始值,假如开启5个gateway进程,则每个进程会监听一个端口,40001 40002 40003 40004 40005
 lan_port_start = 40000
 
-;业务进程通讯传输层协议、ip、端口
-business_worker[] = tcp://127.0.0.1:8483
-business_worker[] = tcp://127.0.0.1:8483
-
 ;此gateway进程向客户端发送心跳时间间隔 单位:秒
 ping_interval = 10