浏览代码

增加聊天的demo

liangl 11 年之前
父节点
当前提交
4e323fbd91

+ 299 - 0
applications/ChatDemo/Bootstrap/Gateway.php

@@ -0,0 +1,299 @@
+<?php
+/**
+ * 
+ * 暴露给客户端的连接网关 只负责网络io
+ * 1、监听客户端连接
+ * 2、监听后端回应并转发回应给前端
+ * 
+ * @author walkor <worker-man@qq.com>
+ * 
+ */
+define('ROOT_DIR', realpath(__DIR__.'/../'));
+require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
+require_once ROOT_DIR . '/Lib/Store.php';
+
+class Gateway extends Man\Core\SocketWorker
+{
+    /**
+     * 内部通信socket
+     * @var resouce
+     */
+    protected $innerMainSocket = null;
+    
+    /**
+     * 内网ip
+     * @var string
+     */
+    protected $lanIp = '127.0.0.1';
+    
+    
+    /**
+     * 内部通信端口
+     * @var int
+     */
+    protected $lanPort = 0;
+    
+    /**
+     * uid到连接的映射
+     * @var array
+     */
+    protected $uidConnMap = array();
+    
+    /**
+     * 连接到uid的映射
+     * @var array
+     */
+    protected $connUidMap = array();
+    
+    
+    /**
+     * 到Worker的通信地址
+     * @var array
+     */ 
+    protected $workerAddresses = array();
+    
+    /**
+     * 进程启动
+     */
+    public function start()
+    {
+        // 安装信号处理函数
+        $this->installSignal();
+        
+        // 添加accept事件
+        $ret = $this->event->add($this->mainSocket,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'accept'));
+        
+        // 创建内部通信套接字
+        $start_port = Man\Core\Lib\Config::get($this->workerName.'.lan_port_start');
+        $this->lanPort = $start_port - posix_getppid() + posix_getpid();
+        $this->lanIp = Man\Core\Lib\Config::get($this->workerName.'.lan_ip');
+        if(!$this->lanIp)
+        {
+            $this->notice($this->workerName.'.lan_ip not set');
+            $this->lanIp = '127.0.0.1';
+        }
+        $error_no = 0;
+        $error_msg = '';
+        $this->innerMainSocket = stream_socket_server("udp://".$this->lanIp.':'.$this->lanPort, $error_no, $error_msg, STREAM_SERVER_BIND);
+        if(!$this->innerMainSocket)
+        {
+            $this->notice('create innerMainSocket fail and exit '.$error_no . ':'.$error_msg);
+            sleep(1);
+            exit(0);
+        }
+        else
+        {
+            stream_set_blocking($this->innerMainSocket , 0);
+        }
+        
+        $this->registerAddress("udp://".$this->lanIp.':'.$this->lanPort);
+        
+        // 添加读udp事件
+        $this->event->add($this->innerMainSocket,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
+        
+        // 初始化到worker的通信地址
+        $this->initWorkerAddresses();
+        
+        // 主体循环,整个子进程会阻塞在这个函数上
+        $ret = $this->event->loop();
+        $this->notice('worker loop exit');
+        exit(0);
+    }
+    
+    /**
+     * 存储全局的通信地址 
+     * @param string $address
+     */
+    protected function registerAddress($address)
+    {
+        \Man\Core\Lib\Mutex::get();
+        $key = 'GLOBAL_GATEWAY_ADDRESS';
+        $addresses_list = Store::get($key);
+        if(empty($addresses_list))
+        {
+            $addresses_list = array();
+        }
+        
+        $addresses_list[$address] = $address;
+        Store::set($key, $addresses_list);
+        \Man\Core\Lib\Mutex::release();
+    }
+    
+    /**
+     * 接收Udp数据
+     * 如果数据超过一个udp包长,需要业务自己解析包体,判断数据是否全部到达
+     * @param resource $socket
+     * @param $null_one $flag
+     * @param $null_two $base
+     * @return void
+     */
+    public function recvUdp($socket, $null_one = null, $null_two = null)
+    {
+        $data = stream_socket_recvfrom($socket , self::MAX_UDP_PACKEG_SIZE, 0, $address);
+        // 惊群效应
+        if(false === $data || empty($address))
+        {
+            return false;
+        }
+         
+        $this->currentClientAddress = $address;
+       
+        $this->innerDealProcess($data);
+    }
+    
+    protected function initWorkerAddresses()
+    {
+        $this->workerAddresses = Man\Core\Lib\Config::get($this->workerName.'.game_worker');
+        if(!$this->workerAddresses)
+        {
+            $this->notice($this->workerName.'game_worker not set');
+        }
+    }
+    
+    public function dealInput($recv_str)
+    {
+        return 0;
+    }
+
+    public function innerDealProcess($recv_str)
+    {
+        $pack = new GatewayProtocol($recv_str);
+        
+        switch($pack->header['cmd'])
+        {
+            case GatewayProtocol::CMD_SEND_TO_ONE:
+                return $this->sendToSocketId($pack->header['socket_id'], $pack->body);
+            case GatewayProtocol::CMD_KICK:
+                if($pack->body)
+                {
+                    $this->sendToSocketId($pack->header['socket_id'], $pack->body);
+                }
+                return $this->closeClientBySocketId($pack->header['socket_id']);
+            case GatewayProtocol::CMD_SEND_TO_ALL:
+                return $this->broadCast($pack->body);
+            case GatewayProtocol::CMD_CONNECT_SUCCESS:
+                return $this->connectSuccess($pack->header['socket_id'], $pack->header['uid']);
+            default :
+                $this->notice('gateway inner pack cmd err data:' .$recv_str );
+        }
+    }
+    
+    protected function broadCast($bin_data)
+    {
+        foreach($this->uidConnMap as $uid=>$conn)
+        {
+            $this->sendToSocketId($conn, $bin_data);
+        }
+    }
+    
+    protected function closeClientBySocketId($socket_id)
+    {
+        if($uid = $this->getUidByFd($socket_id))
+        {
+            unset($this->uidConnMap[$uid], $this->connUidMap[$socket_id]);
+        }
+        parent::closeClient($socket_id);
+    }
+    
+    protected function getFdByUid($uid)
+    {
+        if(isset($this->uidConnMap[$uid]))
+        {
+            return $this->uidConnMap[$uid];
+        }
+        return 0;
+    }
+    
+    protected function getUidByFd($fd)
+    {
+        if(isset($this->connUidMap[$fd]))
+        {
+            return $this->connUidMap[$fd];
+        }
+        return 0;
+    }
+    
+    protected function connectSuccess($socket_id, $uid)
+    {
+        $binded_uid = $this->getUidByFd($socket_id);
+        if($binded_uid)
+        {
+            $this->notice('notify connection success fail ' . $socket_id . ' already binded data:'.serialize($data));
+            return;
+        }
+        $this->uidConnMap[$uid] = $socket_id;
+        $this->connUidMap[$socket_id] = $uid;
+    }
+    
+    public function sendToSocketId($socket_id, $bin_data)
+    {
+        if(!isset($this->connections[$socket_id]))
+        {
+            return false;
+        }
+        $this->currentDealFd = $socket_id;
+        return $this->sendToClient($bin_data);
+    }
+
+    protected function closeClient($fd)
+    {
+        if($uid = $this->getUidByFd($fd))
+        {
+            $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $fd);
+            unset($this->uidConnMap[$uid], $this->connUidMap[$fd]);
+        }
+        parent::closeClient($fd);
+    }
+    
+    public function dealProcess($recv_str)
+    {
+        // 判断用户是否认证过
+        $from_uid = $this->getUidByFd($this->currentDealFd);
+        // 触发ON_CONNECTION
+        if(!$from_uid)
+        {
+            return $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECTION, $this->currentDealFd, $recv_str);
+        }
+        
+        // 认证过, 触发ON_MESSAGE
+        $this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $this->currentDealFd, $recv_str);
+    }
+    
+    protected function sendToWorker($cmd, $socket_id, $body = '')
+    {
+        $address= $this->getRemoteAddress($socket_id);
+        list($client_ip, $client_port) = explode(':', $address, 2);
+        $pack = new GatewayProtocol();
+        $pack->header['cmd'] = $cmd;
+        $pack->header['local_ip'] = $this->lanIp;
+        $pack->header['local_port'] = $this->lanPort;
+        $pack->header['socket_id'] = $socket_id;
+        $pack->header['client_ip'] = $client_ip;
+        $pack->header['client_port'] = $client_ip;
+        $pack->header['uid'] = $this->getUidByFd($socket_id);
+        $pack->body = $body;
+        return $this->sendBufferToWorker($pack->getBuffer());
+    }
+    
+    protected function sendBufferToWorker($bin_data)
+    {
+        $client = stream_socket_client($this->workerAddresses[array_rand($this->workerAddresses)]);
+        $len = stream_socket_sendto($client, $bin_data);
+        return $len == strlen($bin_data);
+    }
+    
+    protected function notice($str, $display=true)
+    {
+        $str = 'Worker['.get_class($this).']:'."$str ip:".$this->getRemoteIp();
+        Man\Core\Lib\Log::add($str);
+        if($display && Man\Core\Lib\Config::get('workerman.debug') == 1)
+        {
+            echo $str."\n";
+        }
+    }
+    
+    public function onStop()
+    {
+        Store::deleteAll();
+    }
+}

