Browse Source

Merge branch 'master' of https://github.com/walkor/workerman

liangl 11 years ago
parent
commit
0b6c618241

+ 104 - 21
applications/ChatDemo/Bootstrap/BusinessWorker.php

@@ -7,16 +7,52 @@
  * 
  */
 define('ROOT_DIR', realpath(__DIR__.'/../'));
-require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
+require_once ROOT_DIR . '/Lib/Gateway.php';
 require_once ROOT_DIR . '/Event.php';
 
 class BusinessWorker extends Man\Core\SocketWorker
 {
+    /**
+     * 与gateway的连接
+     * ['ip:port' => conn, 'ip:port' => conn, ...]
+     * @var array
+     */
+    protected $gatewayConnections = array();
+    
+    /**
+     * 进程启动时初始化
+     * @see Man\Core.SocketWorker::onStart()
+     */
+    protected function onStart()
+    {
+        // 定时检查与gateway进程的连接
+        \Man\Core\Lib\Task::init($this->event);
+        \Man\Core\Lib\Task::add(1, array($this, 'checkGatewayConnections'));
+        $this->checkGatewayConnections();
+        GateWay::setBusinessWorker($this);
+    }
+    
+    /**
+     * 获取与gateway的连接
+     */
+    public function getGatewayConnections()
+    {
+        return $this->gatewayConnections;
+    }
+    
+    /**
+     * 检查gateway转发来的用户请求是否完整
+     * @see Man\Core.SocketWorker::dealInput()
+     */
     public function dealInput($recv_str)
     {
         return GatewayProtocol::input($recv_str); 
     }
 
+    /**
+     * 处理请求
+     * @see Man\Core.SocketWorker::dealProcess()
+     */
     public function dealProcess($recv_str)
     {
         $pack = new GatewayProtocol($recv_str);
@@ -26,7 +62,6 @@ class BusinessWorker extends Man\Core\SocketWorker
         Context::$local_port = $pack->header['local_port'];
         Context::$socket_id = $pack->header['socket_id'];
         Context::$uid = $pack->header['uid'];
-        Context::$protocol = $this->protocol;
         switch($pack->header['cmd'])
         {
             case GatewayProtocol::CMD_ON_CONNECTION:
@@ -42,25 +77,73 @@ class BusinessWorker extends Man\Core\SocketWorker
         Context::clear();
         return $ret;
     }
-}
-
-/**
- * 上下文 包含当前用户uid, 内部通信local_ip local_port socket_id ,以及客户端client_ip client_port
- * @author walkor
- *
- */
-class Context
-{
-    public static $series_id;
-    public static $local_ip;
-    public static $local_port;
-    public static $socket_id;
-    public static $client_ip;
-    public static $client_port;
-    public static $uid;
-    public static $protocol;
-    public static function clear()
+    
+    /**
+     * 定时检查gateway通信端口,如果有新的gateway则去建立长连接
+     */
+    public function checkGatewayConnections()
     {
-        self::$series_id = self::$local_ip = self::$local_port = self::$socket_id = self::$client_ip = self::$client_port = self::$uid = null;
+        $key = 'GLOBAL_GATEWAY_ADDRESS';
+        $addresses_list = Store::get($key);
+        if(empty($addresses_list))
+        {
+            return;
+        }
+       
+        // 循环遍历,查找未连接的gateway ip 端口
+        foreach($addresses_list as $addr)
+        {
+            if(!isset($this->gatewayConnections[$addr]))
+            {
+                // 执行连接
+                $conn = stream_socket_client("tcp://$addr", $errno, $errstr, 1);
+                if(!$conn)
+                {
+                    $this->notice($errstr);
+                    continue;
+                }
+                $this->gatewayConnections[$addr] = $conn;
+                stream_set_blocking($this->gatewayConnections[$addr], 0);
+                
+                // 初始化一些值
+                $fd = (int) $this->gatewayConnections[$addr];
+                $this->connections[$fd] = $this->gatewayConnections[$addr];
+                $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
+                // 添加数据可读事件
+                $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
+            }
+        }
+    }
+    
+    /**
+     * 发送数据给客户端
+     * @see Man\Core.SocketWorker::sendToClient()
+     */
+    public function sendToClient($buffer, $con = null)
+    {
+        if($con)
+        {
+            $this->currentDealFd = (int) $con;
+        }
+        return parent::sendToClient($buffer);
+    }
+    
+    /**
+     * 关闭连接
+     * @see Man\Core.SocketWorker::closeClient()
+     */
+    protected function closeClient($fd)
+    {
+        // 清理$this->gatewayConnections对应项
+        foreach($this->gatewayConnections as $addr => $con)
+        {
+            $the_fd = (int) $con;
+            if($the_fd == $fd)
+            {
+                unset($this->gatewayConnections[$addr]);
+            }
+        }
+        parent::closeClient($fd);
     }
+    
 }

+ 132 - 61
applications/ChatDemo/Bootstrap/Gateway.php

@@ -32,7 +32,6 @@ class Gateway extends Man\Core\SocketWorker
      */
     protected $lanIp = '127.0.0.1';
     
-    
     /**
      * 内部通信端口
      * @var int
@@ -51,12 +50,12 @@ class Gateway extends Man\Core\SocketWorker
      */
     protected $connUidMap = array();
     
-    
     /**
-     * 到Worker的通信地址
+     * 与worker的连接
+     * [fd=>fd, $fd=>fd, ..]
      * @var array
-     */ 
-    protected $workerAddresses = array();
+     */
+    protected $workerConnections = array();
     
     /**
      * gateway 发送心跳时间间隔 单位:秒 ,0表示不发送心跳,在配置中设置
@@ -74,6 +73,35 @@ class Gateway extends Man\Core\SocketWorker
     protected $pingData = '';
     
     /**
+     * 由于网络延迟或者socket缓冲区大小的限制,客户端发来的数据可能不会都全部到达,需要根据协议判断数据是否完整
+     * @see Man\Core.SocketWorker::dealInput()
+     */
+    public function dealInput($recv_str)
+    {
+        // 这个聊天demo发送数据量都很小,一般都小于一个ip数据包,所以没有判断长度,直接返回了0,表示数据全部到达
+        // 其它应用应该根据客户端协议来判断数据是否完整
+        return 0;
+    }
+    
+    /**
+     * 用户客户端发来消息时处理
+     * @see Man\Core.SocketWorker::dealProcess()
+     */
+    public function dealProcess($recv_str)
+    {
+        // 判断用户是否认证过
+        $from_uid = $this->getUidByFd($this->currentDealFd);
+        // 触发ON_CONNECTION
+        if(!$from_uid)
+        {
+            return $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECTION, $this->currentDealFd, $recv_str);
+        }
+    
+        // 认证过, 触发ON_MESSAGE
+        $this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $this->currentDealFd, $recv_str);
+    }
+    
+    /**
      * 进程启动
      */
     public function start()
