Parcourir la source

workerman升级

walkor il y a 11 ans
Parent
commit
314d226f0e

+ 16 - 16
applications/Demo/Bootstrap/BusinessWorker.php

@@ -42,7 +42,6 @@ class BusinessWorker extends Man\Core\SocketWorker
      */
     protected static $interfaceMap = array(
         GatewayProtocol::CMD_ON_GATEWAY_CONNECTION => 'CMD_ON_GATEWAY_CONNECTION',
-        GatewayProtocol::CMD_ON_CONNECTION         => 'CMD_ON_CONNECTION',
         GatewayProtocol::CMD_ON_MESSAGE            => 'CMD_ON_MESSAGE',
         GatewayProtocol::CMD_ON_CLOSE              => 'CMD_ON_CLOSE',
     );
@@ -53,6 +52,8 @@ class BusinessWorker extends Man\Core\SocketWorker
      */
     protected function onStart()
     {
+        // 强制设置成长链接
+        $this->isPersistentConnection = true;
         // 定时检查与gateway进程的连接
         \Man\Core\Lib\Task::init($this->event);
         \Man\Core\Lib\Task::add(1, array($this, 'checkGatewayConnections'));
@@ -72,30 +73,30 @@ class BusinessWorker extends Man\Core\SocketWorker
      * 检查gateway转发来的用户请求是否完整
      * @see Man\Core.SocketWorker::dealInput()
      */
-    public function dealInput($recv_str)
+    public function dealInput($recv_buffer)
     {
-        return GatewayProtocol::input($recv_str); 
+        return GatewayProtocol::input($recv_buffer); 
     }
 
     /**
      * 处理请求
      * @see Man\Core.SocketWorker::dealProcess()
      */
-    public function dealProcess($recv_str)
+    public function dealProcess($recv_buffer)
     {
-        $pack = new GatewayProtocol($recv_str);
+        $pack = new GatewayProtocol($recv_buffer);
         Context::$client_ip = $pack->header['client_ip'];
         Context::$client_port = $pack->header['client_port'];
         Context::$local_ip = $pack->header['local_ip'];
         Context::$local_port = $pack->header['local_port'];
         Context::$socket_id = $pack->header['socket_id'];
-        Context::$uid = $pack->header['uid'];
+        Context::$client_id = $pack->header['client_id'];
         $_SERVER = array(
             'REMOTE_ADDR' => Context::$client_ip,
             'REMOTE_PORT' => Context::$client_port,
             'GATEWAY_ADDR' => Context::$local_ip,
             'GATEWAY_PORT'  => Context::$local_port,
-            'GATEWAY_SOCKET_ID' => Context::$socket_id,
+            'GATEWAY_CLIENT_ID' => Context::$client_id,
         );
         if($pack->ext_data != '')
         {
@@ -105,13 +106,13 @@ class BusinessWorker extends Man\Core\SocketWorker
         {
             $_SESSION = null;
         }
-        // 备份一次$pack->ext_data,请求处理完毕后判断session是否和备份相等
+        // 备份一次$pack->ext_data,请求处理完毕后判断session是否和备份相等,不相等就更新session
         $session_str_copy = $pack->ext_data;
         $cmd = $pack->header['cmd'];
         
         StatisticClient::tick();
         $module = __CLASS__;
-        $interface = isset(self::$interfaceMap[$cmd]) ? self::$interfaceMap[$cmd] : 'null';
+        $interface = isset(self::$interfaceMap[$cmd]) ? self::$interfaceMap[$cmd] : $cmd;
         $success = 1;
         $code = 0;
         $msg = '';
@@ -121,14 +122,11 @@ class BusinessWorker extends Man\Core\SocketWorker
                 case GatewayProtocol::CMD_ON_GATEWAY_CONNECTION:
                     call_user_func_array(array('Event', 'onGatewayConnect'), array());
                     break;
-                case GatewayProtocol::CMD_ON_CONNECTION:
-                    call_user_func_array(array('Event', 'onConnect'), array($pack->body));
-                    break;
                 case GatewayProtocol::CMD_ON_MESSAGE:
-                    call_user_func_array(array('Event', 'onMessage'), array(Context::$uid, $pack->body));
+                    call_user_func_array(array('Event', 'onMessage'), array(Context::$client_id, $pack->body));
                     break;
                 case GatewayProtocol::CMD_ON_CLOSE:
-                    call_user_func_array(array('Event', 'onClose'), array(Context::$uid));
+                    call_user_func_array(array('Event', 'onClose'), array(Context::$client_id));
                     break;
             }
         }
@@ -136,7 +134,7 @@ class BusinessWorker extends Man\Core\SocketWorker
         {
             $success = 0;
             $code = $e->getCode() > 0 ? $e->getCode() : 500;
-            $msg = 'uid:'.Context::$uid."\tclient_ip:".Context::$client_ip."\n".$e->__toString();
+            $msg = 'uid:'.Context::$client_id."\tclient_ip:".Context::$client_ip."\n".$e->__toString();
         }
         
         $session_str_now = $_SESSION !== null ? Context::sessionEncode($_SESSION) : '';
@@ -177,10 +175,12 @@ class BusinessWorker extends Man\Core\SocketWorker
                     // 删除连不上的端口
                     if($this->badGatewayAddress[$addr]++ > self::MAX_RETRY_COUNT)
                     {
+                        \Man\Core\Lib\Mutex::get();
                         $addresses_list = Store::instance('gateway')->get($key);
                         unset($addresses_list[$addr]);
                         Store::instance('gateway')->set($key, $addresses_list);
                         $this->notice("tcp://$addr ".$errstr." del $addr from store", false);
+                        \Man\Core\Lib\Mutex::release();
                     }
                     continue;
                 }
@@ -223,7 +223,7 @@ class BusinessWorker extends Man\Core\SocketWorker
             $the_fd = (int) $con;
             if($the_fd == $fd)
             {
-                unset($this->gatewayConnections[$addr]);
+                unset($this->gatewayConnections[$addr], $this->badGatewayAddress[$addr]);
             }
         }
         parent::closeClient($fd);

+ 132 - 144
applications/Demo/Bootstrap/Gateway.php

@@ -40,22 +40,28 @@ class Gateway extends Man\Core\SocketWorker
     protected $lanPort = 0;
     
     /**
-     * uid到连接的映射
+     * client_id到连接的映射
      * @var array
      */
-    protected $uidConnMap = array();
+    protected $clientConnMap = array();
     
     /**
-     * 连接到uid的映射
+     * 连接到client_id的映射
      * @var array
      */
-    protected $connUidMap = array();
+    protected $connClientMap = array();
     
     /**
-     * uid到session的映射
+     * 客户端链接和客户端远程地址映射
      * @var array
      */