+ 64 - 0
applications/ChatDemo/Bootstrap/Worker.php

@@ -0,0 +1,64 @@
+<?php
+/**
+ * 
+ * 处理具体逻辑
+ * 
+ * @author walkor <worker-man@qq.com>
+ * 
+ */
+define('ROOT_DIR', realpath(__DIR__.'/../'));
+require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
+require_once ROOT_DIR . '/Event.php';
+
+class Worker extends Man\Core\SocketWorker
+{
+    public function dealInput($recv_str)
+    {
+        return GatewayProtocol::input($recv_str); 
+    }
+
+    public function dealProcess($recv_str)
+    {
+        $pack = new GatewayProtocol($recv_str);
+        Context::$client_ip = $pack->header['client_ip'];
+        Context::$client_port = $pack->header['client_port'];
+        Context::$local_ip = $pack->header['local_ip'];
+        Context::$local_port = $pack->header['local_port'];
+        Context::$socket_id = $pack->header['socket_id'];
+        Context::$uid = $pack->header['uid'];
+        switch($pack->header['cmd'])
+        {
+            case GatewayProtocol::CMD_ON_CONNECTION:
+                $ret = call_user_func_array(array('Event', 'onConnect'), array($pack->body));
+                break;
+            case GatewayProtocol::CMD_ON_MESSAGE:
+                $ret = call_user_func_array(array('Event', 'onMessage'), array(Context::$uid, $pack->body));
+                break;
+            case GatewayProtocol::CMD_ON_CLOSE:
+                $ret = call_user_func_array(array('Event', 'onClose'), array(Context::$uid));
+                break;
+        }
+        Context::clear();
+        return $ret;
+    }
+}
+
+/**
+ * 上下文 包含当前用户uid, 内部通信local_ip local_port socket_id ,以及客户端client_ip client_port
+ * @author walkor
+ *
+ */
+class Context
+{
+    public static $series_id;
+    public static $local_ip;
+    public static $local_port;
+    public static $socket_id;
+    public static $client_ip;
+    public static $client_port;
+    public static $uid;
+    public static function clear()
+    {
+        self::$series_id = self::$local_ip = self::$local_port = self::$socket_id = self::$client_ip = self::$client_port = self::$uid = null;
+    }
+}