@@ -84,8 +112,9 @@ class Gateway extends Man\Core\SocketWorker
         // 添加accept事件
         $ret = $this->event->add($this->mainSocket,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'accept'));
         
-        // 创建内部通信套接字
+        // 创建内部通信套接字,用于与BusinessWorker通讯
         $start_port = Man\Core\Lib\Config::get($this->workerName.'.lan_port_start');
+        // 计算本进程监听的ip端口
         $this->lanPort = $start_port - posix_getppid() + posix_getpid();
         $this->lanIp = Man\Core\Lib\Config::get($this->workerName.'.lan_ip');
         if(!$this->lanIp)
@@ -95,13 +124,14 @@ class Gateway extends Man\Core\SocketWorker
         }
         $error_no_udp = $error_no_tcp = 0;
         $error_msg_udp = $error_msg_tcp = '';
+        // 执行监听
         $this->innerMainSocketUdp = stream_socket_server("udp://".$this->lanIp.':'.$this->lanPort, $error_no_udp, $error_msg_udp, STREAM_SERVER_BIND);
         $this->innerMainSocketTcp = stream_socket_server("tcp://".$this->lanIp.':'.$this->lanPort, $error_no_tcp, $error_msg_tcp, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
+        // 出错,退出,下次会换个端口
         if(!$this->innerMainSocketUdp || !$this->innerMainSocketTcp)
         {
             $this->notice('create innerMainSocket udp or tcp fail and exit '.$error_msg_udp.$error_msg_tcp);
-            sleep(1);
-            exit(0);
+            $this->stop();
         }
         else
         {
@@ -110,15 +140,11 @@ class Gateway extends Man\Core\SocketWorker
         }
         
         // 注册套接字
-        $this->registerAddress("udp://".$this->lanIp.':'.$this->lanPort, 'udp');
-        $this->registerAddress("tcp://".$this->lanIp.':'.$this->lanPort, 'tcp');
+        $this->registerAddress($this->lanIp.':'.$this->lanPort);
         
-        // 添加读udp事件
+        // 添加读udp/tcp事件
         $this->event->add($this->innerMainSocketUdp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
-        $this->event->add($this->innerMainSocketTcp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'acceptTCP'));
-        
-        // 初始化到worker的通信地址
-        $this->initWorkerAddresses();
+        $this->event->add($this->innerMainSocketTcp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'acceptTcp'));
         
         // 初始化心跳包时间间隔
         $ping_interval = \Man\Core\Lib\Config::get($this->workerName.'.ping_interval');