-    protected $socketSessionMap = array();
+    protected $connRemoteAddressMap = array();
+    
+    /**
+     * client_id到session的映射
+     * @var array
+     */
+    protected $connSessionMap = array();
     
     /**
      * 与worker的连接
@@ -72,7 +78,6 @@ class Gateway extends Man\Core\SocketWorker
     
     /**
      * 心跳数据
-     * 可以是字符串(在配置中直接设置字符串如 ping_data=ping),
      * 可以是二进制数据(二进制数据保存在文件中,在配置中设置ping数据文件路径 如 ping_data=/yourpath/ping.bin)
      * ping数据应该是客户端能够识别的数据格式,只是检测连接的连通性,客户端收到心跳数据可以选择忽略此数据包
      * @var string
@@ -85,65 +90,42 @@ class Gateway extends Man\Core\SocketWorker
      */
     protected static $interfaceMap = array(
             GatewayProtocol::CMD_SEND_TO_ONE             => 'CMD_SEND_TO_ONE',
-            GatewayProtocol::CMD_KICK                              => 'CMD_KICK',
-            GatewayProtocol::CMD_SEND_TO_ALL               => 'CMD_SEND_TO_ALL',
-            GatewayProtocol::CMD_CONNECT_SUCCESS     => 'CMD_CONNECT_SUCCESS',
-            GatewayProtocol::CMD_UPDATE_SESSION        => 'CMD_UPDATE_SESSION',
-            GatewayProtocol::CMD_GET_ONLINE_STATUS  => 'CMD_GET_ONLINE_STATUS',
-            GatewayProtocol::CMD_IS_ONLINE                    => 'CMD_IS_ONLINE',
-            GatewayProtocol::CMD_ON_GATEWAY_CONNECTION    => 'CMD_ON_GATEWAY_CONNECTION',
+            GatewayProtocol::CMD_SEND_TO_ALL             => 'CMD_SEND_TO_ALL',
+            GatewayProtocol::CMD_KICK                    => 'CMD_KICK',
+            GatewayProtocol::CMD_UPDATE_SESSION          => 'CMD_UPDATE_SESSION',
+            GatewayProtocol::CMD_GET_ONLINE_STATUS       => 'CMD_GET_ONLINE_STATUS',
+            GatewayProtocol::CMD_IS_ONLINE               => 'CMD_IS_ONLINE',
      );
     
     /**
      * 由于网络延迟或者socket缓冲区大小的限制,客户端发来的数据可能不会都全部到达,需要根据协议判断数据是否完整
      * @see Man\Core.SocketWorker::dealInput()
      */
-    public function dealInput($recv_str)
+    public function dealInput($recv_buffer)
     {
-        // 如果有Event::onGatewayMessage方法通过这个方法检查数据是否接收完整
-        if(method_exists('Event','onGatewayMessage'))
-        {
-            return call_user_func_array(array('Event', 'onGatewayMessage'), array($recv_str));
-        }
-        return 0;
+        return call_user_func_array(array('Event', 'onGatewayMessage'), array($recv_buffer));
     }
     
     /**
      * 用户客户端发来消息时处理
      * @see Man\Core.SocketWorker::dealProcess()
      */
-    public function dealProcess($recv_str)
+    public function dealProcess($recv_buffer)
     {
-        // 判断用户是否认证过
+        // 统计打点
         StatisticClient::tick();
-        $from_uid = $this->getUidByFd($this->currentDealFd);
         $module = __CLASS__;
         $success = 1;
         $code = 0;
         $msg = '';
-        // 触发ON_CONNECTION
-        if(!$from_uid)
+        $interface = 'CMD_ON_MESSAGE';
+        // 触发ON_MESSAGE
+        $ret =$this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $this->currentDealFd, $recv_buffer);
+        if($ret === false)
         {
-            $interface = 'ON_CONNECTION';
-            $ret = $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECTION, $this->currentDealFd, $recv_str);
-            if($ret === false)
-            {
-                $success = 0;
-                $msg = 'sendToWorker(CMD_ON_CONNECTION, '.$this->currentDealFd.', strlen($recv_str) = '.strlen($recv_str).') fail ';
-                $code = 101;
-            }
-        }
-        else
-        {
-            // 认证过, 触发ON_MESSAGE
-            $interface = 'CMD_ON_MESSAGE';
-            $ret =$this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $this->currentDealFd, $recv_str);
-            if($ret === false)
-            {
-                $success = 0;
-                $msg = 'sendToWorker(CMD_ON_MESSAGE, '.$this->currentDealFd.', strlen($recv_str) = '.strlen($recv_str).') fail ';
-                $code = 102;
-            }
+            $success = 0;
+            $msg = 'sendToWorker(CMD_ON_MESSAGE, '.$this->currentDealFd.', strlen($recv_buffer) = '.strlen($recv_buffer).') fail ';
+            $code = 102;
         }
         StatisticClient::report($module, $interface, $success, $code, $msg);
     }
@@ -190,8 +172,8 @@ class Gateway extends Man\Core\SocketWorker
         $this->registerAddress($this->lanIp.':'.$this->lanPort);
         
         // 添加读udp/tcp事件
-        $this->event->add($this->innerMainSocketUdp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
-        $this->event->add($this->innerMainSocketTcp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'acceptTcp'));
+        $this->event->add($this->innerMainSocketUdp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvInnerUdp'));
+        $this->event->add($this->innerMainSocketTcp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'acceptInnerTcp'));
         
         // 初始化心跳包时间间隔
         $ping_interval = \Man\Core\Lib\Config::get($this->workerName.'.ping_interval');
@@ -248,14 +230,41 @@ class Gateway extends Man\Core\SocketWorker
         $fd = (int) $new_connection;
         $this->connections[$fd] = $new_connection;
         $this->recvBuffers[$fd] = array('buf'=>'', 'remain_len'=>$this->prereadLength);
-        $this->socketSessionMap[$fd] = '';
+        $this->connSessionMap[$fd] = '';
  
         // 非阻塞
         stream_set_blocking($this->connections[$fd], 0);
         $this->event->add($this->connections[$fd], Man\Core\Events\BaseEvent::EV_READ , array($this, 'dealInputBase'), $fd);
         
+        // 全局唯一client_id
+        $global_client_id = $this->createGlobalClientId();
+        $this->clientConnMap[$global_client_id] = $fd;
+        $this->connClientMap[$fd] = $global_client_id;
+        $address = array('local_ip'=>$this->lanIp, 'local_port'=>$this->lanPort, 'socket_id'=>$fd);
+        // 保存客户端内部通讯地址
+        if(!Store::instance('gateway')->set($global_client_id, $address))
+        {
+            $this->notice("Store::instance('gateway')->set($global_client_id, ".json_encode($address).") fail");
+        }
+        
+        // 客户端保存 ip:port
+        $address= $this->getRemoteAddress($fd);
+        if($address)
+        {
+            list($client_ip, $client_port) = explode(':', $address, 2);
+        }
+        else
+        {
+            $client_ip = 0;
+            $client_port = 0;
+        }
+        $this->connRemoteAddressMap[$fd] = array('ip'=>$client_ip, 'port'=>$client_port);
+        
         // 触发GatewayOnConnection事件
-        $this->sendToWorker(GatewayProtocol::CMD_ON_GATEWAY_CONNECTION, $fd);
+        if(method_exists('Event','onGatewayConnect'))
+        {
+            $this->sendToWorker(GatewayProtocol::CMD_ON_GATEWAY_CONNECTION, $fd);
+        }
         
         return $new_connection;
     }
@@ -306,7 +315,7 @@ class Gateway extends Man\Core\SocketWorker
      * @param $null_two $base
      * @return void
      */
-    public function recvUdp($socket, $null_one = null, $null_two = null)
+    public function recvInnerUdp($socket, $null_one = null, $null_two = null)
     {
         $data = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $address);
         // 惊群效应
@@ -326,7 +335,7 @@ class Gateway extends Man\Core\SocketWorker
      * @param null $null_one
      * @param null $null_two
      */
-    public function acceptTcp($socket, $null_one = null, $null_two = null)
+    public function acceptInnerTcp($socket, $null_one = null, $null_two = null)
     {
         // 获得一个连接
         $new_connection = @stream_socket_accept($socket, 0);
@@ -342,7 +351,8 @@ class Gateway extends Man\Core\SocketWorker
     
         // 非阻塞
         stream_set_blocking($this->connections[$fd], 0);
-        $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'recvTcp'), $fd);
+        $this->event->add($this->connections[$fd], \Man\Core\Events\BaseEvent::EV_READ , array($this, 'recvInnerTcp'), $fd);
+        
         // 标记这个连接是内部通讯长连接,区别于客户端连接
         $this->workerConnections[$fd] = $fd;
         return $new_connection;