+ 90 - 0
applications/ChatDemo/Event.php

@@ -0,0 +1,90 @@
+<?php
+/**
+ * 
+ * 
+ * @author walkor <workerman.net>
+ * 
+ */
+
+require_once ROOT_DIR . '/Lib/Gateway.php';
+
+class Event
+{
+   /**
+    * 用户连接gateway后第一次发包会触发此方法
+    * @param string $message 一般是传递的账号密码等信息
+    * @return void
+    */
+   public static function onConnect($message)
+   {
+       // 通过message验证用户,并获得uid
+       $uid = self::checkUser($message);
+       // 不合法踢掉
+       if(!$uid)
+       {
+           // 返回失败
+           return GateWay::kickCurrentUser('登录失败');
+       }
+       
+       // [这步是必须的]合法,记录uid到gateway通信地址的映射
+       GateWay::storeUid($uid);
+       
+       // [这步是必须的]发送数据包到address对应的gateway,确认connection成功
+       GateWay::notifyConnectionSuccess($uid);
+       
+       // 向当前用户发送uid
+       GateWay::sendToCurrentUid(json_encode(array('uid'=>$uid))."\n");
+       
+       // 广播所有用户,xxx connected
+       GateWay::sendToAll(json_encode(array('from_uid'=>'SYSTEM', 'message'=>"$uid come \n", 'to_uid'=>'all'))."\n");
+   }
+   
+   /**
+    * 当用户断开连接时触发的方法
+    * @param string $address 和该用户gateway通信的地址
+    * @param integer $uid 断开连接的用户id 
+    * @return void
+    */
+   public static function onClose($uid)
+   {
+       // [这步是必须的]删除这个用户的gateway通信地址
+       GateWay::deleteUidAddress($uid);
+       
+       // 广播 xxx 退出了
+       GateWay::sendToAll(json_encode(array('from_uid'=>'SYSTEM', 'message'=>"$uid logout\n", 'to_uid'=>'all'))."\n");
+   }
+   
+   /**
+    * 有消息时触发该方法
+    * @param int $uid 发消息的uid
+    * @param string $message 消息
+    * @return void
+    */
+   public static function onMessage($uid, $message)
+   {
+        $message_data = json_decode($message, true);
+        
+        // 向所有人发送
+        if($message_data['to_uid'] == 'all')
+        {
+            return GateWay::sendToAll($message);
+        }
+        // 向某个人发送
+        else
+        {
+            return GateWay::sendToUid($message_data['to_uid'], $message);
+        }
+   }
+   
+   
+   /**
+    * 用户第一次链接时,根据用户传递的消息(一般是用户名 密码)返回当前uid
+    * 这里只是返回了时间戳相关的一个数字
+    * @param string $message
+    * @return number
+    */
+   protected static function checkUser($message)
+   {
+       return substr(strval(microtime(true)), 3, 10)*100;
+   }
+}