@@ -147,7 +173,9 @@ class Gateway extends Man\Core\SocketWorker
         
         // 主体循环,整个子进程会阻塞在这个函数上
         $ret = $this->event->loop();
+        // 下面正常不会执行到
         $this->notice('worker loop exit');
+        // 执行到就退出
         exit(0);
     }
     
@@ -155,10 +183,11 @@ class Gateway extends Man\Core\SocketWorker
      * 存储全局的通信地址 
      * @param string $address
      */
-    protected function registerAddress($address, $protocol)
+    protected function registerAddress($address)
     {
+        // 这里使用了信号量只能实现单机互斥,分布式互斥需要借助于memcache incr 或者其他分布式存储
         \Man\Core\Lib\Mutex::get();
-        $key = 'GLOBAL_GATEWAY_ADDRESS-' . $protocol;
+        $key = 'GLOBAL_GATEWAY_ADDRESS';
         $addresses_list = Store::get($key);
         if(empty($addresses_list))
         {
@@ -173,10 +202,11 @@ class Gateway extends Man\Core\SocketWorker
      * 删除全局的通信地址
      * @param string $address
      */
-    protected function unregisterAddress($address, $protocol)
+    protected function unregisterAddress($address)
     {
+        // 这里使用了信号量只能实现单机互斥,分布式互斥需要借助于memcache incr 或者其他分布式存储
         \Man\Core\Lib\Mutex::get();
-        $key = 'GLOBAL_GATEWAY_ADDRESS-' . $protocol;
+        $key = 'GLOBAL_GATEWAY_ADDRESS';
         $addresses_list = Store::get($key);
         if(empty($addresses_list))
         {
@@ -209,12 +239,16 @@ class Gateway extends Man\Core\SocketWorker
         $this->innerDealProcess($data);
     }
     
-    
+    /**
+     * 内部通讯端口接受BusinessWorker连接请求,以便建立起长连接
+     * @param resouce $socket
+     * @param null $null_one
+     * @param null $null_two
+     */
     public function acceptTcp($socket, $null_one = null, $null_two = null)
     {
         // 获得一个连接
         $new_connection = @stream_socket_accept($socket, 0);
-        // 可能是惊群效应
         if(false === $new_connection)
         {
             return false;
@@ -228,9 +262,15 @@ class Gateway extends Man\Core\SocketWorker
         // 非阻塞
         stream_set_blocking($this->connections[$fd], 0);
         $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'recvTcp'), $fd);
+        // 标记这个连接是内部通讯长连接,区别于客户端连接
+        $this->workerConnections[$fd] = $fd;
         return $new_connection;
     }
     
+    /**
+     * 内部通讯判断数据是否全部到达
+     * @param string $buffer
+     */
     public function dealInnerInput($buffer)
     {
         return GatewayProtocol::input($buffer);
@@ -262,7 +302,7 @@ class Gateway extends Man\Core\SocketWorker
             }
     
             // 关闭链接
-            $this->closeClient($fd);
+            $this->closeInnerClient($fd);
             if($this->workerStatus == self::STATUS_SHUTDOWN)
             {
                 $this->stop();
@@ -278,7 +318,7 @@ class Gateway extends Man\Core\SocketWorker
         {
             // 执行处理
             try{
-                // 业务处理
+                // 内部通讯业务处理
                 $this->innerDealProcess($this->recvBuffers[$fd]['buf']);
             }
             catch(\Exception $e)
@@ -286,12 +326,7 @@ class Gateway extends Man\Core\SocketWorker
                 $this->notice('CODE:' . $e->getCode() . ' MESSAGE:' . $e->getMessage()."\n".$e->getTraceAsString()."\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
                 $this->statusInfo['throw_exception'] ++;
             }
-    
-            // 关闭链接
-            if(empty($this->sendBuffers[$fd]))
-            {
-                $this->closeClient($fd);
-            }
+            $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
         }
         // 出错
         else if(false === $remain_len)
@@ -299,7 +334,7 @@ class Gateway extends Man\Core\SocketWorker
             // 出错
             $this->statusInfo['packet_err']++;
             $this->notice("INNER_PACKET_ERROR\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
-            $this->closeClient($fd);
+            $this->closeInnerClient($fd);
         }
         else
         {
@@ -315,21 +350,11 @@ class Gateway extends Man\Core\SocketWorker
             pcntl_alarm(self::EXIT_WAIT_TIME);
         }
     }
-    
-    protected function initWorkerAddresses()
-    {
-        $this->workerAddresses = Man\Core\Lib\Config::get($this->workerName.'.business_worker');
-        if(!$this->workerAddresses)
-        {
-            $this->notice($this->workerName.'business_worker not set');
-        }
-    }
-    
-    public function dealInput($recv_str)
-    {
-        return 0;
-    }
 
+    /**
+     * 内部通讯处理
+     * @param string $recv_str
+     */
     public function innerDealProcess($recv_str)
     {
         $pack = new GatewayProtocol($recv_str);
@@ -353,6 +378,10 @@ class Gateway extends Man\Core\SocketWorker
         }
     }
     
+    /**
+     * 广播数据
+     * @param string $bin_data
+     */
     protected function broadCast($bin_data)
     {
         foreach($this->uidConnMap as $uid=>$conn)
@@ -361,6 +390,10 @@ class Gateway extends Man\Core\SocketWorker
         }
     }
     