@@ -358,18 +368,19 @@ class Gateway extends Man\Core\SocketWorker
     }
     
     /**
-     * 处理到的数据
+     * 处理内部通讯收到的数据
      * @param event_buffer $event_buffer
      * @param int $fd
      * @return void
      */
-    public function recvTcp($connection, $flag, $fd = null)
+    public function recvInnerTcp($connection, $flag, $fd = null)
     {
         $this->currentDealFd = $fd;
         $buffer = stream_socket_recvfrom($connection, $this->recvBuffers[$fd]['remain_len']);
         // 出错了
         if('' == $buffer && '' == ($buffer = fread($connection, $this->recvBuffers[$fd]['remain_len'])))
         {
+            // 判断是否是链接断开
             if(!feof($connection))
             {
                 return;
@@ -379,7 +390,7 @@ class Gateway extends Man\Core\SocketWorker
             if(!empty($this->recvBuffers[$fd]['buf']))
             {
                 $this->statusInfo['send_fail']++;
-                $this->notice("INNER_CLIENT_CLOSE\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
+                $this->notice("INNER_CLIENT_CLOSE\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".bin2hex($this->recvBuffers[$fd]['buf'])."]\n");
             }
     
             // 关闭链接
@@ -434,15 +445,15 @@ class Gateway extends Man\Core\SocketWorker
 
     /**
      * 内部通讯处理
-     * @param string $recv_str
+     * @param string $recv_buffer
      */
-    public function innerDealProcess($recv_str)
+    public function innerDealProcess($recv_buffer)
     {
-        $pack = new GatewayProtocol($recv_str);
+        $pack = new GatewayProtocol($recv_buffer);
         $cmd = $pack->header['cmd'];
         StatisticClient::tick();
         $module = __CLASS__;
-        $interface = isset(self::$interfaceMap[$cmd]) ? self::$interfaceMap[$cmd] : 'null';
+        $interface = isset(self::$interfaceMap[$cmd]) ? self::$interfaceMap[$cmd] : $cmd;
         $success = 1;
         $code = 0;
         $msg = '';
@@ -450,25 +461,24 @@ class Gateway extends Man\Core\SocketWorker
         {
             switch($cmd)
             {
+                // 向某客户端发送数据
                 case GatewayProtocol::CMD_SEND_TO_ONE:
                     $this->sendToSocketId($pack->header['socket_id'], $pack->body);
                     break;
+                // 踢掉客户端
                 case GatewayProtocol::CMD_KICK:
-                    if($pack->body)
-                    {
-                        $this->sendToSocketId($pack->header['socket_id'], $pack->body);
-                    }
-                    $this->closeClientBySocketId($pack->header['socket_id']);
+                    $this->closeClient($pack->header['socket_id']);
                     break;
+                // 向所客户端发送数据
                 case GatewayProtocol::CMD_SEND_TO_ALL:
                     if($pack->ext_data)
                     {
-                        $uid_array = unpack('N*', $pack->ext_data);
-                        foreach($uid_array as $uid)
+                        $client_id_array = unpack('N*', $pack->ext_data);
+                        foreach($client_id_array as $client_id)
                         {
-                            if(isset($this->uidConnMap[$uid]))
+                            if(isset($this->clientConnMap[$client_id]))
                             {
-                                $this->sendToSocketId($this->uidConnMap[$uid], $pack->body);
+                                $this->sendToSocketId($this->clientConnMap[$client_id], $pack->body);
                             }
                         }
                     }
@@ -477,22 +487,23 @@ class Gateway extends Man\Core\SocketWorker
                         $this->broadCast($pack->body);
                     }
                     break;
-                case GatewayProtocol::CMD_CONNECT_SUCCESS:
-                    $this->connectSuccess($pack->header['socket_id'], $pack->header['uid']);
-                    break;
+                // 更新某个客户端的session
                 case GatewayProtocol::CMD_UPDATE_SESSION:
-                    if(isset($this->socketSessionMap[$pack->header['socket_id']]))
+                    if(isset($this->connSessionMap[$pack->header['socket_id']]))
                     {
-                        $this->socketSessionMap[$pack->header['socket_id']] = $pack->ext_data;
+                        $this->connSessionMap[$pack->header['socket_id']] = $pack->ext_data;
                     }
                     break;
+                // 获得在线状态
                 case GatewayProtocol::CMD_GET_ONLINE_STATUS:
-                    $online_status = json_encode(array_values($this->connUidMap));
+                    $online_status = json_encode(array_values($this->connClientMap));
                     stream_socket_sendto($this->innerMainSocketUdp, $online_status, 0, $this->currentClientAddress);
                     break;
+                // 判断某个客户端id是否在线
                 case GatewayProtocol::CMD_IS_ONLINE:
-                    stream_socket_sendto($this->innerMainSocketUdp, (int)isset($this->uidConnMap[$pack->header['uid']]), 0, $this->currentClientAddress);
+                    stream_socket_sendto($this->innerMainSocketUdp, (int)isset($this->clientConnMap[$pack->header['client_id']]), 0, $this->currentClientAddress);
                     break;
+                // 未知的命令
                 default :
                     $err_msg = "gateway inner pack err cmd=$cmd";
                     $this->notice($err_msg);
@@ -514,76 +525,40 @@ class Gateway extends Man\Core\SocketWorker
      */
     protected function broadCast($bin_data)
     {
-        foreach($this->uidConnMap as $uid=>$conn)
+        foreach($this->clientConnMap as $client_id=>$conn)
         {
             $this->sendToSocketId($conn, $bin_data);
         }
     }
     
     /**
-     * 根据socket_id关闭与客户端的连接,实际上是踢人操作
-     * @param int $socket_id
-     */
-    protected function closeClientBySocketId($socket_id)
-    {
-        if($uid = $this->getUidByFd($socket_id))
-        {
-            unset($this->uidConnMap[$uid]);
-        }
-        unset($this->connUidMap[$socket_id], $this->socketSessionMap[$socket_id]);
-        parent::closeClient($socket_id);
-    }
-    
-    /**
-     * 根据uid获取uid对应连接的id
-     * @param int $uid
+     * 根据client_id获取client_id对应连接的socket_id
+     * @param int $client_id
      */
-    protected function getFdByUid($uid)
+    protected function getFdByClientId($client_id)
     {
-        if(isset($this->uidConnMap[$uid]))
+        if(isset($this->clientConnMap[$client_id]))
         {
-            return $this->uidConnMap[$uid];
+            return $this->clientConnMap[$client_id];
         }
         return 0;
     }
     
     /**
-     * 根据连接id获取用户uid
+     * 根据连接socket_id获取client_id
      * @param int $fd
      */
-    protected function getUidByFd($fd)
+    protected function getClientIdByFd($fd)
     {
-        if(isset($this->connUidMap[$fd]))
+        if(isset($this->connClientMap[$fd]))
         {
-            return $this->connUidMap[$fd];
+            return $this->connClientMap[$fd];
         }
         return 0;
     }
     
     /**
-     * BusinessWorker通知本Gateway进程$uid用户合法,绑定到$socket_id
-     * 后面这个socketid再有消息传来,会自动带上uid传递给BusinessWorker
-     * @param int $socket_id
-     * @param int $uid
-     */
-    protected function connectSuccess($socket_id, $uid)
-    {
-        if($binded_uid = $this->getUidByFd($socket_id))
-        {
-            $this->notice('notify connection fail socket:' . $socket_id . ' already binded uid:' . $binded_uid);
-            return;
-        }
-        if($binded_socket = $this->getFdByUid($uid))
-        {
-            $this->notice('notify connection warning uid:' . $uid . ' already binded socket:' . $binded_socket);
-            $this->closeClient($binded_socket);
-        }
-        $this->uidConnMap[$uid] = $socket_id;
-        $this->connUidMap[$socket_id] = $uid;
-    }
-    
-    /**
-     * 向某个socketId的连接发送消息
+     * 向某个socket_id的连接发送消息
      * @param int $socket_id
      * @param string $bin_data
      */
@@ -598,18 +573,19 @@ class Gateway extends Man\Core\SocketWorker
     }
 
     /**
-     * 用户客户端主动关闭连接触发
+     * 用户客户端关闭连接触发
      * @see Man\Core.SocketWorker::closeClient()
      */
     protected function closeClient($fd)
     {
         StatisticClient::tick();
-        if($uid = $this->getUidByFd($fd))
+        if($client_id = $this->getClientIdByFd($fd))
         {
             $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $fd);
-            unset($this->uidConnMap[$uid]);
+            Store::instance('gateway')->delete($client_id);
+            unset($this->clientConnMap[$client_id]);
         }
-        unset($this->connUidMap[$fd], $this->socketSessionMap[$fd]);
+        unset($this->connClientMap[$fd], $this->connSessionMap[$fd], $this->connRemoteAddressMap[$fd]);
         parent::closeClient($fd);
         StatisticClient::report(__CLASS__, 'CMD_ON_CLOSE', 1, 0, '');
     }
@@ -632,26 +608,16 @@ class Gateway extends Man\Core\SocketWorker
      */
     protected function sendToWorker($cmd, $socket_id, $body = '')
     {
-        $address= $this->getRemoteAddress($socket_id);
-        if($address)
-        {
-            list($client_ip, $client_port) = explode(':', $address, 2);
-        }
-        else
-        {
-            $client_ip = 0;
-            $client_port = 0;
-        }
         $pack = new GatewayProtocol();
         $pack->header['cmd'] = $cmd;
         $pack->header['local_ip'] = $this->lanIp;
         $pack->header['local_port'] = $this->lanPort;
         $pack->header['socket_id'] = $socket_id;
-        $pack->header['client_ip'] = $client_ip;
-        $pack->header['client_port'] = $client_ip;
-        $pack->header['uid'] = $this->getUidByFd($socket_id);
+        $pack->header['client_ip'] = $this->connRemoteAddressMap[$socket_id]['ip'];
+        $pack->header['client_port'] = $this->connRemoteAddressMap[$socket_id]['port'];
+        $pack->header['client_id'] = $this->getClientIdByFd($socket_id);
         $pack->body = $body;
-        $pack->ext_data = $this->socketSessionMap[$pack->header['socket_id']];
+        $pack->ext_data = $this->connSessionMap[$pack->header['socket_id']];
         return $this->sendBufferToWorker($pack->getBuffer());
     }
     
@@ -665,6 +631,10 @@ class Gateway extends Man\Core\SocketWorker
         {
             return $this->sendToClient($bin_data);
         }
+        else
+        {
+            $this->notice("sendBufferToWorker fail \$this->workerConnections=".var_export($this->workerConnections,true));
+        }
     }
     
     /**
@@ -688,14 +658,32 @@ class Gateway extends Man\Core\SocketWorker
     public function onStop()
     {
         $this->unregisterAddress($this->lanIp.':'.$this->lanPort);
-        foreach($this->connUidMap as $uid)
+        foreach($this->connClientMap as $client_id)
+        {
+            Store::instance('gateway')->delete($client_id);
+        }
+    }
+    
+    /**
+     * 创建全局唯一的id
+     */
+    protected function createGlobalClientId()
+    {
+        $global_socket_key = 'GLOBAL_SOCKET_ID_KEY';
+        $global_client_id = Store::instance('gateway')->increment($global_socket_key);
+        if(!$global_client_id || $global_client_id > 2147483646)
+        {
+            Store::instance('gateway')->set($global_socket_key, 1);
+        }
+        else 
         {
-            Store::instance('gateway')->delete($uid);
+            return $global_client_id;
         }
+        return Store::instance('gateway')->increment($global_socket_key);
     }
     
     /**
-     * 向认证的用户发送心跳数据
+     * 向客户端发送心跳数据
      */
     public function ping()
     {

+ 9 - 4
applications/Demo/Config/Store.php

@@ -3,6 +3,7 @@ namespace Config;
 
 /**
  * 存储配置
+ * 注意生产环境使用$driver = self::DRIVER_MC,具体参考applications/Demo/README.md
  * @author walkor
  */
 class Store
@@ -12,15 +13,19 @@ class Store
     // 使用memcache存储,支持workerman分布式部署
     const DRIVER_MC = 2;
     
-    // 使用哪种存储驱动 文件存储DRIVER_FILE 或者 memcache存储DRIVER_MC,为了更好的性能请使用DRIVER_MC
+    /* 使用哪种存储驱动 文件存储DRIVER_FILE 或者 memcache存储DRIVER_MC,为了更好的性能请使用DRIVER_MC
+     * 注意: DRIVER_FILE只适合开发环境,生产环境或者压测请使用DRIVER_MC,需要php cli 安装memcache扩展
+     */
     public static $driver = self::DRIVER_FILE;
     
-    // 如果使用文件存储,则在这里设置数据存储的目录,默认/tmp/下
-    public static $storePath = '/tmp/workerman-Demo/';
-    
     // 如果是memcache存储,则在这里设置memcache的ip端口,注意确保你安装了memcache扩展
     public static $gateway = array(
         '127.0.0.1:22322',
     );
     
+    /* 如果使用文件存储,则在这里设置数据存储的目录,默认/tmp/下
+     * 注意:如果修改了storePath,要将storePath加入到conf/conf.d/FileMonitor.conf的忽略目录中 
+     * 例如 $storePath = '/home/data/',则需要在conf/conf.d/FileMonitor.conf加一行 exclude_path[]=/home/data/
+     */
+    public static $storePath = '/tmp/workerman-Demo/';
 }

+ 83 - 121
applications/Demo/Event.php

@@ -1,121 +1,83 @@
-<?php
-/**
- * 
- * 
- * @author walkor <workerman.net>
- * 
- */
-
-use \Lib\Context;
-use \Lib\Gateway;
-use \Lib\StatisticClient;
-use \Lib\Store;
-use \Protocols\GatewayProtocol;
-use \Protocols\TextProtocol;
-
-
-class Event
-{
-    /**
-     * 当网关有客户端链接上来时触发,一般这里留空
-     */
-    public static function onGatewayConnect()
-    {
-        Gateway::sendToCurrentUid(TextProtocol::encode("type in your name:"));
-    }
-    
-    /**
-     * 网关有消息时,判断消息是否完整
-     */
-    public static function onGatewayMessage($buffer)
-    {
-        return TextProtocol::check($buffer);
-    }
-    
-   /**
-    * 此链接的用户没调用GateWay::notifyConnectionSuccess($uid);前(即没有得到验证),都触发onConnect
-    * 已经调用GateWay::notifyConnectionSuccess($uid);的用户有消息时,则触发onMessage
-    * @param string $message 一般是传递的账号密码等信息
-    * @return void
-    */
-   public static function onConnect($message)
-   {
-       /*
-        * 通过message验证用户,并获得uid。
-        * 一般流程这里$message应该包含用户名 密码,然后根据用户名密码从数据库中获取uid
-        * 这里只是根据时间戳生成uid,高并发下会有小概率uid冲突
-        */ 
-       $uid = self::checkUser($message);
-       // 不合法踢掉
-       if(!$uid)
-       {
-           // 踢掉
-           return GateWay::kickCurrentUser(TextProtocol::encode('uid非法'));
-       }
-       
-       $_SESSION['name'] = TextProtocol::decode($message);
-       
-       // [这步是必须的]合法,记录uid到gateway通信地址的映射
-       GateWay::storeUid($uid);
-       
-       // [这步是必须的]发送数据包到address对应的gateway,确认connection成功
-       GateWay::notifyConnectionSuccess($uid);
-       
-       Gateway::sendToCurrentUid("
-chart room login success, your uid is $uid, name is {$_SESSION['name']}
-use uid:words send message to one user
-use words send message to all\n");
-       
-       // 广播所有用户,xxx come 
-       GateWay::sendToAll(TextProtocol::encode("{$_SESSION['name']}[$uid] come"));
-   }
-   
-   /**
-    * 当用户断开连接时触发的方法
-    * @param string $address 和该用户gateway通信的地址
-    * @param integer $uid 断开连接的用户id 
-    * @return void
-    */
-   public static function onClose($uid)
-   {
-       // 删除这个用户的gateway通信地址
-       GateWay::deleteUidAddress($uid);
-       
-       // 广播 xxx 退出了
-       GateWay::sendToAll(TextProtocol::encode("{$_SESSION['name']}[$uid] logout"));
-   }
-   
-   /**
-    * 有消息时触发该方法
-    * @param int $uid 发消息的uid
-    * @param string $message 消息
-    * @return void
-    */
-   public static function onMessage($uid, $message)
-   {
-        $message_data = TextProtocol::decode($message);
-        
-        // 判断是否是私聊,私聊数据格式 uid:xxxxx
-        $explode_array = explode(':', $message, 2);
-        if(count($explode_array) > 1)
-        {
-            $to_uid = (int)$explode_array[0];
-            GateWay::sendToUid($uid, TextProtocol::encode($_SESSION['name'] . "[$uid] said said to [$to_uid] :" . $explode_array[1]));
-            return GateWay::sendToUid($to_uid, TextProtocol::encode($_SESSION['name'] . "[$uid] said to You :" . $explode_array[1]));
-        }
-        // 群聊
-        return GateWay::sendToAll(TextProtocol::encode($_SESSION['name'] . "[$uid] said :" . $message));
-   }
-   
-   
-   /**
-    * 用户第一次链接时,根据用户传递的消息(一般是用户名 密码)返回当前uid
-    * 这里只是返回了时间戳相关的一个数字,高并发会有一定的几率uid冲突
-    * @param string $message
-    * @return number
-    */
-   protected static function checkUser($message)
-   {
-       return substr(strval(microtime(true)), 3, 10)*100;
-   }
-}
+<?php
+/**
+ * 聊天逻辑,使用的协议是 文本+回车
+ * 测试方法 运行
+ * telnet ip 8480
+ * 可以开启多个telnet窗口,窗口间可以互相聊天
+ * 
+ * websocket协议的聊天室见workerman-chat及workerman-todpole
+ * @author walkor <workerman.net>
+ */
+
+use \Lib\Context;
+use \Lib\Gateway;
+use \Lib\StatisticClient;
+use \Lib\Store;
+use \Protocols\GatewayProtocol;
+use \Protocols\TextProtocol;
+
+class Event
+{
+    /**
+     * 当网关有客户端链接上来时触发,一般这里留空
+     */
+    public static function onGatewayConnect()
+    {
+        Gateway::sendToCurrentClient(TextProtocol::encode("type in your name:"));
+    }
+    
+    /**
+     * 网关有消息时,判断消息是否完整
+     */
+    public static function onGatewayMessage($buffer)
+    {
+        return TextProtocol::check($buffer);
+    }
+    
+   
+   /**
+    * 当用户断开连接时触发的方法
+    * @param integer $client_id 断开连接的用户id 
+    * @return void
+    */
+   public static function onClose($client_id)
+   {
+       // 广播 xxx 退出了
+       GateWay::sendToAll(TextProtocol::encode("{$_SESSION['name']}[$client_id] logout"));
+   }
+   
+   /**
+    * 有消息时触发该方法
+    * @param int $client_id 发消息的client_id
+    * @param string $message 消息
+    * @return void
+    */
+   public static function onMessage($client_id, $message)
+   {
+        $message_data = TextProtocol::decode($message);
+        
+        // **************如果没有$_SESSION['name']说明没有设置过用户名,进入设置用户名逻辑************
+        if(empty($_SESSION['name']))
+        {
+            $_SESSION['name'] = TextProtocol::decode($message);
+            Gateway::sendToCurrentClient("chart room login success, your client_id is $client_id, name is {$_SESSION['name']}\nuse client_id:words send message to one user\nuse words send message to all\n");
+             
+            // 广播所有用户,xxx come
+            return GateWay::sendToAll(TextProtocol::encode("{$_SESSION['name']}[$client_id] come"));
+        }
+        
+        // ********* 进入聊天逻辑 ****************
+        // 判断是否是私聊,私聊数据格式 client_id:xxxxx
+        $explode_array = explode(':', $message, 2);
+        // 私聊
+        if(count($explode_array) > 1)
+        {
+            $to_client_id = (int)$explode_array[0];
+            GateWay::sendToClient($client_id, TextProtocol::encode($_SESSION['name'] . "[$client_id] said said to [$to_client_id] :" . $explode_array[1]));
+            return GateWay::sendToClient($to_client_id, TextProtocol::encode($_SESSION['name'] . "[$client_id] said to You :" . $explode_array[1]));
+        }
+        // 群聊
+        return GateWay::sendToAll(TextProtocol::encode($_SESSION['name'] . "[$client_id] said :" . $message));
+   }
+   
+}

+ 2 - 2
applications/Demo/Lib/Context.php

@@ -35,7 +35,7 @@ class Context
      * 用户id
      * @var int
      */
-    public static $uid;
+    public static $client_id;
     
     /**
      * 编码session
@@ -67,6 +67,6 @@ class Context
      */
     public static function clear()
     {
-        self::$local_ip = self::$local_port = self::$socket_id = self::$client_ip = self::$client_port = self::$uid  = null;
+        self::$local_ip = self::$local_port = self::$socket_id = self::$client_ip = self::$client_port = self::$client_id  = null;
     }
 }

+ 84 - 122
applications/Demo/Lib/Gateway.php

@@ -3,7 +3,6 @@ namespace Lib;
 /**
  * 
  * 数据发送相关
- * sendToAll sendToUid
  * @author walkor <workerman.net>
  * 
  */
@@ -14,27 +13,18 @@ use \Lib\Context;
 
 class Gateway
 {
-    
     /**
      * gateway实例
      * @var object
      */
     protected static  $businessWorker = null;
     
-    /**
-     * 设置gateway实例,用于与
-     * @param unknown_type $gateway_instance
-     */
-    public static function setBusinessWorker($business_worker_instance)
-    {
-        self::$businessWorker = $business_worker_instance;
-    }
-    
    /**
-    * 向所有客户端广播消息
-    * @param string $message
+    * 向所有客户端(或者client_id_array指定的客户端)广播消息
+    * @param string $message 向客户端发送的消息(可以是二进制数据)
+    * @param array $client_id_array 客户端id数组
     */
-   public static function sendToAll($message, $uid_array = array())
+   public static function sendToAll($message, $client_id_array = array())
    {
        $pack = new GatewayProtocol();
        $pack->header['cmd'] = GatewayProtocol::CMD_SEND_TO_ALL;
@@ -43,12 +33,12 @@ class Gateway
        $pack->header['socket_id'] = Context::$socket_id;
        $pack->header['client_ip'] = Context::$client_ip;
        $pack->header['client_port'] = Context::$client_port;
-       $pack->header['uid'] = Context::$uid;
+       $pack->header['client_id'] = Context::$client_id;
        $pack->body = (string)$message;
        
-       if($uid_array)
+       if($client_id_array)
        {
-           $params = array_merge(array('N*'), $uid_array);
+           $params = array_merge(array('N*'), $client_id_array);
            $pack->ext_data = call_user_func_array('pack', $params);
        }
        
@@ -73,35 +63,35 @@ class Gateway
    }
    
    /**
-    * 向某个用户发消息
-    * @param int $uid
+    * 向某个客户端发消息
+    * @param int $client_id 客户端通过Gateway::bindClientId($client_id)绑定的client_id
     * @param string $message
     */
-   public static function sendToUid($uid, $message)
+   public static function sendToClient($clinet_id, $message)
    {
-       return self::sendCmdAndMessageToUid($uid, GatewayProtocol::CMD_SEND_TO_ONE, $message);
-   }
+       return self::sendCmdAndMessageToClient($clinet_id, GatewayProtocol::CMD_SEND_TO_ONE, $message);
+   } 
    
    /**
-    * 向当前用户发送消息
+    * 向当前客户端发送消息
     * @param string $message
     */
-   public static function sendToCurrentUid($message)
+   public static function sendToCurrentClient($message)
    {
-       return self::sendCmdAndMessageToUid(null, GatewayProtocol::CMD_SEND_TO_ONE, $message);
+       return self::sendCmdAndMessageToClient(null, GatewayProtocol::CMD_SEND_TO_ONE, $message);
    }
    
    /**
-    * 判断是否在线
-    * @param int $uid
+    * 判断某个客户端是否在线
+    * @param int $client_id
     * @return 0/1
     */
-   public static function isOnline($uid)
+   public static function isOnline($client_id)
    {
        $pack = new GatewayProtocol();
        $pack->header['cmd'] = \Protocols\GatewayProtocol::CMD_IS_ONLINE;;
-       $pack->header['uid'] = $uid;
-       $address = self::getAddressByUid($uid);
+       $pack->header['client_id'] = $client_id;
+       $address = Store::instance('gateway')->get($client_id);
        if(!$address)
        {
            return 0;
@@ -110,7 +100,7 @@ class Gateway
    }
    
    /**
-    * 获取在线状态,目前返回一个在线uid数组
+    * 获取在线状态,目前返回一个在线client_id数组
     * @return array
     */
    public static function getOnlineStatus()
@@ -142,6 +132,7 @@ class Gateway
            {
                foreach($read as $client)
                {
+                   // udp
                    if($data = json_decode(fread($client, 655350), true))
                    {
                        $status_data = array_merge($status_data, $data);
@@ -158,61 +149,74 @@ class Gateway
    }
    
    /**
-    * 将某个用户踢出
-    * @param int $uid
+    * 将某个客户端踢出
+    * @param int $client_id
     * @param string $message
     */
-   public static function kickUid($uid, $message)
+   public static function kickClient($client_id)
    {
-       if($uid === Context::$uid)
+       if($client_id === Context::$client_id)
        {
-           return self::kickCurrentUser($message);
+           return self::kickCurrentClient();
        }
        // 不是发给当前用户则使用存储中的地址
        else
        {
-           $address = self::getAddressByUid($uid);
+           $address = Store::instance('gateway')->get($client_id);
            if(!$address)
            {
                return false;
            }
-           return self::kickAddress($address['local_ip'], $address['local_port'], $address['socket_id'], $message);
+           return self::kickAddress($address['local_ip'], $address['local_port'], $address['socket_id']);
        }
    }
    
    /**
-    * 踢掉当前用户
+    * 踢掉当前客户端
     * @param string $message
     */
-   public static function kickCurrentUser($message)
+   public static function kickCurrentClient()
    {
-       return self::kickAddress(Context::$local_ip, Context::$local_port, Context::$socket_id, $message);
+       return self::kickAddress(Context::$local_ip, Context::$local_port, Context::$socket_id);
+   }
+   
+   /**
+    * 更新session,框架自动调用,开发者不要调用
+    * @param int $client_id
+    * @param string $session_str
+    */
+   public static function updateSocketSession($socket_id, $session_str)
+   {
+       $pack = new GatewayProtocol();
+       $pack->header['cmd'] = GatewayProtocol::CMD_UPDATE_SESSION;
+       $pack->header['socket_id'] = Context::$socket_id;
+       $pack->ext_data = (string)$session_str;
+       return self::sendToGateway(Context::$local_ip . ':' . Context::$local_port, $pack->getBuffer());
    }
    
-
    /**
     * 想某个用户网关发送命令和消息
-    * @param int $uid
+    * @param int $client_id
     * @param int $cmd
     * @param string $message
     * @return boolean
     */
-   public static function sendCmdAndMessageToUid($uid, $cmd , $message)
+   protected static function sendCmdAndMessageToClient($client_id, $cmd , $message)
    {
        $pack = new GatewayProtocol();
        $pack->header['cmd'] = $cmd;
        // 如果是发给当前用户则直接获取上下文中的地址
-       if($uid === Context::$uid || $uid === null)
+       if($client_id === Context::$client_id || $client_id === null)
        {
            $pack->header['local_ip'] = Context::$local_ip;
            $pack->header['local_port'] = Context::$local_port;
            $pack->header['socket_id'] = Context::$socket_id;
-           $pack->header['uid'] = Context::$uid;
+           $pack->header['client_id'] = Context::$client_id;
        }
        // 不是发给当前用户则使用存储中的地址
        else
        {
-           $address = self::getAddressByUid($uid);
+           $address = Store::instance('gateway')->get($client_id);
            if(!$address)
            {
                return false;
@@ -220,7 +224,7 @@ class Gateway
            $pack->header['local_ip'] = $address['local_ip'];
            $pack->header['local_port'] = $address['local_port'];
            $pack->header['socket_id'] = $address['socket_id'];
-           $pack->header['uid'] = $uid;
+           $pack->header['client_id'] = $client_id;
        }
        $pack->header['client_ip'] = Context::$client_ip;
        $pack->header['client_port'] = Context::$client_port;
@@ -256,6 +260,28 @@ class Gateway
        }
    }
    
+   /**
+    * 发送数据到网关
+    * @param string $address
+    * @param string $buffer
+    */
+   protected static function sendToGateway($address, $buffer)
+   {
+       // 有$businessWorker说明是workerman环境,使用$businessWorker发送数据
+       if(self::$businessWorker)
+       {
+           $connections = self::$businessWorker->getGatewayConnections();
+           if(!isset($connections[$address]))
+           {
+               $e = new \Exception("sendToGateway($address, $buffer) fail \$connections:".json_encode($connections));
+               return false;
+           }
+           return self::$businessWorker->sendToClient($buffer, $connections[$address]);
+       }
+       // 非workerman环境,使用udp发送数据
+       $client = stream_socket_client("udp://$address", $errno, $errmsg);
+       return strlen($buffer) == stream_socket_sendto($client, $buffer);
+   }
    
    /**
     * 踢掉某个网关的socket
@@ -263,9 +289,9 @@ class Gateway
     * @param int $local_port
     * @param int $socket_id
     * @param string $message
-    * @param int $uid
+    * @param int $client_id
     */
-   public static function kickAddress($local_ip, $local_port, $socket_id, $message, $uid = null)
+   protected  static function kickAddress($local_ip, $local_port, $socket_id)
    {
        $pack = new GatewayProtocol();
        $pack->header['cmd'] = GatewayProtocol::CMD_KICK;
@@ -277,83 +303,19 @@ class Gateway
            $pack->header['client_ip'] = Context::$client_ip;
            $pack->header['client_port'] = Context::$client_port;
        }
-       $pack->header['uid'] = $uid ? $uid : 0;
-       $pack->body = (string)$message;
-       
+       $pack->header['client_id'] =  0;
+       $pack->body = '';
+        
        return self::sendToGateway("{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
    }
    
    /**
-    * 存储uid的网关地址
-    * @param int $uid
+    * 设置gateway实例
+    * @param Bootstrap/Gateway $gateway_instance
     */
-   public static function storeUid($uid)
+   public static function setBusinessWorker($business_worker_instance)
    {
-       $address = array('local_ip'=>Context::$local_ip, 'local_port'=>Context::$local_port, 'socket_id'=>Context::$socket_id);
-       Store::instance('gateway')->set($uid, $address);
-   }
-   
-   /**
-    * 获取用户的网关地址
-    * @param int $uid
-    */
-   public static function getAddressByUid($uid)
-   {
-       return Store::instance('gateway')->get($uid);
-   }
-   
-   /**
-    * 删除用户的网关地址
-    * @param int $uid
-    */
-   public static function deleteUidAddress($uid)
-   {
-       return Store::instance('gateway')->delete($uid);
-   }
-   
-   /**
-    * 通知网关uid链接成功(通过验证)
-    * @param int $uid
-    */
-   public static function notifyConnectionSuccess($uid)
-   {
-       return self::sendCmdAndMessageToUid($uid, GatewayProtocol::CMD_CONNECT_SUCCESS, '');
-   }
-   
-   /**
-    * 更新session
-    * @param int $uid
-    * @param string $session_str
-    */
-   public static function updateSocketSession($socket_id, $session_str)
-   {
-       $pack = new GatewayProtocol();
-       $pack->header['cmd'] = GatewayProtocol::CMD_UPDATE_SESSION;
-       $pack->header['socket_id'] = Context::$socket_id;
-       $pack->ext_data = (string)$session_str;
-       return self::sendToGateway(Context::$local_ip . ':' . Context::$local_port, $pack->getBuffer());
-   }
-   
-   /**
-    * 发送数据到网关
-    * @param string $address
-    * @param string $buffer
-    */
-   protected static function sendToGateway($address, $buffer)
-   {
-       // 有$businessWorker说明是workerman环境,使用$businessWorker发送数据
-       if(self::$businessWorker)
-       {
-           $connections = self::$businessWorker->getGatewayConnections();
-           if(!isset($connections[$address]))
-           {
-               $e = new \Exception("sendToGateway($address, $buffer) fail \$connections:".json_encode($connections));
-               return false;
-           }
-           return self::$businessWorker->sendToClient($buffer, $connections[$address]);
-       }
-       // 非workerman环境,使用udp发送数据
-       $client = stream_socket_client("udp://$address", $errno, $errmsg);
-       return strlen($buffer) == stream_socket_sendto($client, $buffer);
+       self::$businessWorker = $business_worker_instance;
    }
+ 
 }

+ 20 - 0
applications/Demo/Lib/StoreDriver/File.php

@@ -95,6 +95,26 @@ class File
     }
     
     /**
+     * 自增
+     * @param string $key
+     * @return boolean|multitype:
+     */
+    public function increment($key)
+    {
+        flock($this->dataFileHandle, LOCK_EX);
+        $this->readDataFromDisk();
+        if(!isset($this->dataCache[$key]))
+        {
+            flock($this->dataFileHandle, LOCK_UN);
+            return false;
+        }
+        $this->dataCache[$key] ++;
+        $this->writeToDisk();
+        flock($this->dataFileHandle, LOCK_UN);
+        return $this->dataCache[$key];
+    }
+    
+    /**
      * 写入磁盘
      * @return number
      */

+ 6 - 12
applications/Demo/Protocols/GatewayProtocol.php

@@ -12,7 +12,7 @@ namespace Protocols;
  *     unsigned int        socket_id,
  *     unsigned int        client_ip,
  *     unsigned short    client_port,
- *     unsigned int        uid,
+ *     unsigned int        client_id,
  *     unsigned int        ext_len,
  *     char[ext_len]        ext_data,
  *     char[pack_length-HEAD_LEN] body//包体
@@ -27,10 +27,7 @@ class GatewayProtocol
     // 发给worker,gateway有一个新的连接
     const CMD_ON_GATEWAY_CONNECTION = 1;
     
-    // 发给worker的未绑定socket上有消息事件
-    const CMD_ON_CONNECTION = 2;
-    
-    // 发给worker的绑定socket上有消息事件
+    // 发给worker的,客户端有消息
     const CMD_ON_MESSAGE = 3;
     
     // 发给worker上的关闭链接事件
@@ -45,9 +42,6 @@ class GatewayProtocol
     // 发给gateway的踢出用户
     const CMD_KICK = 7;
     
-    // 发给gateway的通知用户(通过验证)链接成功,绑定uid gid socketid
-    const CMD_CONNECT_SUCCESS = 8;
-    
     // 发给gateway,通知用户session更改
     const CMD_UPDATE_SESSION = 9;
     
@@ -75,7 +69,7 @@ class GatewayProtocol
         'socket_id'      => 0,
         'client_ip'        => '',
         'client_port'    => 0,
-        'uid'                => 0,
+        'client_id'        => 0,
         'ext_len'          => 0,
     );
     
@@ -83,7 +77,7 @@ class GatewayProtocol
      * 扩展数据,
      * gateway发往worker时这里存储的是session字符串
      * worker发往gateway时,并且CMD_UPDATE_SESSION时存储的是session字符串
-     * worker发往gateway时,并且CMD_SEND_TO_ALL时存储的是接收的uid序列,可能是空(代表向所有人发)
+     * worker发往gateway时,并且CMD_SEND_TO_ALL时存储的是接收的client_id序列,可能是空(代表向所有人发)
      * @var string
      */
     public $ext_data = '';
@@ -141,7 +135,7 @@ class GatewayProtocol
                         $this->header['cmd'], ip2long($this->header['local_ip']), 
                         $this->header['local_port'], $this->header['socket_id'], 
                         ip2long($this->header['client_ip']), $this->header['client_port'], 
-                        $this->header['uid'],
+                        $this->header['client_id'],
                        $this->header['ext_len']) . $this->ext_data . $this->body;
     }
     
@@ -152,7 +146,7 @@ class GatewayProtocol
      */    
     protected static function decode($buffer)
     {
-        $data = unpack("Npack_len/Ccmd/Nlocal_ip/nlocal_port/Nsocket_id/Nclient_ip/nclient_port/Nuid/Next_len", $buffer);
+        $data = unpack("Npack_len/Ccmd/Nlocal_ip/nlocal_port/Nsocket_id/Nclient_ip/nclient_port/Nclient_id/Next_len", $buffer);
         $data['local_ip'] = long2ip($data['local_ip']);
         $data['client_ip'] = long2ip($data['client_ip']);
         if($data['ext_len'] > 0)

+ 14 - 4
applications/Demo/README.md

@@ -1,11 +1,21 @@
 基于TCP的一个聊天的Demo,该架构适用于绝大部分即时通讯应用,如PC\手机app IM、游戏后台、企业通讯软件、与硬件通讯等
 =========
 
+注意:强烈建议生产环境包括压测环境使用memcache,配置方法如下:
+========
+安装memcahced服务,例如 unbuntu 运行sudo apt-get install memcached  
+启动memcached ,例如 ubuntu 运行 memcached -m 256 -p 22322 -u memcache -l 127.0.0.1 -d  
+安装memcache扩展,例如 ubuntu 运行 sudo apt-get install php5-memcache  
+设置 applications/XXX/Config/Store.php 中的 public static $driver = self::DRIVER_MC;public static $gateway = array('127.0.0.1:22322');   
+重启workerman  
+
 ### Demo测试方法 
   * 运行 telnet ip 8480
   * 首先输入昵称 回车
   * 后面直接打字回车是向所有人发消息
-  * $uid:xxxxxx 是向$uid用户发送消息
+  * $uid:xxxxxx 是向$uid用户发送消息  
+
+可以开多个telnet窗口,窗口间可以实时聊天
 
 目录结构
 ========
@@ -40,11 +50,11 @@
 │   │
 │   ├── GatewayProtocol.php  // gateway与BusinessWorker通讯的协议,开发者无需关注
 │   │
-│   ├── TextProtocol.php     // 简单的文本协议
+│   ├── TextProtocol.php     // 简单的文本协议(applications/Demo中用到)
 │   │
-│   ├── JsonProtocol.php     // 间断的json协议
+│   ├── JsonProtocol.php     // json协议(还没有例子使用)
 │   │
-│   └── WebSocket.php        // WebSocket协议
+│   └── WebSocket.php        // WebSocket协议(workerman-chat使用)
 │ 
 │ 
 └── Event.php // 聊天所有的业务代码在此目录,群聊、私聊等

+ 4 - 4
applications/Statistics/Bootstrap/StatisticProvider.php

@@ -106,18 +106,18 @@ class StatisticProvider extends Man\Core\SocketWorker
      * udp 默认全部接收完毕
      * @see Man\Core.SocketWorker::dealInput()
      */
-    public function dealInput($recv_str)
+    public function dealInput($recv_buffer)
     {
         return 0;
     }
     
     /**
      * 处理请求统计
-     * @param string $recv_str
+     * @param string $recv_buffer
      */
-    public function dealProcess($recv_str)
+    public function dealProcess($recv_buffer)
     {
-        $req_data = json_decode(trim($recv_str), true);
+        $req_data = json_decode(trim($recv_buffer), true);
         $module = $req_data['module'];
         $interface = $req_data['interface'];
         $cmd = $req_data['cmd'];

+ 3 - 3
applications/Statistics/Bootstrap/StatisticWorker.php

@@ -64,7 +64,7 @@ class StatisticWorker extends Man\Core\SocketWorker
      * udp 默认全部接收完毕
      * @see Man\Core.SocketWorker::dealInput()
      */
-    public function dealInput($recv_str)
+    public function dealInput($recv_buffer)
     {
         return 0;
     }
@@ -73,10 +73,10 @@ class StatisticWorker extends Man\Core\SocketWorker
      * 业务处理
      * @see Man\Core.SocketWorker::dealProcess()
      */
-    public function dealProcess($recv_str)
+    public function dealProcess($recv_buffer)
     {
         // 解码
-        $unpack_data = StatisticProtocol::decode($recv_str);
+        $unpack_data = StatisticProtocol::decode($recv_buffer);
         $module = $unpack_data['module'];
         $interface = $unpack_data['interface'];
         $cost_time = $unpack_data['cost_time'];

+ 1 - 1
workerman/Common/Monitor.php

@@ -159,7 +159,7 @@ class Monitor extends Man\Core\SocketWorker
      * 确定包是否完整
      * @see Worker::dealInput()
      */
-    public function dealInput($recv_str)
+    public function dealInput($recv_buffer)
     {
         return 0;
     }

+ 8 - 8
workerman/Common/WebServer.php

@@ -137,22 +137,22 @@ class WebServer extends Man\Core\SocketWorker
      * 确定数据是否接收完整
      * @see Man\Core.SocketWorker::dealInput()
      */
-    public function dealInput($recv_str)
+    public function dealInput($recv_buffer)
     {
-        return Man\Common\Protocols\Http\http_input($recv_str);
+        return Man\Common\Protocols\Http\http_input($recv_buffer);
     }
 
     /**
      * 数据接收完整后处理业务逻辑
      * @see Man\Core.SocketWorker::dealProcess()
      */
-    public function dealProcess($recv_str)
+    public function dealProcess($recv_buffer)
     {
          // http请求处理开始。解析http协议,生成$_POST $_GET $_COOKIE
-        Man\Common\Protocols\Http\http_start($recv_str);
+        Man\Common\Protocols\Http\http_start($recv_buffer);
         
         // 记录访问日志
-        $this->logAccess($recv_str);
+        $this->logAccess($recv_buffer);
         
         // 请求的文件
         $url_info = parse_url($_SERVER['REQUEST_URI']);
@@ -302,12 +302,12 @@ class WebServer extends Man\Core\SocketWorker
     
     /**
      * 记录访问日志
-     * @param unknown_type $recv_str
+     * @param unknown_type $recv_buffer
      */
-    public function logAccess($recv_str)
+    public function logAccess($recv_buffer)
     {
         // 记录访问日志
-        $log_data = date('Y-m-d H:i:s') . "\t REMOTE:" . $this->getRemoteAddress()."\n$recv_str";
+        $log_data = date('Y-m-d H:i:s') . "\t REMOTE:" . $this->getRemoteAddress()."\n$recv_buffer";
         if(isset(self::$accessLog[$_SERVER['HTTP_HOST']]))
         {
             file_put_contents(self::$accessLog[$_SERVER['HTTP_HOST']], $log_data, FILE_APPEND);

+ 1 - 1
workerman/Core/Master.php

@@ -33,7 +33,7 @@ class Master
      * 版本
      * @var string
      */
-    const VERSION = '2.1.2';
+    const VERSION = '2.1.3';
     
     /**
      * 服务名

+ 1 - 1
workerman/Core/SocketWorker.php

@@ -377,7 +377,7 @@ abstract class SocketWorker extends AbstractWorker
             if(!empty($this->recvBuffers[$fd]['buf']))
             {
                 $this->statusInfo['send_fail']++;
-                $this->notice("CLIENT_CLOSE\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".var_export($this->recvBuffers[$fd]['buf'],true)."]\n");
+                $this->notice("CLIENT_CLOSE\nCLIENT_IP:".$this->getRemoteIp()."\nBUFFER:[".bin2hex($this->recvBuffers[$fd]['buf'])."]\n");
             }
             
             // 关闭链接

+ 2 - 2
workerman/conf/conf.d/BusinessWorker.conf

@@ -6,8 +6,8 @@ listen = tcp://0.0.0.0:8483
 start_workers = 5
 ;以哪个用户运行该进程,为了安全请使用权限较低的用户,例如www-data nobody
 user = root
-;请求到来时预读长度,这里固定27
-preread_length = 27
+;请求到来时预读长度,这里固定29
+preread_length = 29
 ;设置最大请求数,超过这个请求数后会安全重启该进程(主要是避免因业务代码不规范导致的内存泄露)
 max_requests=10000
 ;必须是长链接