+ 205 - 0
applications/ChatDemo/Lib/Gateway.php

@@ -0,0 +1,205 @@
+<?php
+/**
+ * 
+ * 数据发送相关
+ * sendToAll sendToUid
+ * @author walkor <worker-man@qq.com>
+ * 
+ */
+
+require_once ROOT_DIR . '/Lib/Store.php';
+
+class GateWay
+{
+   /**
+    * 向所有客户端广播消息
+    * @param string $message
+    */
+   public static function sendToAll($message)
+   {
+       $pack = new GatewayProtocol();
+       $pack->header['cmd'] = GatewayProtocol::CMD_SEND_TO_ALL;
+       $pack->header['series_id'] = 0;
+       $pack->header['local_ip'] = Context::$local_ip;
+       $pack->header['local_port'] = Context::$local_port;
+       $pack->header['socket_id'] = Context::$socket_id;
+       $pack->header['client_ip'] = Context::$client_ip;
+       $pack->header['client_port'] = Context::$client_port;
+       $pack->header['uid'] = Context::$uid;
+       $pack->body = (string)$message;
+       $buffer = $pack->getBuffer();
+       $all_addresses = Store::get('GLOBAL_GATEWAY_ADDRESS');
+       foreach($all_addresses as $address)
+       {
+           self::sendToGateway($address, $buffer);
+       }
+   }
+   
+   /**
+    * 向某个用户发消息
+    * @param int $uid
+    * @param string $message
+    */
+   public static function sendToUid($uid, $message, $raw_data = false)
+   {
+       return self::sendCmdAndMessageToUid($uid, GatewayProtocol::CMD_SEND_TO_ONE, $message, $raw_data);
+   }
+   
+   /**
+    * 向当前用户发送消息
+    * @param string $message
+    */
+   public static function sendToCurrentUid($message, $raw_data = false)
+   {
+       return self::sendCmdAndMessageToUid(null, GatewayProtocol::CMD_SEND_TO_ONE, $message, $raw_data);
+   }
+   
+   /**
+    * 将某个用户踢出
+    * @param int $uid
+    * @param string $message
+    */
+   public static function kickUid($uid, $message)
+   {
+       if($uid === Context::$uid)
+       {
+           return self::kickCurrentUser($message);
+       }
+       // 不是发给当前用户则使用存储中的地址
+       else
+       {
+           $address = self::getAddressByUid($uid);
+           if(!$address)
+           {
+               return false;
+           }
+           return self::kickAddress($address['local_ip'], $address['local_port'], $address['socket_id'], $message);
+       }
+   }
+   
+   /**
+    * 踢掉当前用户
+    * @param string $message
+    */
+   public static function kickCurrentUser($message)
+   {
+       return self::kickAddress(Context::$local_ip, Context::$local_port, Context::$socket_id, $message);
+   }
+   
+
+   /**
+    * 想某个用户网关发送命令和消息
+    * @param int $uid
+    * @param int $cmd
+    * @param string $message
+    * @return boolean
+    */
+   public static function sendCmdAndMessageToUid($uid, $cmd , $message, $raw_data = false)
+   {
+       $pack = new GatewayProtocol();
+       $pack->header['cmd'] = $cmd;
+       $pack->header['series_id'] = Context::$series_id > 0 ? Context::$series_id : 0;
+       // 如果是发给当前用户则直接获取上下文中的地址
+       if($uid === Context::$uid || $uid === null)
+       {
+           $pack->header['local_ip'] = Context::$local_ip;
+           $pack->header['local_port'] = Context::$local_port;
+           $pack->header['socket_id'] = Context::$socket_id;
+       }
+       // 不是发给当前用户则使用存储中的地址
+       else
+       {
+           $address = self::getAddressByUid($uid);
+           if(!$address)
+           {
+               return false;
+           }
+           $pack->header['local_ip'] = $address['local_ip'];
+           $pack->header['local_port'] = $address['local_port'];
+           $pack->header['socket_id'] = $address['socket_id'];
+       }
+       $pack->header['client_ip'] = Context::$client_ip;
+       $pack->header['client_port'] = Context::$client_port;
+       $pack->header['uid'] = empty($uid) ? 0 : $uid;
+       $pack->body = (string)$message;
+        
+       return self::sendToGateway("udp://{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
+   }
+   
+   
+   /**
+    * 踢掉某个网关的socket
+    * @param string $local_ip
+    * @param int $local_port
+    * @param int $socket_id
+    * @param string $message
+    * @param int $uid
+    */
+   public static function kickAddress($local_ip, $local_port, $socket_id, $message, $uid = null)
+   {
+       $pack = new GatewayProtocol();
+       $pack->header['cmd'] = GatewayProtocol::CMD_KICK;
+       $pack->header['series_id'] = Context::$series_id > 0 ? Context::$series_id : 0;
+       $pack->header['local_ip'] = $local_ip;
+       $pack->header['local_port'] = $local_port;
+       $pack->header['socket_id'] = $socket_id;
+       if(null !== Context::$client_ip)
+       {
+           $pack->header['client_ip'] = Context::$client_ip;
+           $pack->header['client_port'] = Context::$client_port;
+       }
+       $pack->header['uid'] = $uid ? $uid : 0;
+       $pack->body = (string)$message;
+       
+       return self::sendToGateway("udp://{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
+   }
+   
+   /**
+    * 存储uid的网关地址
+    * @param int $uid
+    */
+   public static function storeUid($uid)
+   {
+       $address = array('local_ip'=>Context::$local_ip, 'local_port'=>Context::$local_port, 'socket_id'=>Context::$socket_id);
+       Store::set($uid, $address);
+   }
+   
+   /**
+    * 获取用户的网关地址
+    * @param int $uid
+    */
+   public static function getAddressByUid($uid)
+   {
+       return Store::get($uid);
+   }
+   
+   /**
+    * 删除用户的网关地址
+    * @param int $uid
+    */
+   public static function deleteUidAddress($uid)
+   {
+       return Store::delete($uid);
+   }
+   
+   /**
+    * 通知网关uid链接成功(通过验证)
+    * @param int $uid
+    */
+   public static function notifyConnectionSuccess($uid)
+   {
+       return self::sendCmdAndMessageToUid($uid, GatewayProtocol::CMD_CONNECT_SUCCESS, '');
+   }
+   
+   /**
+    * 发送数据到网关
+    * @param string $address
+    * @param string $buffer
+    */
+   public static function sendToGateway($address, $buffer)
+   {
+       $client = stream_socket_client($address);
+       $len = stream_socket_sendto($client, $buffer);
+       return $len == strlen($buffer);
+   }
+}

+ 88 - 0
applications/ChatDemo/Lib/Store.php

@@ -0,0 +1,88 @@
+<?php
+/**
+ * 
+ * 这里用php数组文件来存储数据,
+ * 为了获取高性能需要用类似memcache、redis的存储
+ * @author walkor <worker-man@qq.com>
+ * 
+ */
+
+class Store
+{
+    // 为了避免频繁读取磁盘,增加了缓存机制
+    protected static $dataCache = array();
+    // 上次缓存时间
+    protected static $lastCacheTime = 0;
+    // 保存数据的文件相对与WORKERMAN_LOG_DIR目录目录
+    protected static $dataFile = 'data.php';
+    // 打开文件的句柄
+    protected static $dataFileHandle = null;
+    
+    // 缓存过期时间
+    const CACHE_EXP_TIME = 1;
+    
+    public static function set($key, $value, $ttl = 0)
+    {
+        self::readDataFromDisk();
+        self::$dataCache[$key] = $value;
+        return self::writeToDisk();
+    }
+    
+    public static function get($key, $use_cache = true)
+    {
+        if(time() - self::$lastCacheTime > self::CACHE_EXP_TIME)
+        {
+            self::readDataFromDisk();
+        }
+        return isset(self::$dataCache[$key]) ? self::$dataCache[$key] : null;
+    }
+   
+    public static function delete($key)
+    {
+        self::readDataFromDisk();
+        unset(self::$dataCache[$key]);
+        return self::writeToDisk();
+    }
+    
+    public static function deleteAll()
+    {
+        self::$dataCache = array();
+        self::writeToDisk();
+    }
+   
+    protected static function writeToDisk()
+    {
+        $data_file = WORKERMAN_LOG_DIR . self::$dataFile;
+        if(!self::$dataFileHandle)
+        {
+            if(!is_file($data_file))
+            {
+                touch($data_file);
+            }
+            self::$dataFileHandle = fopen($data_file, 'r+');
+            if(!self::$dataFileHandle)
+            {
+                return false;
+            }
+        }
+        flock(self::$dataFileHandle, LOCK_EX);
+        $ret = file_put_contents($data_file, "<?php \n return " . var_export(self::$dataCache, true). ';');
+        flock(self::$dataFileHandle, LOCK_UN);
+        return $ret;
+    }
+    
+    protected static function readDataFromDisk()
+    {
+        $data_file = WORKERMAN_LOG_DIR . self::$dataFile;
+        if(!is_file($data_file))
+        {
+            touch($data_file);
+        }
+        $cache = include WORKERMAN_LOG_DIR . self::$dataFile;
+        if(is_array($cache))
+        {
+            self::$dataCache = $cache;
+        }
+        self::$lastCacheTime = time();
+    }
+}

+ 52 - 0
applications/ChatDemo/Lib/Store.php.for-memcache

@@ -0,0 +1,52 @@
+<?php
+/**
+ * 
+ * 
+ * @author walkor <worker-man@qq.com>
+ * 
+ */
+
+class Store
+{
+    protected static $instance = null;
+    
+    public static function connect()
+    {
+        if(!self::$instance)
+        {
+            self::$instance = new Memcache;
+            self::$instance->addServer('127.0.0.1', 11211);
+        }
+        return self::$instance;
+    }
+    
+    public static function set($key, $value, $ttl = 0)
+    {
+        if(self::connect())
+        {
+            return self::$instance->set($key, $value, $ttl);
+        }
+        return false;
+    }
+    
+    public static function get($key)
+    {
+        if(self::connect())
+        {
+            return self::$instance->get($key);
+        }
+        return false;
+    }
+   
+    public static function delete($key)
+    {
+        if(self::connect())
+        {
+            return self::$instance->delete($key);
+        }
+        return false;
+    }
+    
+    public static function deleteAll(){}
+   
+}

+ 175 - 0
applications/ChatDemo/Protocols/GatewayProtocol.php

@@ -0,0 +1,175 @@
+<?php 
+/**
+ * 二进制协议
+ * 
+ * struct GatewayProtocol
+ * {
+ *     unsigned short    series_id,//序列号 udp协议使用
+ *     unsigned char     cmd,//命令字
+ *     unsigned int      local_ip,
+ *     unsigned short    local_port,
+ *     unsigned int      socket_id,
+ *     unsigned int      client_ip,
+ *     unsigned short    client_port,
+ *     unsigned int      pack_len,
+ *     unsigned int      uid,
+ *     char[pack_length-HEAD_LEN] body//包体
+ * }
+ * 
+ * 
+ * @author walkor <worker-man@qq.com>
+ */
+
+class GatewayProtocol
+{
+    // 发给worker上的链接事件
+    const CMD_ON_CONNECTION = 1;
+    
+    // 发给worker上的有消息可读事件
+    const CMD_ON_MESSAGE = 2;
+    
+    // 发给worker上的关闭链接事件
+    const CMD_ON_CLOSE = 3;
+    
+    // 发给gateway的向单个用户发送数据
+    const CMD_SEND_TO_ONE = 4;
+    
+    // 发给gateway的向所有用户发送数据
+    const CMD_SEND_TO_ALL = 5;
+    
+    // 发给gateway的踢出用户
+    const CMD_KICK = 6;
+    
+    // 发给gateway的通知用户(通过验证)链接成功
+    const CMD_CONNECT_SUCCESS = 7;
+    
+    /**
+     * 包头长度
+     * @var integer
+     */
+    const HEAD_LEN = 27;
+     
+    /**
+     * 序列号,防止串包
+     * @var integer
+     */
+    protected static $seriesId = 0;
+    
+    /**
+     * 协议头
+     * @var array
+     */
+    public $header = array(
+        'cmd'            => 0,
+        'series_id'      => 0,
+        'local_ip'       => '',
+        'local_port'     => 0,
+        'socket_id'      => 0,
+        'client_ip'      => '',
+        'client_port'    => 0,
+        'uid'            => 0,
+        'pack_len'       => self::HEAD_LEN,
+    );
+    
+    /**
+     * 包体
+     * @var string
+     */
+    public $body = '';
+    
+    /**
+     * 初始化
+     * @return void
+     */
+    public function __construct($buffer = null)
+    {
+        if($buffer)
+        {
+            $data = self::decode($buffer);
+            $this->body = $data['body'];
+            unset($data['body']);
+            $this->header = $data;
+        }
+        else
+        {
+            if(self::$seriesId>=65535)
+            {
+                self::$seriesId = 0;
+            }
+            else
+            {
+                $this->header['series_id'] = self::$seriesId++;
+            }
+        }
+    }
+    
+    /**
+     * 判断数据包是否都到了
+     * @param string $buffer
+     * @return int int=0数据是完整的 int>0数据不完整,还要继续接收int字节
+     */
+    public static function input($buffer)
+    {
+        $len = strlen($buffer);
+        if($len < self::HEAD_LEN)
+        {
+            return self::HEAD_LEN - $len;
+        }
+        
+        $data = unpack("nseries_id/Ccmd/Nlocal_ip/nlocal_port/Nsocket_id/Nclient_ip/nclient_port/Nuid/Npack_len", $buffer);
+        if($data['pack_len'] > $len)
+        {
+            return $data['pack_len'] - $len;
+        }
+        return 0;
+    }
+    
+    
+    /**
+     * 设置包体
+     * @param string $body_str
+     * @return void
+     */
+    public function setBody($body_str)
+    {
+        $this->body = (string) $body_str;
+    }
+    
+    /**
+     * 获取整个包的buffer
+     * @param string $data
+     * @return string
+     */
+    public function getBuffer()
+    {
+        $this->header['pack_len'] = self::HEAD_LEN + strlen($this->body);
+        return pack("nCNnNNnNN", $this->header['series_id'], 
+                        $this->header['cmd'], ip2long($this->header['local_ip']), 
+                        $this->header['local_port'], $this->header['socket_id'], 
+                        ip2long($this->header['client_ip']), $this->header['client_port'], 
+                        $this->header['uid'], $this->header['pack_len']).$this->body;
+    }
+    
+    /**
+     * 从二进制数据转换为数组
+     * @param string $buffer
+     * @return array
+     */    
+    protected static function decode($buffer)
+    {
+        $data = unpack("nseries_id/Ccmd/Nlocal_ip/nlocal_port/Nsocket_id/Nclient_ip/nclient_port/Nuid/Npack_len", $buffer);
+        $data['body'] = '';
+        $data['local_ip'] = long2ip($data['local_ip']);
+        $data['client_ip'] = long2ip($data['client_ip']);
+        $body_len = $data['pack_len'] - self::HEAD_LEN;
+        if($body_len > 0)
+        {
+            $data['body'] = substr($buffer, self::HEAD_LEN, $body_len);
+        }
+        return $data;
+    }
+    
+}
+
+
+

+ 73 - 0
applications/ChatDemo/Tests/Chat.php

@@ -0,0 +1,73 @@
+<?php
+ini_set('display_errors', 'on');
+error_reporting(E_ALL);
+
+$ip = isset($argv[1]) ? $argv[1] : '127.0.0.1';
+$port = isset($argv[2]) ? $argv[2] : 8480;
+$sock = stream_socket_client("tcp://$ip:$port");
+if(!$sock)exit("can not create sock\n");
+
+
+fwrite($sock, 'connect');
+$rsp_string = fgets($sock, 1024);
+$ret = json_decode($rsp_string, true);
+if(isset($ret['uid']))
+{
+    echo "chart room login success , your uid is [{$ret['uid']}]\n";
+    echo "use uid:words send message to one user\n";
+    echo "use words send message to all\n";
+}
+else
+{
+    exit("connet faild reponse:$rsp_string\n");
+}
+
+$MYUID = $ret['uid'];
+
+stream_set_blocking($sock, 0);
+stream_set_blocking(STDIN, 0);
+
+$read = array(STDIN, $sock);
+
+$write = $ex = array();
+while(1)
+{
+    $read_copy = $read;
+    if($ret = stream_select($read_copy, $write, $ex, 1000))
+    {
+       foreach($read as $fd)
+       {
+          // 接收消息
+          if((int)$fd === (int)$sock)
+          {
+              $ret = fgets($fd, 102400);
+              if(!$ret){continue;exit("connection closed\n ");}
+              $ret = json_decode(trim($ret),true);
+              if($ret['to_uid'] == $MYUID)
+              {
+                  echo $ret['from_uid'] , ' say to YOU:', $ret['message'], "\n";
+              }
+              else 
+              {
+                  echo $ret['from_uid'] , ' say to ALL:', $ret['message'], "\n";
+              }
+              continue;
+          }
+          
+          // 向某个uid发送消息 格式为 uid:xxxxxxxx
+          $ret = fgets(STDIN, 10240);
+          if(!$ret)continue;
+          if(preg_match("/(\d+):(.*)/", $ret, $match))
+          {
+             $uid = $match[1];
+             $words = $match[2];
+             fwrite($sock, json_encode(array('from_uid'=>$MYUID, 'to_uid'=>$uid, 'message'=>$words))."\n");
+             continue;
+          }
+          // 向所有用户发消息
+          fwrite($sock, json_encode(array('from_uid'=>$MYUID, 'to_uid'=>'all', 'message'=>$ret))."\n");
+          continue;
+
+       }
+    }
+}

+ 14 - 0
workerman/conf/conf.d/Gateway.conf

@@ -0,0 +1,14 @@
+worker_file = ../applications/ChatDemo/Bootstrap/Gateway.php
+listen = tcp://0.0.0.0:8480
+persistent_connection = 1
+start_workers = 5
+user = root
+preread_length = 65535
+lan_ip = 127.0.0.1
+lan_port_start = 20000
+game_worker[] = udp://127.0.0.1:8483
+game_worker[] = udp://127.0.0.1:8483
+;不reload
+no_reload = 1
+;不打印
+no_debug = 1

+ 6 - 0
workerman/conf/conf.d/Worker.conf

@@ -0,0 +1,6 @@
+worker_file = ../applications/ChatDemo/Bootstrap/Worker.php
+listen = udp://0.0.0.0:8483
+start_workers = 5
+user = root
+preread_length = 23
+max_requests=10000