+    /**
+     * 根据socket_id关闭与客户端的连接,实际上是踢人操作
+     * @param int $socket_id
+     */
     protected function closeClientBySocketId($socket_id)
     {
         if($uid = $this->getUidByFd($socket_id))
@@ -370,6 +403,10 @@ class Gateway extends Man\Core\SocketWorker
         parent::closeClient($socket_id);
     }
     
+    /**
+     * 根据uid获取uid对应连接的id
+     * @param int $uid
+     */
     protected function getFdByUid($uid)
     {
         if(isset($this->uidConnMap[$uid]))
@@ -379,6 +416,10 @@ class Gateway extends Man\Core\SocketWorker
         return 0;
     }
     
+    /**
+     * 根据连接id获取用户uid
+     * @param int $fd
+     */
     protected function getUidByFd($fd)
     {
         if(isset($this->connUidMap[$fd]))
@@ -388,18 +429,29 @@ class Gateway extends Man\Core\SocketWorker
         return 0;
     }
     
+    /**
+     * BusinessWorker通知本Gateway进程$uid用户合法,绑定到$socket_id
+     * 后面这个socketid再有消息传来,会自动带上uid传递给BusinessWorker
+     * @param int $socket_id
+     * @param int $uid
+     */
     protected function connectSuccess($socket_id, $uid)
     {
         $binded_uid = $this->getUidByFd($socket_id);
         if($binded_uid)
         {
-            $this->notice('notify connection success fail ' . $socket_id . ' already binded data:'.serialize($data));
+            $this->notice('notify connection success fail ' . $socket_id . ' already binded ');
             return;
         }
         $this->uidConnMap[$uid] = $socket_id;
         $this->connUidMap[$socket_id] = $uid;
     }
     
+    /**
+     * 向某个socketId的连接发送消息
+     * @param int $socket_id
+     * @param string $bin_data
+     */
     public function sendToSocketId($socket_id, $bin_data)
     {
         if(!isset($this->connections[$socket_id]))
@@ -410,6 +462,10 @@ class Gateway extends Man\Core\SocketWorker
         return $this->sendToClient($bin_data);
     }
 
+    /**
+     * 用户客户端主动关闭连接触发
+     * @see Man\Core.SocketWorker::closeClient()
+     */
     protected function closeClient($fd)
     {
         if($uid = $this->getUidByFd($fd))
@@ -420,20 +476,22 @@ class Gateway extends Man\Core\SocketWorker
         parent::closeClient($fd);
     }
     
-    public function dealProcess($recv_str)
+    /**
+     * 内部通讯socket在BusinessWorker主动关闭连接时触发
+     * @param int $fd
+     */
+    protected function closeInnerClient($fd)
     {
-        // 判断用户是否认证过
-        $from_uid = $this->getUidByFd($this->currentDealFd);
-        // 触发ON_CONNECTION
-        if(!$from_uid)
-        {
-            return $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECTION, $this->currentDealFd, $recv_str);
-        }
-        
-        // 认证过, 触发ON_MESSAGE
-        $this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $this->currentDealFd, $recv_str);
+        unset($this->workerConnections[$fd]);
+        parent::closeClient($fd);
     }
     
+    /**
+     * 随机抽取一个与BusinessWorker的长连接,将数据发给一个BusinessWorker
+     * @param int $cmd
+     * @param int $socket_id
+     * @param string $body
+     */
     protected function sendToWorker($cmd, $socket_id, $body = '')
     {
         $address= $this->getRemoteAddress($socket_id);
@@ -450,13 +508,20 @@ class Gateway extends Man\Core\SocketWorker
         return $this->sendBufferToWorker($pack->getBuffer());
     }
     
+    /**
+     * 随机抽取一个与BusinessWorker的长连接,将数据发给一个BusinessWorker
+     * @param string $bin_data
+     */
     protected function sendBufferToWorker($bin_data)
     {
-        $client = stream_socket_client($this->workerAddresses[array_rand($this->workerAddresses)]);
-        $len = fwrite($client, $bin_data);
-        return $len == strlen($bin_data);
+        $this->currentDealFd = array_rand($this->workerConnections);
+        return $this->sendToClient($bin_data);
     }
     
+    /**
+     * 打印日志
+     * @see Man\Core.AbstractWorker::notice()
+     */
     protected function notice($str, $display=true)
     {
         $str = 'Worker['.get_class($this).']:'."$str ip:".$this->getRemoteIp();
@@ -467,16 +532,22 @@ class Gateway extends Man\Core\SocketWorker
         }
     }
     
+    /**
+     * 进程停止时,清除一些数据
+     * @see Man\Core.SocketWorker::onStop()
+     */
     public function onStop()
     {
-        $this->unregisterAddress("udp://".$this->lanIp.':'.$this->lanPort, 'udp');
-        $this->unregisterAddress("tcp://".$this->lanIp.':'.$this->lanPort, 'tcp');
+        $this->unregisterAddress($this->lanIp.':'.$this->lanPort);
         foreach($this->connUidMap as $uid)
         {
             Store::delete($uid);
         }
     }
     
+    /**
+     * 向认证的用户发送心跳数据
+     */
     public function ping()
     {
         $this->broadCast($this->pingData);

+ 32 - 0
applications/ChatDemo/Lib/APLog.php

@@ -0,0 +1,32 @@
+<?php
+/**
+ * 
+ * 日志类
+ * 
+* @author walkor <workerman.net>
+ */
+class APLog
+{
+    /**
+     * 添加日志
+     * @param string $msg
+     * @return void
+     */
+    public static function add($msg)
+    {
+        $log_dir = ROOT_DIR. '/Logs/'.date('Y-m-d');
+        umask(0);
+        // 没有log目录创建log目录
+        if(!is_dir($log_dir))
+        {
+            mkdir($log_dir,  0777, true);
+        }
+        if(!is_readable($log_dir))
+        {
+            return false;
+        }
+        
+        $log_file = $log_dir . "/applications.log";
+        file_put_contents($log_file, date('Y-m-d H:i:s') . " " . $msg . "\n", FILE_APPEND);
+    }
+}

+ 19 - 0
applications/ChatDemo/Lib/Context.php

@@ -0,0 +1,19 @@
+<?php
+/**
+ * 上下文 包含当前用户uid, 内部通信local_ip local_port socket_id ,以及客户端client_ip client_port
+ * @author walkor
+ */
+class Context
+{
+    public static $series_id;
+    public static $local_ip;
+    public static $local_port;
+    public static $socket_id;
+    public static $client_ip;
+    public static $client_port;
+    public static $uid;
+    public static function clear()
+    {
+        self::$series_id = self::$local_ip = self::$local_port = self::$socket_id = self::$client_ip = self::$client_port = self::$uid = null;
+    }
+}

+ 54 - 20
applications/ChatDemo/Lib/Gateway.php

@@ -7,10 +7,34 @@
  * 
  */
 
+if(!defined('ROOT_DIR'))
+{
+    define('ROOT_DIR', __DIR__."/../");
+}
+
 require_once ROOT_DIR . '/Lib/Store.php';
+require_once ROOT_DIR . '/Lib/Context.php';
+require_once ROOT_DIR . '/Lib/APLog.php';
+require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
 
 class GateWay
 {
+    
+    /**
+     * gateway实例
+     * @var object
+     */
+    protected static  $businessWorker = null;
+    
+    /**
+     * 设置gateway实例,用于与
+     * @param unknown_type $gateway_instance
+     */
+    public static function setBusinessWorker($business_worker_instance)
+    {
+        self::$businessWorker = $business_worker_instance;
+    }
+    
    /**
     * 向所有客户端广播消息
     * @param string $message
@@ -28,14 +52,22 @@ class GateWay
        $pack->header['uid'] = Context::$uid;
        $pack->body = (string)$message;
        $buffer = $pack->getBuffer();
-       if(empty(Context::$protocol))
+       // 如果有businessWorker实例,说明运行在workerman环境中,通过businessWorker中的长连接发送数据
+       if(self::$businessWorker)
        {
-           Context::$protocol = 'tcp';
+           foreach(self::$businessWorker->getGatewayConnections() as $con)
+           {
+               self::$businessWorker->sendToClient($buffer, $con);
+           }
        }
-       $all_addresses = Store::get('GLOBAL_GATEWAY_ADDRESS-' . Context::$protocol);
-       foreach($all_addresses as $address)
+       // 运行在其它环境中,使用udp向worker发送数据
+       else
        {
-           self::sendToGateway($address, $buffer);
+           $all_addresses = Store::get('GLOBAL_GATEWAY_ADDRESS');
+           foreach($all_addresses as $address)
+           {
+               self::sendToGateway($address, $buffer);
+           }
        }
    }
    
@@ -127,12 +159,7 @@ class GateWay
        $pack->header['uid'] = empty($uid) ? 0 : $uid;
        $pack->body = (string)$message;
        
-       if(empty(Context::$protocol))
-       {
-           Context::$protocol = 'tcp';
-       }
-        
-       return self::sendToGateway(Context::$protocol."://{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
+       return self::sendToGateway("{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
    }
    
    
@@ -160,12 +187,7 @@ class GateWay
        $pack->header['uid'] = $uid ? $uid : 0;
        $pack->body = (string)$message;
        
-       if(empty(Context::$protocol))
-       {
-           Context::$protocol = 'tcp';
-       }
-       
-       return self::sendToGateway(Context::$protocol."://{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
+       return self::sendToGateway("{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
    }
    
    /**
@@ -212,8 +234,20 @@ class GateWay
     */
    public static function sendToGateway($address, $buffer)
    {
-       $client = stream_socket_client($address);
-       $len = stream_socket_sendto($client, $buffer);
-       return $len == strlen($buffer);
+       // 有$businessWorker说明是workerman环境,使用$businessWorker发送数据
+       if(self::$businessWorker)
+       {
+           $connections = self::$businessWorker->getGatewayConnections();
+           if(!isset($connections[$address]))
+           {
+               $e = new \Exception("sendToGateway($address, $buffer) fail \$connections:".json_encode($connections));
+               APLog::add($e->__toString());
+               return false;
+           }
+           return self::$businessWorker->sendToClient($buffer, $connections[$address]);
+       }
+       // 非workerman环境,使用udp发送数据
+       $client = stream_socket_client("udp://$address", $errno, $errmsg);
+       return strlen($buffer) == stream_socket_sendto($client, $buffer);
    }
 }

+ 48 - 0
applications/ChatDemo/README.md

@@ -0,0 +1,48 @@
+基于TCP的一个聊天的Demo,该架构适用于绝大部分即时通讯应用,如PC\手机app IM、游戏后台、企业通讯软件、与硬件通讯等
+=========
+
+### Demo测试方法 
+  * 运行 cd Tests/
+  * 运行 php Chat.php
+  * 直接打字回车是向所有人发消息
+  * $uid:xxxxxx 是向$uid用户发送消息
+
+目录结构
+========
+
+<pre>
+.
+├── Bootstrap  // 进程入口目录,分为gateway进程和BusinessWorker进程。gateway进程负责接收用户连接,转发用户请求给BusinessWorker进程,接收BusinessWorker进程的结果转发给用户
+│   │
+│   ├── BusinessWorker.php // 业务进程,接收Gateway进程的转发来的用户请求并处理,如果有需要将结果发给其它用户则通过Gateway进程转发
+│   │
+│   └── Gateway.php  // gateway进程,负责客户端连接,转发用户请求给BusinessWorker进程处理,并接收BusinessWorker进程的处理结果转发给用户
+│ 
+├── Lib  // 通用的库
+│   ├── Gateway.php  // gateway进程的接口,BusinessWorker进程通过此文件的接口向gateway进程发送数据
+│   │
+│   ├── Store.php    // 用户存储用户连接信息(存储于logs/data.php文件中,高并发应用请使用Store.php.for-memcache替换)
+│   │
+│   └── Store.php.for-memcache  // 使用memcache储存用户连接信息,功能等同Store.php,支持分布式,性能更高,可以替换Store.php
+│ 
+├── Protocols // 协议相关
+│   └── GatewayProtocol.php  // gateway与BusinessWorker通讯的协议,开发者无需关注
+│ 
+├──Tests
+│   └── Chat.php // php测试客户端 
+│ 
+└── Event.php // 聊天所有的业务代码在此目录,群聊、私聊等
+</pre>
+
+为什么使用gateway worker模型
+===========================
+
+gateway worker模型非常适合长链接应用,例如聊天、游戏后台等。如果是短链接应用,则建议使用上面FileRecevierDemo基础的master slave模型。
+###1、gateway只负责网络IO,worker主要负责业务逻辑。各司其职,非常高效。
+打个比方,一个餐馆有4工人(进程),他们即负责招呼客人(网络IO),又负责在厨房做菜(业务逻辑)。当客人一下子来很多的时候(很多链接或很多数据),大家有可能都去招待客人了(都处理网络IO),厨房没人做菜(做业务)。当大家都做菜的时候(做业务),又没人招呼客人(接收链接),导致客人(用户)都在等待。但是当我们把工人(进程)分工一下,2个人专门招呼客人(geteway进程),两个人专门做菜(worker进程),这样每个时刻都有有人(进程)招待客人(接收数据),都有人(进程)做菜(处理业务)。当gateway不够用的时候(一般都是够用的)增加gateway,worker忙不过来的时候增加worker进程。这样效率会提升很多。
+###2、提高稳定性
+gateway进程因为要维持用户链接,这要求gateway进程一定要非常稳定,不然如果gateway进程出问题,则这个进程上的所有用户都会断开链接。让gateway只负责网络IO,不负责业务,就是因为业务频繁变化,可能会有致命的错误(例如调用了一个不存在的函数)导致进程退出,进而导致用户链接断开。而让gateway只负责网络IO,就是要避免这种风险。而worker进程是无状态的(没有保存用户链接等状态信息),即使偶尔出现FatalErr,也只会影响当前的这次请求,而不会对整个服务造成大的影响。
+###3、热更新
+由于gateway进程没有业务逻辑,所以geteway进程极少有代码更新。而worker进程由于负责业务逻辑,会有经常性的代码更新。这样看来我们每次代码更新,只要重启worker进程就可以实现运行新的业务代码。实际上也是这样,当更新程序逻辑时,我们只需要重启worker进程就可以了,这样就不会导致更新代码的时候用户链接会断开,达到不影响用户的情况下热更新后台程序。
+###4、扩展容易
+当worker进程不够用的时候,我们可以水平扩展它,可增加worker的进程数量,甚至可以增加服务器专门运行worker进程,达到水平扩展的目的,以支持更大的用户量。gateway进程也是同样的道理。

+ 5 - 0
applications/FileReceiverDemo/README.md

@@ -0,0 +1,5 @@
+基于TCP的文件传输的例子
+=========
+
+### Demo测试方法 
+  * 运行 php ClientDemo.php workerman.png 2  // 此命令的意思是通过ClientDemo.php客户端 将 workerman.png 文件传输给workerman,由workerman保存到workerman所在服务器。参数 2 代表文件类型是png

+ 8 - 0
applications/README.md

@@ -0,0 +1,8 @@
+Benchmark 用于压力压测
+============
+
+ChatDemo 用于基于TCP的聊天程序
+============
+
+FileRecvierDemo 文件传输的demo
+============

+ 1 - 1
workerman/Common/FileMonitor.php

@@ -175,7 +175,7 @@ class FileMonitor extends Man\Core\AbstractWorker
      */
     public function checkTty()
     {
-        if(!$this->terminalClosed && !posix_ttyname(STDOUT))
+        if(!$this->terminalClosed && !@posix_ttyname(STDOUT))
         {
             $this->resetFd();
             // 日志

+ 1 - 2
workerman/Core/AbstractWorker.php

@@ -204,8 +204,7 @@ abstract class AbstractWorker
             {
                 $error_msg .= $this->getErrorType($errors['type']) . " {$errors['message']} in {$errors['file']} on line {$errors['line']}";
             }
-            $e = new \Exception();
-            $this->notice($error_msg."\n" . $e->getTraceAsString());
+            $this->notice($error_msg);
         }
     }
     

+ 1 - 1
workerman/Core/Master.php

@@ -33,7 +33,7 @@ class Master
      * 版本
      * @var string
      */
-    const VERSION = '2.0.1';
+    const VERSION = '2.1.1';
     
     /**
      * 服务名

+ 6 - 6
workerman/conf/conf.d/BusinessWorker.conf

@@ -1,12 +1,12 @@
-;业务进程入口文件
+;涓氬姟杩涚▼鍏ュ彛鏂囦欢
 worker_file = ../applications/ChatDemo/Bootstrap/BusinessWorker.php
-;传输层协议 ip 及端口
+;浼犺緭灞傚崗璁� ip 鍙婄�鍙�
 listen = tcp://0.0.0.0:8483
-;启动多少服务进程
+;鍚�姩澶氬皯鏈嶅姟杩涚▼
 start_workers = 5
-;以哪个用户运行该进程,为了安全请使用权限较低的用户,例如www-data nobody
+;浠ュ摢涓�敤鎴疯繍琛岃�杩涚▼锛屼负浜嗗畨鍏ㄨ�浣跨敤鏉冮檺杈冧綆鐨勭敤鎴凤紝渚嬪�www-data nobody
 user = root
-;请求到来时预读长度,这里固定27
+;璇锋眰鍒版潵鏃堕�璇婚暱搴︼紝杩欓噷鍥哄畾27
 preread_length = 27
-;设置最大请求数,超过这个请求数后会安全重启该进程(主要是避免因业务代码不规范导致的内存泄露)
+;璁剧疆鏈€澶ц�姹傛暟锛岃秴杩囪繖涓��姹傛暟鍚庝細瀹夊叏閲嶅惎璇ヨ繘绋嬶紙涓昏�鏄�伩鍏嶅洜涓氬姟浠g爜涓嶈�鑼冨�鑷寸殑鍐呭瓨娉勯湶锛�
 max_requests=10000

+ 0 - 4
workerman/conf/conf.d/Gateway.conf

@@ -34,10 +34,6 @@ lan_ip = 127.0.0.1
 ;内部通讯端口起始值,假如开启5个gateway进程,则每个进程会监听一个端口,40001 40002 40003 40004 40005
 lan_port_start = 40000
 
-;业务进程通讯传输层协议、ip、端口
-business_worker[] = tcp://127.0.0.1:8483
-business_worker[] = tcp://127.0.0.1:8483
-
 ;此gateway进程向客户端发送心跳时间间隔 单位:秒
 ping_interval = 10