瀏覽代碼

remove Gateway/Worker

walkor 10 年之前
父節點
當前提交
82e0b487a3

+ 0 - 258
GatewayWorker/BusinessWorker.php

@@ -1,258 +0,0 @@
-<?php
-/**
- * This file is part of workerman.
- *
- * Licensed under The MIT License
- * For full copyright and license information, please see the MIT-LICENSE.txt
- * Redistributions of files must retain the above copyright notice.
- *
- * @author walkor<walkor@workerman.net>
- * @copyright walkor<walkor@workerman.net>
- * @link http://www.workerman.net/
- * @license http://www.opensource.org/licenses/mit-license.php MIT License
- */
-namespace GatewayWorker;
-
-use Workerman\Connection\TcpConnection;
-
-use \Workerman\Worker;
-use \Workerman\Connection\AsyncTcpConnection;
-use \Workerman\Protocols\GatewayProtocol;
-use \Workerman\Lib\Timer;
-use \GatewayWorker\Lib\Lock;
-use \GatewayWorker\Lib\Store;
-use \GatewayWorker\Lib\Context;
-use \Event;
-
-/**
- * 
- * BusinessWorker 用于处理Gateway转发来的数据
- * 
- * @author walkor<walkor@workerman.net>
- *
- */
-class BusinessWorker extends Worker
-{
-    /**
-     * 如果连接gateway通讯端口失败,尝试重试多少次
-     * @var int
-     */
-    const MAX_RETRY_COUNT = 5;
-    
-    /**
-     * 保存与gateway的连接connection对象
-     * @var array
-     */
-    public $gatewayConnections = array();
-    
-    /**
-     * 正在连接的gateway内部通讯地址
-     * @var array
-     */
-    protected $_connectingGatewayAddress = array();
-    
-    /**
-     * 连接失败gateway内部通讯地址
-     * @var array
-     */
-    protected $_badGatewayAddress = array();
-    
-    /**
-     * 保存用户设置的worker启动回调
-     * @var callback
-     */
-    protected $_onWorkerStart = null;
-    
-    /**
-     * 构造函数
-     * @param string $socket_name
-     * @param array $context_option
-     */
-    public function __construct($socket_name = '', $context_option = array())
-    {
-        parent::__construct($socket_name, $context_option);
-        $backrace = debug_backtrace();
-        $this->_appInitPath = dirname($backrace[0]['file']);
-    }
-    
-    /**
-     * 运行
-     * @see Workerman.Worker::run()
-     */
-    public function run()
-    {
-        $this->_onWorkerStart = $this->onWorkerStart;
-        $this->onWorkerStart = array($this, 'onWorkerStart');
-        parent::run();
-    }
-    
-    /**
-     * 当进程启动时一些初始化工作
-     * @return void
-     */
-    protected function onWorkerStart()
-    {
-        Timer::add(1, array($this, 'checkGatewayConnections'));
-        $this->checkGatewayConnections();
-        \GatewayWorker\Lib\Gateway::setBusinessWorker($this);
-        if($this->_onWorkerStart)
-        {
-            call_user_func($this->_onWorkerStart, $this);
-        }
-    }
-    
-    /**
-     * 当gateway转发来数据时
-     * @param TcpConnection $connection
-     * @param mixed $data
-     */
-    public function onGatewayMessage($connection, $data)
-    {
-        // 上下文数据
-        Context::$client_ip = $data['client_ip'];
-        Context::$client_port = $data['client_port'];
-        Context::$local_ip = $data['local_ip'];
-        Context::$local_port = $data['local_port'];
-        Context::$client_id = $data['client_id'];
-        // $_SERVER变量
-        $_SERVER = array(
-                'REMOTE_ADDR' => Context::$client_ip,
-                'REMOTE_PORT' => Context::$client_port,
-                'GATEWAY_ADDR' => Context::$local_ip,
-                'GATEWAY_PORT'  => Context::$local_port,
-                'GATEWAY_CLIENT_ID' => Context::$client_id,
-        );
-        // 尝试解析session
-        if($data['ext_data'] != '')
-        {
-            $_SESSION = Context::sessionDecode($data['ext_data']);
-        }
-        else
-        {
-            $_SESSION = null;
-        }
-        // 备份一次$data['ext_data'],请求处理完毕后判断session是否和备份相等,不相等就更新session
-        $session_str_copy = $data['ext_data'];
-        $cmd = $data['cmd'];
-    
-        // 尝试执行Event::onConnection、Event::onMessage、Event::onClose
-        try{
-            switch($cmd)
-            {
-                case GatewayProtocol::CMD_ON_CONNECTION:
-                    Event::onConnect(Context::$client_id);
-                    break;
-                case GatewayProtocol::CMD_ON_MESSAGE:
-                    Event::onMessage(Context::$client_id, $data['body']);
-                    break;
-                case GatewayProtocol::CMD_ON_CLOSE:
-                    Event::onClose(Context::$client_id);
-                    break;
-            }
-        }
-        catch(\Exception $e)
-        {
-            $msg = 'client_id:'.Context::$client_id."\tclient_ip:".Context::$client_ip."\n".$e->__toString();
-            $this->log($msg);
-        }
-    
-        // 判断session是否被更改
-        $session_str_now = $_SESSION !== null ? Context::sessionEncode($_SESSION) : '';
-        if($session_str_copy != $session_str_now)
-        {
-            \GatewayWorker\Lib\Gateway::updateSocketSession(Context::$client_id, $session_str_now);
-        }
-    
-        Context::clear();
-    }
-    
-    /**
-     * 当与Gateway的连接断开时触发
-     * @param TcpConnection $connection
-     * @return  void
-     */
-    public function onClose($connection)
-    {
-        unset($this->gatewayConnections[$connection->remoteAddress], $this->_connectingGatewayAddress[$connection->remoteAddress]);
-    }
-
-    /**
-     * 检查gateway的通信端口是否都已经连
-     * 如果有未连接的端口,则尝试连接
-     * @return void
-     */
-    public function checkGatewayConnections()
-    {
-        $key = 'GLOBAL_GATEWAY_ADDRESS';
-        $addresses_list = Store::instance('gateway')->get($key);
-        if(empty($addresses_list))
-        {
-            return;
-        }
-        foreach($addresses_list as $addr)
-        {
-            if(!isset($this->gatewayConnections[$addr]) && !isset($this->_connectingGatewayAddress[$addr]) && !isset($this->_badGatewayAddress[$addr]))
-            {
-                $gateway_connection = new AsyncTcpConnection("GatewayProtocol://$addr");
-                $gateway_connection->remoteAddress = $addr;
-                $gateway_connection->onConnect = array($this, 'onConnectGateway');
-                $gateway_connection->onMessage = array($this, 'onGatewayMessage');
-                $gateway_connection->onClose = array($this, 'onClose');
-                $gateway_connection->onError = array($this, 'onError');
-                if(TcpConnection::$defaultMaxSendBufferSize == $gateway_connection->maxSendBufferSize)
-                {
-                    $gateway_connection->maxSendBufferSize = 10*1024*1024;
-                }
-                $gateway_connection->connect();
-                $this->_connectingGatewayAddress[$addr] = 1;
-            }
-        }
-    }
-    
-    /**
-     * 当连接上gateway的通讯端口时触发
-     * 将连接connection对象保存起来
-     * @param TcpConnection $connection
-     * @return void
-     */
-    public function onConnectGateway($connection)
-    {
-        $this->gatewayConnections[$connection->remoteAddress] = $connection;
-        unset($this->_badGatewayAddress[$connection->remoteAddress], $this->_connectingGatewayAddress[$connection->remoteAddress]);
-    }
-    
-    /**
-     * 当与gateway的连接出现错误时触发
-     * @param TcpConnection $connection
-     * @param int $error_no
-     * @param string $error_msg
-     */
-    public function onError($connection, $error_no, $error_msg)
-    {
-         $this->tryToDeleteGatewayAddress($connection->remoteAddress, $error_msg);
-    }
-    
-    /**
-     * 从存储中删除删除连不上的gateway通讯端口
-     * @param string $addr
-     * @param string $errstr
-     */
-    public function tryToDeleteGatewayAddress($addr, $errstr)
-    {
-        $key = 'GLOBAL_GATEWAY_ADDRESS';
-        if(!isset($this->_badGatewayAddress[$addr]))
-        {
-            $this->_badGatewayAddress[$addr] = 0;
-        }
-        // 删除连不上的端口
-        if($this->_badGatewayAddress[$addr]++ > self::MAX_RETRY_COUNT)
-        {
-            Lock::get();
-            $addresses_list = Store::instance('gateway')->get($key);
-            unset($addresses_list[$addr]);
-            Store::instance('gateway')->set($key, $addresses_list);
-            Lock::release();
-            $this->log("tcp://$addr ".$errstr." del $addr from store", false);
-        }
-    }
-}

+ 0 - 632
GatewayWorker/Gateway.php

@@ -1,632 +0,0 @@
-<?php 
-/**
- * This file is part of workerman.
- *
- * Licensed under The MIT License
- * For full copyright and license information, please see the MIT-LICENSE.txt
- * Redistributions of files must retain the above copyright notice.
- *
- * @author walkor<walkor@workerman.net>
- * @copyright walkor<walkor@workerman.net>
- * @link http://www.workerman.net/
- * @license http://www.opensource.org/licenses/mit-license.php MIT License
- */
-namespace GatewayWorker;
-
-use Workerman\Connection\TcpConnection;
-
-use \Workerman\Worker;
-use \Workerman\Lib\Timer;
-use \Workerman\Protocols\GatewayProtocol;
-use \GatewayWorker\Lib\Lock;
-use \GatewayWorker\Lib\Store;
-use \Workerman\Autoloader;
-
-/**
- * 
- * Gateway,基于Worker开发
- * 用于转发客户端的数据给Worker处理,以及转发Worker的数据给客户端
- * 
- * @author walkor<walkor@workerman.net>
- *
- */
-class Gateway extends Worker
-{
-    /**
-     * 本机ip
-     * @var 单机部署默认127.0.0.1,如果是分布式部署,需要设置成本机ip
-     */
-    public $lanIp = '127.0.0.1';
-    
-    /**
-     * gateway内部通讯起始端口,每个gateway实例应该都不同,步长1000
-     * @var int
-     */
-    public $startPort = 2000;
-    
-    /**
-     * 是否可以平滑重启,gateway不能平滑重启,否则会导致连接断开
-     * @var bool
-     */
-    public $reloadable = false;
-    
-    /**
-     * 心跳时间间隔
-     * @var int
-     */
-    public $pingInterval = 0;
-
-    /**
-     * $pingNotResponseLimit*$pingInterval时间内,客户端未发送任何数据,断开客户端连接
-     * @var int
-     */
-    public $pingNotResponseLimit = 0;
-    
-    /**
-     * 服务端向客户端发送的心跳数据
-     * @var string
-     */
-    public $pingData = '';
-    
-    /**
-     * 路由函数
-     * @var callback
-     */
-    public $router = null;
-    
-    /**
-     * 保存客户端的所有connection对象
-     * @var array
-     */
-    protected $_clientConnections = array();
-    
-    /**
-     * 保存所有worker的内部连接的connection对象
-     * @var array
-     */
-    protected $_workerConnections = array();
-    
-    /**
-     * gateway内部监听worker内部连接的worker
-     * @var Worker
-     */
-    protected $_innerTcpWorker = null;
-    
-    /**
-     * gateway内部监听udp数据的worker
-     * @var Worker
-     */
-    protected $_innerUdpWorker = null;
-    
-    /**
-     * 当worker启动时
-     * @var callback
-     */
-    protected $_onWorkerStart = null;
-    
-    /**
-     * 当有客户端连接时
-     * @var callback
-     */
-    protected $_onConnect = null;
-    
-    /**
-     * 当客户端发来消息时
-     * @var callback
-     */
-    protected $_onMessage = null;
-    
-    /**
-     * 当客户端连接关闭时
-     * @var callback
-     */
-    protected $_onClose = null;
-    
-    /**
-     * 当worker停止时
-     * @var callback
-     */
-    protected $_onWorkerStop = null;
-    
-    /**
-     * 进程启动时间
-     * @var int
-     */
-    protected $_startTime = 0;
-    
-    /**
-     * 构造函数
-     * @param string $socket_name
-     * @param array $context_option
-     */
-    public function __construct($socket_name, $context_option = array())
-    {
-        parent::__construct($socket_name, $context_option);
-        
-        $this->router = array("\\GatewayWorker\\Gateway", 'routerRand');
-        
-        $backrace = debug_backtrace();
-        $this->_appInitPath = dirname($backrace[0]['file']);
-    }
-    
-    /**
-     * 运行
-     * @see Workerman.Worker::run()
-     */
-    public function run()
-    {
-        // 保存用户的回调,当对应的事件发生时触发
-        $this->_onWorkerStart = $this->onWorkerStart;
-        $this->onWorkerStart = array($this, 'onWorkerStart');
-        // 保存用户的回调,当对应的事件发生时触发
-        $this->_onConnect = $this->onConnect;
-        $this->onConnect = array($this, 'onClientConnect');
-        
-        // onMessage禁止用户设置回调
-        $this->onMessage = array($this, 'onClientMessage');
-        
-        // 保存用户的回调,当对应的事件发生时触发
-        $this->_onClose = $this->onClose;
-        $this->onClose = array($this, 'onClientClose');
-        // 保存用户的回调,当对应的事件发生时触发
-        $this->_onWorkerStop = $this->onWorkerStop;
-        $this->onWorkerStop = array($this, 'onWorkerStop');
-        
-        // 记录进程启动的时间
-        $this->_startTime = time();
-        // 运行父方法
-        parent::run();
-    }
-    
-    /**
-     * 当客户端发来数据时,转发给worker处理
-     * @param TcpConnection $connection
-     * @param mixed $data
-     */
-    public function onClientMessage($connection, $data)
-    {
-        $connection->pingNotResponseCount = -1;
-        $this->sendToWorker(GatewayProtocol::CMD_ON_MESSAGE, $connection, $data);
-    }
-    
-    /**
-     * 当客户端连接上来时,初始化一些客户端的数据
-     * 包括全局唯一的client_id、初始化session等
-     * @param unknown_type $connection
-     */
-    public function onClientConnect($connection)
-    {
-        // 分配一个全局唯一的client_id
-        $connection->globalClientId = $this->createGlobalClientId();
-        // 保存该连接的内部通讯的数据包报头,避免每次重新初始化
-        $connection->gatewayHeader = array(
-            'local_ip' => $this->lanIp,
-            'local_port' => $this->lanPort,
-            'client_ip'=>$connection->getRemoteIp(),
-            'client_port'=>$connection->getRemotePort(),
-            'client_id'=>$connection->globalClientId,
-        );
-        // 连接的session
-        $connection->session = '';
-        // 该连接的心跳参数
-        $connection->pingNotResponseCount = -1;
-        // 保存客户端连接connection对象
-        $this->_clientConnections[$connection->globalClientId] = $connection;
-        // 保存该连接的内部gateway通讯地址
-        $address = $this->lanIp.':'.$this->lanPort;
-        $this->storeClientAddress($connection->globalClientId, $address);
-        
-        // 如果用户有自定义onConnect回调,则执行
-        if($this->_onConnect)
-        {
-            call_user_func($this->_onConnect, $connection);
-        }
-        
-        // 如果设置了Event::onConnect,则通知worker进程,让worker执行onConnect
-        if(method_exists('Event','onConnect'))
-        {
-            $this->sendToWorker(GatewayProtocol::CMD_ON_CONNECTION, $connection);
-        }
-    }
-    
-    /**
-     * 发送数据给worker进程
-     * @param int $cmd
-     * @param TcpConnection $connection
-     * @param mixed $body
-     */
-    protected function sendToWorker($cmd, $connection, $body = '')
-    {
-        $gateway_data = $connection->gatewayHeader;
-        $gateway_data['cmd'] = $cmd;
-        $gateway_data['body'] = $body;
-        $gateway_data['ext_data'] = $connection->session;
-        if($this->_workerConnections)
-        {
-            // 调用路由函数,选择一个worker把请求转发给它
-            $worker_connection = call_user_func($this->router, $this->_workerConnections, $connection, $cmd, $body);
-            if(false === $worker_connection->send($gateway_data))
-            {
-                $msg = "SendBufferToWorker fail. May be the send buffer are overflow";
-                $this->log($msg);
-                return false;
-            }
-        }
-        // 没有可用的worker
-        else
-        {
-            // gateway启动后1-2秒内SendBufferToWorker fail是正常现象,因为与worker的连接还没建立起来,所以不记录日志,只是关闭连接
-            $time_diff = 2;
-            if(time() - $this->_startTime >= $time_diff)
-            {
-                $msg = "SendBufferToWorker fail. The connections between Gateway and BusinessWorker are not ready";
-                $this->log($msg);
-            }
-            $connection->destroy();
-            return false;
-        }
-        return true;
-    }
-    
-    /**
-     * 随机路由,返回worker connection对象
-     * @param array $worker_connections
-     * @param TcpConnection $client_connection
-     * @param int $cmd
-     * @param mixed $buffer
-     * @return TcpConnection
-     */
-    public static function routerRand($worker_connections, $client_connection, $cmd, $buffer)
-    {
-        return $worker_connections[array_rand($worker_connections)];
-    }
-    
-    /**
-     * 保存客户端连接的gateway通讯地址
-     * @param int $global_client_id
-     * @param string $address
-     * @return bool
-     */
-    protected function storeClientAddress($global_client_id, $address)
-    {
-        if(!Store::instance('gateway')->set('client_id-'.$global_client_id, $address))
-        {
-            $msg = 'storeClientAddress fail.';
-            if(get_class(Store::instance('gateway')) == 'Memcached')
-            {
-                $msg .= " reason :".Store::instance('gateway')->getResultMessage();
-            }
-            $this->log($msg);
-            return false;
-        }
-        return true;
-    }
-    
-    /**
-     * 删除客户端gateway通讯地址
-     * @param int $global_client_id
-     * @return void
-     */
-    protected function delClientAddress($global_client_id)
-    {
-        Store::instance('gateway')->delete('client_id-'.$global_client_id);
-    }
-    
-    /**
-     * 当客户端关闭时
-     * @param unknown_type $connection
-     */
-    public function onClientClose($connection)
-    {
-        // 尝试通知worker,触发Event::onClose
-        if(method_exists('Event','onClose'))
-        {
-            $this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $connection);
-        }
-        // 清理连接的数据
-        $this->delClientAddress($connection->globalClientId);
-        unset($this->_clientConnections[$connection->globalClientId]);
-        if($this->_onClose)
-        {
-            call_user_func($this->_onClose, $connection);
-        }
-    }
-    
-    /**
-     * 创建一个workerman集群全局唯一的client_id
-     * @return int|false
-     */
-    protected function createGlobalClientId()
-    {
-        $global_socket_key = 'GLOBAL_CLIENT_ID_KEY';
-        $store = Store::instance('gateway');
-        $global_client_id = $store->increment($global_socket_key);
-        if(!$global_client_id || $global_client_id > 2147483646)
-        {
-            $store->set($global_socket_key, 0);
-            $global_client_id = $store->increment($global_socket_key);
-        }
-    
-        if(!$global_client_id)
-        {
-            $msg .= "createGlobalClientId fail :";
-            if(get_class($store) == 'Memcached')
-            {
-                $msg .= $store->getResultMessage();
-            }
-            $this->log($msg);
-        }
-        
-        return $global_client_id;
-    }
-    
-    /**
-     * 当Gateway启动的时候触发的回调函数
-     * @return void
-     */
-    public function onWorkerStart()
-    {
-        // 分配一个内部通讯端口
-        $this->lanPort = function_exists('posix_getppid') ? $this->startPort - posix_getppid() + posix_getpid() : $this->startPort;
-        if($this->lanPort<0 || $this->lanPort >=65535)
-        {
-            $this->lanPort = rand($this->startPort, 65535);
-        }
-        
-        // 如果有设置心跳,则定时执行
-        if($this->pingInterval > 0)
-        {
-            Timer::add($this->pingInterval, array($this, 'ping'));
-        }
-    
-        // 初始化gateway内部的监听,用于监听worker的连接已经连接上发来的数据
-        $this->_innerTcpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}");
-        $this->_innerTcpWorker->listen();
-        $this->_innerUdpWorker = new Worker("GatewayProtocol://{$this->lanIp}:{$this->lanPort}");
-        $this->_innerUdpWorker->transport = 'udp';
-        $this->_innerUdpWorker->listen();
-    
-        // 重新设置自动加载根目录
-        Autoloader::setRootPath($this->_appInitPath);
-        
-        // 设置内部监听的相关回调
-        $this->_innerTcpWorker->onMessage = array($this, 'onWorkerMessage');
-        $this->_innerUdpWorker->onMessage = array($this, 'onWorkerMessage');
-        
-        $this->_innerTcpWorker->onConnect = array($this, 'onWorkerConnect');
-        $this->_innerTcpWorker->onClose = array($this, 'onWorkerClose');
-        
-        // 注册gateway的内部通讯地址,worker去连这个地址,以便gateway与worker之间建立起TCP长连接
-        if(!$this->registerAddress())
-        {
-            $this->log('registerAddress fail and exit');
-            Worker::stopAll();
-        }
-        
-        if($this->_onWorkerStart)
-        {
-            call_user_func($this->_onWorkerStart, $this);
-        }
-    }
-    
-    
-    /**
-     * 当worker通过内部通讯端口连接到gateway时
-     * @param TcpConnection $connection
-     */
-    public function onWorkerConnect($connection)
-    {
-        $connection->remoteAddress = $connection->getRemoteIp().':'.$connection->getRemotePort();
-        if(TcpConnection::$defaultMaxSendBufferSize == $connection->maxSendBufferSize)
-        {
-            $connection->maxSendBufferSize = 10*1024*1024;
-        }
-        $this->_workerConnections[$connection->remoteAddress] = $connection;
-    }
-    
-    /**
-     * 当worker发来数据时
-     * @param TcpConnection $connection
-     * @param mixed $data
-     * @throws \Exception
-     */
-    public function onWorkerMessage($connection, $data)
-    {
-        $cmd = $data['cmd'];
-        switch($cmd)
-        {
-            // 向某客户端发送数据,Gateway::sendToClient($client_id, $message);
-            case GatewayProtocol::CMD_SEND_TO_ONE:
-                if(isset($this->_clientConnections[$data['client_id']]))
-                {
-                    $this->_clientConnections[$data['client_id']]->send($data['body']);
-                }
-                break;
-                // 关闭客户端连接,Gateway::closeClient($client_id);
-            case GatewayProtocol::CMD_KICK:
-                if(isset($this->_clientConnections[$data['client_id']]))
-                {
-                    $this->_clientConnections[$data['client_id']]->destroy();
-                }
-                break;
-                // 广播, Gateway::sendToAll($message, $client_id_array)
-            case GatewayProtocol::CMD_SEND_TO_ALL:
-                // $client_id_array不为空时,只广播给$client_id_array指定的客户端
-                if($data['ext_data'])
-                {
-                    $client_id_array = unpack('N*', $data['ext_data']);
-                    foreach($client_id_array as $client_id)
-                    {
-                        if(isset($this->_clientConnections[$client_id]))
-                        {
-                            $this->_clientConnections[$client_id]->send($data['body']);
-                        }
-                    }
-                }
-                // $client_id_array为空时,广播给所有在线客户端
-                else
-                {
-                    foreach($this->_clientConnections as $client_connection)
-                    {
-                        $client_connection->send($data['body']);
-                    }
-                }
-                break;
-                // 更新客户端session
-            case GatewayProtocol::CMD_UPDATE_SESSION:
-                if(isset($this->_clientConnections[$data['client_id']]))
-                {
-                    $this->_clientConnections[$data['client_id']]->session = $data['ext_data'];
-                }
-                break;
-                // 获得客户端在线状态 Gateway::getOnlineStatus()
-            case GatewayProtocol::CMD_GET_ONLINE_STATUS:
-                $online_status = json_encode(array_keys($this->_clientConnections));
-                $connection->send($online_status);
-                break;
-                // 判断某个client_id是否在线 Gateway::isOnline($client_id)
-            case GatewayProtocol::CMD_IS_ONLINE:
-                $connection->send((int)isset($this->_clientConnections[$data['client_id']]));
-                break;
-            default :
-                $err_msg = "gateway inner pack err cmd=$cmd";
-                throw new \Exception($err_msg);
-        }
-    }
-    
-    /**
-     * 当worker连接关闭时
-     * @param TcpConnection $connection
-     */
-    public function onWorkerClose($connection)
-    {
-        //$this->log("{$connection->remoteAddress} CLOSE INNER_CONNECTION\n");
-        unset($this->_workerConnections[$connection->remoteAddress]);
-    }
-    
-    /**
-     * 存储当前Gateway的内部通信地址
-     * @param string $address
-     * @return bool
-     */
-    protected function registerAddress()
-    {
-        $address = $this->lanIp.':'.$this->lanPort;
-        // key
-        $key = 'GLOBAL_GATEWAY_ADDRESS';
-        try
-        {
-            $store = Store::instance('gateway');
-        }
-        catch(\Exception $msg)
-        {
-            $this->log($msg);
-            return false;
-        }
-        // 为保证原子性,需要加锁
-        Lock::get();
-        $addresses_list = $store->get($key);
-        if(empty($addresses_list))
-        {
-            $addresses_list = array();
-        }
-        $addresses_list[$address] = $address;
-        if(!$store->set($key, $addresses_list))
-        {
-            Lock::release();
-            if(get_class($store) == 'Memcached')
-            {
-                $msg = " registerAddress fail : Memcache Error " . $store->getResultMessage();
-            }
-            $this->log($msg);
-            return false;
-        }
-        Lock::release();
-        return true;
-    }
-    
-    /**
-     * 删除当前Gateway的内部通信地址
-     * @param string $address
-     * @return bool
-     */
-    protected function unregisterAddress()
-    {
-        $address = $this->lanIp.':'.$this->lanPort;
-        $key = 'GLOBAL_GATEWAY_ADDRESS';
-        try
-        {
-            $store = Store::instance('gateway');
-        }
-        catch (\Exception $msg)
-        {
-            $this->log($msg);
-            return false;
-        }
-        // 为保证原子性,需要加锁
-        Lock::get();
-        $addresses_list = $store->get($key);
-        if(empty($addresses_list))
-        {
-            $addresses_list = array();
-        }
-        unset($addresses_list[$address]);
-        if(!$store->set($key, $addresses_list))
-        {
-            Lock::release();
-            $msg = "unregisterAddress fail";
-            if(get_class($store) == 'Memcached')
-            {
-                $msg .= " reason:".$store->getResultMessage();
-            }
-            $this->log($msg);
-            return;
-        }
-        Lock::release();
-        return true;
-    }
-    
-    /**
-     * 心跳逻辑
-     * @return void
-     */
-    public function ping()
-    {
-        // 遍历所有客户端连接
-        foreach($this->_clientConnections as $connection)
-        {
-            // 上次发送的心跳还没有回复次数大于限定值就断开
-            if($this->pingNotResponseLimit > 0 && $connection->pingNotResponseCount >= $this->pingNotResponseLimit)
-            {
-                $connection->destroy();
-                continue;
-            }
-            // $connection->pingNotResponseCount为-1说明最近客户端有发来消息,则不给客户端发送心跳
-            if($connection->pingNotResponseCount++ >= 0)
-            {
-                if($this->pingData)
-                {
-                    $connection->send($this->pingData);
-                }
-            }
-        }
-    }
-    
-    /**
-     * 当gateway关闭时触发,清理数据
-     * @return void
-     */
-    public function onWorkerStop()
-    {
-        $this->unregisterAddress();
-        foreach($this->_clientConnections as $connection)
-        {
-            $this->delClientAddress($connection->globalClientId);
-        }
-        // 尝试触发用户设置的回调
-        if($this->_onWorkerStop)
-        {
-            call_user_func($this->_onWorkerStop, $this);
-        }
-    }
-}

+ 0 - 79
GatewayWorker/Lib/Context.php

@@ -1,79 +0,0 @@
-<?php
-/**
- * This file is part of workerman.
- *
- * Licensed under The MIT License
- * For full copyright and license information, please see the MIT-LICENSE.txt
- * Redistributions of files must retain the above copyright notice.
- *
- * @author walkor<walkor@workerman.net>
- * @copyright walkor<walkor@workerman.net>
- * @link http://www.workerman.net/
- * @license http://www.opensource.org/licenses/mit-license.php MIT License
- */
-namespace GatewayWorker\Lib;
-
-/**
- * 上下文 包含当前用户uid, 内部通信local_ip local_port socket_id ,以及客户端client_ip client_port
- */
-class Context
-{
-    /**
-     * 内部通讯id
-     * @var string
-     */
-    public static $local_ip;
-    /**
-     * 内部通讯端口
-     * @var int
-     */
-    public static $local_port;
-    /**
-     * 客户端ip
-     * @var string
-     */
-    public static $client_ip;
-    /**
-     * 客户端端口
-     * @var int
-     */
-    public static $client_port;
-    /**
-     * 用户id
-     * @var int
-     */
-    public static $client_id;
-    
-    /**
-     * 编码session
-     * @param mixed $session_data
-     * @return string
-     */
-    public static function sessionEncode($session_data = '')
-    {
-        if($session_data !== '')
-        {
-            return serialize($session_data);
-        }
-        return '';
-    }
-    
-    /**
-     * 解码session
-     * @param string $session_buffer
-     * @return mixed
-     */
-    public static function sessionDecode($session_buffer)
-    {
-        return unserialize($session_buffer);
-    }
-    
-    /**
-     * 清除上下文
-     * @return void
-     */
-    public static function clear()
-    {
-        self::$local_ip = self::$local_port  = self::$client_ip = self::$client_port = self::$client_id  = null;
-    }
-}

+ 0 - 72
GatewayWorker/Lib/Db.php

@@ -1,72 +0,0 @@
-<?php
-/**
- * This file is part of workerman.
- *
- * Licensed under The MIT License
- * For full copyright and license information, please see the MIT-LICENSE.txt
- * Redistributions of files must retain the above copyright notice.
- *
- * @author walkor<walkor@workerman.net>
- * @copyright walkor<walkor@workerman.net>
- * @link http://www.workerman.net/
- * @license http://www.opensource.org/licenses/mit-license.php MIT License
- */
-namespace GatewayWorker\Lib;
-
-/**
- * 数据库类
- */
-class Db
-{
-    /**
-     * 实例数组
-     * @var array
-     */
-    protected static $instance = array();
-    
-    /**
-     * 获取实例
-     * @param string $config_name
-     * @throws \Exception
-     */
-    public static function instance($config_name)
-    {
-        if(!isset(\Config\Db::$$config_name))
-        {
-            echo "\\Config\\Db::$config_name not set\n";
-            throw new \Exception("\\Config\\Db::$config_name not set\n");
-        }
-        
-        if(empty(self::$instance[$config_name]))
-        {
-            $config = \Config\Db::$$config_name;
-            self::$instance[$config_name] = new \GatewayWorker\Lib\DbConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['dbname']);
-        }
-        return self::$instance[$config_name];
-    }
-    
-    /**
-     * 关闭数据库实例
-     * @param string $config_name
-     */
-    public static function close($config_name)
-    {
-        if(isset(self::$instance[$config_name]))
-        {
-            self::$instance[$config_name]->closeConnection();
-            self::$instance[$config_name] = null;
-        }
-    }
-    
-    /**
-     * 关闭所有数据库实例
-     */
-    public static function closeAll()
-    {
-        foreach(self::$instance as $connection)
-        {
-            $connection->closeConnection();
-        }
-        self::$instance = array();
-    }
-}

+ 0 - 1850
GatewayWorker/Lib/DbConnection.php

@@ -1,1850 +0,0 @@
-<?php
-namespace GatewayWorker\Lib;
-
-/**
- * 数据库连接类,依赖mysql_pdo扩展
- * 在https://github.com/auraphp/Aura.SqlQuery的基础上修改而成
- */
-class DbConnection 
-{
-    /**
-     * SELECT
-     * @var array
-     */
-    protected $union = array();
-    
-    /**
-     * 是否是更新
-     * @var bool
-     */
-    protected $for_update = false;
-    
-    /**
-     * 选择的列
-     * @var array
-     */
-    protected $cols = array();
-    
-    /**
-     * 从哪些表里面SELECT
-     * @var array
-     */
-    protected $from = array();
-    
-    /**
-     * $from 当前的 key
-     * @var int
-     */
-    protected $from_key = -1;
-    
-    /**
-     * GROUP BY 的列 
-     * @var array
-     */
-    protected $group_by = array();
-    
-    /**
-     * HAVING 条件数组.
-     * @var array
-     */
-    protected $having = array();
-    
-    /**
-     * HAVING 语句中绑定的值.
-     * @var array
-     */
-    protected $bind_having = array();
-    
-    /**
-     * 每页多少条记录
-     * @var int
-     */
-    protected $paging = 10;
-    
-    /**
-     * sql中绑定的值
-     * @var array
-     */
-    protected $bind_values = array();
-    
-    /**
-     * WHERE 条件.
-     * @var array
-     */
-    protected $where = array();
-    
-    /**
-     * WHERE语句绑定的值
-     * @var array
-     */
-    protected $bind_where = array();
-    
-    /**
-     * ORDER BY 的列
-     * @var array
-     */
-    protected $order_by = array();
-    
-    /**
-     * ORDER BY 的排序方式,默认为升序
-     * @var bool 
-     */
-    protected $order_asc = true;
-    /**
-     * SELECT多少记录
-     * @var int
-     */
-    protected $limit = 0;
-    
-    /**
-     * 返回记录的游标
-     * @var int
-     */
-    protected $offset = 0;
-    
-    /**
-     * flags 列表
-     * @var array
-     */
-    protected $flags = array();
-    
-    /**
-     * 操作哪个表
-     * @var string
-     */
-    protected $table;
-   
-    /**
-     * 表.列 和 last-insert-id 映射
-     * @var array
-     */
-    protected $last_insert_id_names = array();
-    
-    /**
-     * INSERT 或者 UPDATE 的列 
-     * @param array
-     */
-    protected $col_values;
-    
-    /**
-     * 返回的列
-     * @var array
-     */
-    protected $returning = array();
-    
-    /**
-     * sql的类型 SELECT INSERT DELETE UPDATE
-     * @var string
-     */
-    protected $type = '';
-    
-    /**
-     * pdo 实例
-     * @var pdo
-     */
-    protected $pdo;
-    
-    /**
-     * PDO statement 实例
-     * @var PDO statement
-     */
-    protected $sQuery;
-    
-    /**
-     * 数据库用户名密码等配置
-     * @var array
-     */
-    protected $settings = array();
-    
-    /**
-     * sql的参数
-     * @var array
-     */
-    protected $parameters = array();
-    
-    /**
-     * 最后一条直行的sql
-     * @var string
-     */
-    protected $lastSql = '';
-
-    /**
-     * 选择哪些列
-     * @param string/array $cols
-     */
-    public function select($cols = '*')
-    {
-        $this->type = 'SELECT';
-        if(!is_array($cols))
-        {
-            $cols = array($cols);
-        }
-        $this->cols($cols);
-        return $this;
-    }
-    
-    /**
-     * 从哪个表删除
-     * @param string $table
-     * @return self
-     */
-    public function delete($table)
-    {
-        $this->type = 'DELETE';
-        $this->table = $this->quoteName($table);
-        $this->fromRaw($this->quoteName($table));
-        return $this;
-    }
-
-    /**
-     * 更新哪个表
-     * @param string $table
-     */
-    public function update($table)
-    {
-        $this->type = 'UPDATE';
-        $this->table = $this->quoteName($table);
-        return $this;
-    }
-
-    /**
-     * 向哪个表插入
-     * @param string $table
-     */
-    public function insert($table)
-    {
-        $this->type = 'INSERT';
-        $this->table = $this->quoteName($table);
-        return $this;
-    }
-    
-    /**
-     *
-     * 设置 SQL_CALC_FOUND_ROWS 标记.
-     * @param bool 
-     * @return self
-     */
-    public function calcFoundRows($enable = true)
-    {
-        $this->setFlag('SQL_CALC_FOUND_ROWS', $enable);
-        return $this;
-    }
-
-    /**
-     * 设置 SQL_CACHE 标记
-     * @param bool 
-     * @return self
-     */
-    public function cache($enable = true)
-    {
-        $this->setFlag('SQL_CACHE', $enable);
-        return $this;
-    }
-
-    /**
-     * 设置 SQL_NO_CACHE 标记
-     * @param bool
-     * @return self
-     */
-    public function noCache($enable = true)
-    {
-        $this->setFlag('SQL_NO_CACHE', $enable);
-        return $this;
-    }
-
-    /**
-     * 设置 STRAIGHT_JOIN 标记.
-     * @param bool
-     * @return self
-     */
-    public function straightJoin($enable = true)
-    {
-        $this->setFlag('STRAIGHT_JOIN', $enable);
-        return $this;
-    }
-
-    /**
-     * 设置 HIGH_PRIORITY 标记
-     * @param bool 
-     * @return self
-     */
-    public function highPriority($enable = true)
-    {
-        $this->setFlag('HIGH_PRIORITY', $enable);
-        return $this;
-    }
-
-    /**
-     * 设置 SQL_SMALL_RESULT 标记
-     * @param bool
-     * @return self
-     */
-    public function smallResult($enable = true)
-    {
-        $this->setFlag('SQL_SMALL_RESULT', $enable);
-        return $this;
-    }
-
-    /**
-     * 设置 SQL_BIG_RESULT 标记
-     * @param bool 
-     * @return self
-     */
-    public function bigResult($enable = true)
-    {
-        $this->setFlag('SQL_BIG_RESULT', $enable);
-        return $this;
-    }
-
-    /**
-     * 设置 SQL_BUFFER_RESULT 标记
-     * @param bool 
-     * @return self
-     */
-    public function bufferResult($enable = true)
-    {
-        $this->setFlag('SQL_BUFFER_RESULT', $enable);
-        return $this;
-    }
-    
-    /**
-     * 设置 FOR UPDATE 标记
-     * @param bool
-     * @return self
-     */
-    public function forUpdate($enable = true)
-    {
-        $this->for_update = (bool) $enable;
-        return $this;
-    }
-    
-    /**
-     * 设置 DISTINCT 标记
-     * @param bool
-     * @return self
-     */
-    public function distinct($enable = true)
-    {
-        $this->setFlag('DISTINCT', $enable);
-        return $this;
-    }
-    
-    /**
-     * 设置 LOW_PRIORITY 标记
-     * @param bool $enable
-     * @return self
-     */
-    public function lowPriority($enable = true)
-    {
-        $this->setFlag('LOW_PRIORITY', $enable);
-        return $this;
-    }
-    
-    /**
-     * 设置 IGNORE 标记
-     * @param bool $enable
-     * @return self
-     */
-    public function ignore($enable = true)
-    {
-        $this->setFlag('IGNORE', $enable);
-        return $this;
-    }
-    
-    /**
-     * 设置 QUICK 标记
-     * @param bool $enable
-     * @return self
-     */
-    public function quick($enable = true)
-    {
-        $this->setFlag('QUICK', $enable);
-        return $this;
-    }
-    
-    /**
-     * 设置 DELAYED 标记
-     * @param bool $enable
-     * @return self
-     */
-    public function delayed($enable = true)
-    {
-        $this->setFlag('DELAYED', $enable);
-        return $this;
-    }
-    
-    /**
-     * 序列化
-     * @return string 
-     */
-    public function __toString()
-    {
-        $union = '';
-        if ($this->union) {
-            $union = implode(' ', $this->union) . ' ';
-        }
-        return $union . $this->build();
-    }
-    
-    /**
-     * 设置每页多少条记录
-     * @param int 
-     * @return self
-     */
-    public function setPaging($paging)
-    {
-        $this->paging = (int) $paging;
-        return $this;
-    }
-    
-    /**
-     * 获取每页多少条记录
-     * @return int 
-     */
-    public function getPaging()
-    {
-        return $this->paging;
-    }
-    
-    /**
-     * 获取绑定在占位符上的值
-     */
-    public function getBindValues()
-    {
-        switch($this->type)
-        {
-            case 'SELECT':
-                return $this->getBindValuesSELECT();
-            case 'DELETE':
-            case 'UPDATE':
-            case 'INSERT':
-                return $this->getBindValuesCOMMON();
-            default :
-                throw new \Exception("type err");
-        }
-    }
-    
-    /**
-     * 获取绑定在占位符上的值
-     * @return array
-     */
-    public function getBindValuesSELECT()
-    {
-        $bind_values = $this->bind_values;
-        $i = 1;
-        foreach ($this->bind_where as $val) {
-            $bind_values[$i] = $val;
-            $i ++;
-        }
-        foreach ($this->bind_having as $val) {
-            $bind_values[$i] = $val;
-            $i ++;
-        }
-        return $bind_values;
-    }
-    
-    /**
-     *
-     * SELECT选择哪些列
-     * @param mixed
-     * @return null
-     */
-    protected function addColSELECT($key, $val)
-    {
-        if (is_string($key)) {
-            $this->cols[$val] = $key;
-        } else {
-            $this->addColWithAlias($val);
-        }
-    }
-    
-    /**
-     * SELECT增加选择的列
-     * @param string
-     * @return null
-     */
-    protected function addColWithAlias($spec)
-    {
-        $parts = explode(' ', $spec);
-        $count = count($parts);
-        if ($count == 2) {
-            $this->cols[$parts[1]] = $parts[0];
-        } elseif ($count == 3 && strtoupper($parts[1]) == 'AS') {
-            $this->cols[$parts[2]] = $parts[0];
-        } else {
-            $this->cols[] = $spec;
-        }
-    }
-    
-    /**
-     * from 哪个表
-     * @param string $table
-     * @return self
-     */
-    public function from($table)
-    {
-        return $this->fromRaw($this->quoteName($table));
-    }
-    
-    /**
-     * from的表
-     * @param string $table
-     * @return self
-     */
-    public function fromRaw($table)
-    {
-        $this->from[] = array($table);
-        $this->from_key ++;
-        return $this;
-    }
-    /**
-     *
-     * 子查询
-     * @param string $table
-     * @param string $name The alias name for the sub-select.
-     * @return self
-     */
-    public function fromSubSelect($table, $name)
-    {
-        $this->from[] = array( "($table) AS " . $this->quoteName($name));
-        $this->from_key ++;
-        return $this;
-    }
-    
-    
-    /**
-     * 增加join语句
-     * @param string $join  inner, left, natural
-     * @param string $table
-     * @param string $cond
-     * @return self
-     * @throws Exception
-     */
-    public function join($table, $cond = null, $type = '')
-    {
-    	return $this->joinInternal($type, $table, $cond);
-    }
-    
-    /**
-     * 增加join语句
-     * @param string $join  inner, left, natural
-     * @param string $table 
-     * @param string $cond 
-     * @return self
-     * @throws Exception
-     */
-    protected function joinInternal($join, $table, $cond = null)
-    {
-        if (! $this->from) {
-            throw new \Exception('Cannot join() without from()');
-        }
-    
-        $join = strtoupper(ltrim("$join JOIN"));
-        $table = $this->quoteName($table);
-        $cond = $this->fixJoinCondition($cond);
-        $this->from[$this->from_key][] = rtrim("$join $table $cond");
-        return $this;
-    }
-    
-    /**
-     * quote
-     * @param string $cond 
-     * @return string
-     *
-     */
-    protected function fixJoinCondition($cond)
-    {
-        if (! $cond) {
-            return;
-        }
-    
-        $cond = $this->quoteNamesIn($cond);
-    
-        if (strtoupper(substr(ltrim($cond), 0, 3)) == 'ON ') {
-            return $cond;
-        }
-    
-        if (strtoupper(substr(ltrim($cond), 0, 6)) == 'USING ') {
-            return $cond;
-        }
-    
-        return 'ON ' . $cond;
-    }
-    
-    /**
-     * inner join
-     * @param string $spec
-     * @param string $cond 
-     * @return self
-     * @throws Exception
-     */
-    public function innerJoin($table, $cond = null)
-    {
-        return $this->joinInternal('INNER', $table, $cond);
-    }
-    
-    /**
-     * left join
-     * @param string $table
-     * @param string $cond 
-     * @return self
-     * @throws Exception
-     */
-    public function leftJoin($table, $cond = null)
-    {
-        return $this->joinInternal('LEFT', $table, $cond);
-    }
-    
-    /**
-     * right join
-     * @param string $table
-     * @param string $cond
-     * @return self
-     * @throws Exception
-     */
-    public function rightJoin($table, $cond = null)
-    {
-    	return $this->joinInternal('RIGHT', $table, $cond);
-    }
-    
-    /**
-     * joinSubSelect
-     * @param string $join  inner, left, natural
-     * @param string $spec
-     * @param string $name sub-select 的别名
-     * @param string $cond
-     * @return self
-     * @throws Exception
-     */
-    public function joinSubSelect($join, $spec, $name, $cond = null)
-    {
-        if (! $this->from) {
-            throw new \Exception('Cannot join() without from() first.');
-        }
-    
-        $join = strtoupper(ltrim("$join JOIN"));
-        $name = $this->quoteName($name);
-        $cond = $this->fixJoinCondition($cond);
-        $this->from[$this->from_key][] = rtrim("$join ($spec) AS $name $cond");
-        return $this;
-    }
-    
-    /**
-     * group by 语句
-     * @param array $cols
-     * @return self
-     */
-    public function groupBy(array $cols)
-    {
-        foreach ($cols as $col) {
-            $this->group_by[] = $this->quoteNamesIn($col);
-        }
-        return $this;
-    }
-    
-    /**
-     * having 语句
-     * @param string $cond
-     * @return self
-     */
-    public function having($cond)
-    {
-        $this->addClauseCondWithBind('having', 'AND', func_get_args());
-        return $this;
-    }
-    
-    /**
-     * or having 语句
-     * @param string $cond The HAVING condition.
-     * @return self
-     */
-    public function orHaving($cond)
-    {
-        $this->addClauseCondWithBind('having', 'OR', func_get_args());
-        return $this;
-    }
-    
-    /**
-     * 设置每页的记录数量
-     * @param int $page 
-     * @return self
-     */
-    public function page($page)
-    {
-        $this->limit  = 0;
-        $this->offset = 0;
-    
-        $page = (int) $page;
-        if ($page > 0) {
-            $this->limit  = $this->paging;
-            $this->offset = $this->paging * ($page - 1);
-        }
-        return $this;
-    }
-    
-    /**
-     * union
-     * @return self
-     */
-    public function union()
-    {
-        $this->union[] = $this->build() . ' UNION';
-        $this->reset();
-        return $this;
-    }
-    
-    /**
-     * unionAll
-     * @return self
-     */
-    public function unionAll()
-    {
-        $this->union[] = $this->build() . ' UNION ALL';
-        $this->reset();
-        return $this;
-    }
-    
-    /**
-     * 重置
-     * @return null
-     */
-    protected function reset()
-    {
-        $this->resetFlags();
-        $this->cols       = array();
-        $this->from       = array();
-        $this->from_key   = -1;
-        $this->where      = array();
-        $this->group_by   = array();
-        $this->having     = array();
-        $this->order_by   = array();
-        $this->limit      = 0;
-        $this->offset     = 0;
-        $this->for_update = false;
-    }
-    
-    /**
-     * 清除所有数据
-     * @return void
-     */
-    protected function resetAll()
-    {
-        $this->union = array();
-        $this->for_update = false;
-        $this->cols = array();
-        $this->from = array();
-        $this->from_key = -1;
-        $this->group_by = array();
-        $this->having = array();
-        $this->bind_having = array();
-        $this->paging = 10;
-        $this->bind_values = array();
-        $this->where = array();
-        $this->bind_where = array();
-        $this->order_by = array();
-        $this->limit = 0;
-        $this->offset = 0;
-        $this->flags = array();
-        $this->table = '';
-        $this->last_insert_id_names = array();
-        $this->col_values = array();
-        $this->returning = array();
-        $this->parameters = array();
-    }
-    
-    /**
-     * 创建 SELECT SQL
-     * @return string
-     */
-    protected function buildSELECT()
-    {
-        return 'SELECT'
-        . $this->buildFlags()
-        . $this->buildCols()
-        . $this->buildFrom()
-        . $this->buildWhere()
-        . $this->buildGroupBy()
-        . $this->buildHaving()
-        . $this->buildOrderBy()
-        . $this->buildLimit()
-        . $this->buildForUpdate();
-    }
-    
-    /**
-     * 创建DELETE SQL
-     */
-    protected function buildDELETE()
-    {
-        return 'DELETE'
-            . $this->buildFlags()
-            . $this->buildFrom()
-            . $this->buildWhere()
-            . $this->buildOrderBy()
-            . $this->buildLimit()
-            . $this->buildReturning();
-    }
-
-    /**
-     * 生成SELECT列语句
-     * @return string
-     * @throws Exception
-     */
-    protected function buildCols()
-    {
-        if (! $this->cols) {
-            throw new \Exception('No columns in the SELECT.');
-        }
-    
-        $cols = array();
-        foreach ($this->cols as $key => $val) {
-            if (is_int($key)) {
-                $cols[] = $this->quoteNamesIn($val);
-            } else {
-                $cols[] = $this->quoteNamesIn("$val AS $key");
-            }
-        }
-    
-        return $this->indentCsv($cols);
-    }
-    
-    /**
-     * 生成 FROM 语句.
-     * @return string
-     */
-    protected function buildFrom()
-    {
-        if (! $this->from) {
-            return '';
-        }
-    
-        $refs = array();
-        foreach ($this->from as $from) {
-            $refs[] = implode(' ', $from);
-        }
-        return ' FROM' . $this->indentCsv($refs);
-    }
-    
-    /**
-     * 生成 GROUP BY 语句.
-     * @return string
-     */
-    protected function buildGroupBy()
-    {
-        if (! $this->group_by) {
-            return ''; 
-        }
-        return ' GROUP BY' . $this->indentCsv($this->group_by);
-    }
-    
-    /**
-     * 生成 HAVING 语句.
-     * @return string
-     */
-    protected function buildHaving()
-    {
-        if (! $this->having) {
-            return ''; 
-        }
-        return ' HAVING' . $this->indent($this->having);
-    }
-    
-    /**
-     * 生成 FOR UPDATE 语句
-     * @return string
-     */
-    protected function buildForUpdate()
-    {
-        if (! $this->for_update) {
-            return ''; 
-        }
-        return ' FOR UPDATE';
-    }
-    
-    /**
-     * where
-     * @param string/array $cond 
-     * @param mixed ...$bind
-     * @return self
-     */
-    public function where($cond)
-    {
-    	if(is_array($cond))
-    	{
-    		foreach($cond as $key=>$val)
-    		{
-    			if(is_string($key))
-    			{
-    				$this->addWhere('AND', array($key, $val));
-    			}
-    			else
-    			{
-    				$this->addWhere('AND', array($val));
-    			}
-    		}
-    	}
-    	else 
-    	{
-        	$this->addWhere('AND', func_get_args());
-    	}
-        return $this;
-    }
-    
-    /**
-     * or where
-     * @param string/array $cond 
-     * @param mixed ...$bind
-     * @return self
-     */
-    public function orWhere($cond)
-    {
-    	if(is_array($cond))
-    	{
-    		foreach($cond as $key=>$val)
-    		{
-    			if(is_string($key))
-    			{
-    				$this->addWhere('OR', array($key, $val));
-    			}
-    			else
-    			{
-    				$this->addWhere('OR', array($val));
-    			}
-    		}
-    	}
-    	else
-    	{
-        	$this->addWhere('OR', func_get_args());
-    	}
-        return $this;
-    }
-    
-    /**
-     * limit 
-     * @param int $limit
-     * @return self
-     */
-    public function limit($limit)
-    {
-        $this->limit = (int) $limit;
-        return $this;
-    }
-    
-    /**
-     * limit offset
-     * @param int $offset 
-     * @return self
-     */
-    public function offset($offset)
-    {
-        $this->offset = (int) $offset;
-        return $this;
-    }
-    
-    /**
-     * orderby.
-     * @param array $cols
-     * @return self
-     */
-    public function orderBy(array $cols)
-    {
-        return $this->addOrderBy($cols);
-    }
-     /**
-     * order by ASC OR DESC
-     * @param array $cols
-     * @param bool $order_asc
-     * @return self
-     */
-    public function orderByASC(array $cols, $order_asc = true)
-    {
-        $this->order_asc = $order_asc;
-        return $this->addOrderBy($cols);
-    }
-    
-    // -------------abstractquery----------
-    /**
-     * 返回逗号分隔的字符串
-     * @param array $list
-     * @return string
-     */
-    protected function indentCsv(array $list)
-    {
-        return ' ' . implode(',', $list);
-    }
-    
-    /**
-     * 返回空格分隔的字符串
-     * @param array $list
-     * @return string
-     */
-    protected function indent(array $list)
-    {
-        return ' ' . implode(' ', $list);
-    }
-    
-    /**
-     * 批量为占位符绑定值
-     * @param array $bind_values
-     * @return self
-     *
-     */
-    public function bindValues(array $bind_values)
-    {
-        foreach ($bind_values as $key => $val) {
-            $this->bindValue($key, $val);
-        }
-        return $this;
-    }
-    
-    /**
-     * 单个为占位符绑定值
-     * @param string $name 
-     * @param mixed $value
-     * @return self
-     */
-    public function bindValue($name, $value)
-    {
-        $this->bind_values[$name] = $value;
-        return $this;
-    }
-    
-    /**
-     * 生成flag
-     * @return string
-     */
-    protected function buildFlags()
-    {
-        if (! $this->flags) {
-            return '';
-        }
-        return ' ' . implode(' ', array_keys($this->flags));
-    }
-    
-    /**
-     * 设置 flag.
-     * @param string $flag 
-     * @param bool $enable
-     * @return null
-     */
-    protected function setFlag($flag, $enable = true)
-    {
-        if ($enable) {
-            $this->flags[$flag] = true;
-        } else {
-            unset($this->flags[$flag]);
-        }
-    }
-    
-    /**
-     * 重置flag
-     * @return null
-     */
-    protected function resetFlags()
-    {
-        $this->flags = array();
-    }
-    
-    /**
-     *
-     * 添加where语句
-     * @param string $andor 'AND' or 'OR
-     * @param array $conditions
-     * @return self
-     *
-     */
-    protected function addWhere($andor, $conditions)
-    {
-        $this->addClauseCondWithBind('where', $andor, $conditions);
-        return $this;
-    }
-    
-    /**
-     * 添加条件和绑定值
-     * @param string $clause where 、having等
-     * @param string $andor AND、OR等
-     * @param array $conditions 
-     * @return null
-     */
-    protected function addClauseCondWithBind($clause, $andor, $conditions)
-    {
-        $cond = array_shift($conditions);
-        $cond = $this->quoteNamesIn($cond);
-    
-        $bind =& $this->{"bind_{$clause}"};
-        foreach ($conditions as $value) {
-            $bind[] = $value;
-        }
-    
-        $clause =& $this->$clause;
-        if ($clause) {
-            $clause[] = "$andor $cond";
-        } else {
-            $clause[] = $cond;
-        }
-    }
-    
-    /**
-     * 生成where语句
-     * @return string
-     */
-    protected function buildWhere()
-    {
-        if (! $this->where) {
-            return ''; 
-        }
-        return ' WHERE' . $this->indent($this->where);
-    }
-    
-    /**
-     * 增加order by
-     * @param array $spec The columns and direction to order by.
-     * @return self
-     */
-    protected function addOrderBy(array $spec)
-    {
-        foreach ($spec as $col) {
-            $this->order_by[] = $this->quoteNamesIn($col);
-        }
-        return $this;
-    }
-    
-    /**
-     * 生成order by 语句
-     * @return string
-     */
-    protected function buildOrderBy()
-    {
-        if (! $this->order_by) {
-            return ''; 
-        }
-
-        if ($this->order_asc)
-        {
-            return ' ORDER BY' . $this->indentCsv($this->order_by) . ' ASC';
-        }
-        else
-        {
-            return ' ORDER BY' . $this->indentCsv($this->order_by) . ' DESC';
-        }
-    }
-    
-    /**
-     * 生成limit语句
-     * @return string
-     */
-    protected function buildLimit()
-    {
-        $has_limit = $this->type == 'DELETE' || $this->type == 'UPDATE';
-        $has_offset = $this->type == 'SELECT';
-    
-        if ($has_offset && $this->limit) {
-            $clause = " LIMIT {$this->limit}";
-            if ($this->offset) {
-                $clause .= " OFFSET {$this->offset}";
-            }
-            return $clause;
-        } elseif ($has_limit && $this->limit) {
-            return " LIMIT {$this->limit}";
-        }
-        return ''; 
-    }
-
-    /**
-     * Quotes 
-     * @param string $spec
-     * @return string|array 
-     */
-    public function quoteName($spec)
-    {
-        $spec = trim($spec);
-        $seps = array(' AS ', ' ', '.');
-        foreach ($seps as $sep) {
-            $pos = strripos($spec, $sep);
-            if ($pos) {
-                return $this->quoteNameWithSeparator($spec, $sep, $pos);
-            }
-        }
-        return $this->replaceName($spec);
-    }
-
-    /**
-     * 指定分隔符的Quotes 
-     * @param string $spec 
-     * @param string $sep 
-     * @param string $pos 
-     * @return string 
-     */
-    protected function quoteNameWithSeparator($spec, $sep, $pos)
-    {
-        $len = strlen($sep);
-        $part1 = $this->quoteName(substr($spec, 0, $pos));
-        $part2 = $this->replaceName(substr($spec, $pos + $len));
-        return "{$part1}{$sep}{$part2}";
-    }
-
-    /**
-     * Quotes "table.col" 格式的字符串
-     * @param string $text 
-     * @return string|array 
-     */
-    public function quoteNamesIn($text)
-    {
-        $list = $this->getListForQuoteNamesIn($text);
-        $last = count($list) - 1;
-        $text = null;
-        foreach ($list as $key => $val) {
-            if (($key+1) % 3) {
-                $text .= $this->quoteNamesInLoop($val, $key == $last);
-            }
-        }
-        return $text;
-    }
-
-    /**
-     * 返回quote元素列表
-     * @param string $text 
-     * @return array
-     */
-    protected function getListForQuoteNamesIn($text)
-    {
-        $apos = "'";
-        $quot = '"';
-        return preg_split(
-            "/(($apos+|$quot+|\\$apos+|\\$quot+).*?\\2)/",
-            $text,
-            -1,
-            PREG_SPLIT_DELIM_CAPTURE
-        );
-    }
-
-    /**
-     * 循环quote
-     * @param string $val 
-     * @param bool $is_last
-     * @return string 
-     */
-    protected function quoteNamesInLoop($val, $is_last)
-    {
-        if ($is_last) {
-            return $this->replaceNamesAndAliasIn($val);
-        }
-        return $this->replaceNamesIn($val);
-    }
-
-    /**
-     *
-     * 替换成别名
-     * @param string $val 
-     * @return string
-     */
-    protected function replaceNamesAndAliasIn($val)
-    {
-        $quoted = $this->replaceNamesIn($val);
-        $pos = strripos($quoted, ' AS ');
-        if ($pos) {
-            $alias = $this->replaceName(substr($quoted, $pos + 4));
-            $quoted = substr($quoted, 0, $pos) . " AS $alias";
-        }
-        return $quoted;
-    }
-
-    /**
-     * Quotes name 
-     * @param string $name 
-     * @return string 
-     */
-    protected function replaceName($name)
-    {
-        $name = trim($name);
-        if ($name == '*') {
-            return $name;
-        }
-        return '`'. $name.'`';
-    }
-
-    /**
-     * Quotes 
-     * @param string $text
-     * @return string|array 
-     */
-    protected function replaceNamesIn($text)
-    {
-        $is_string_literal = strpos($text, "'") !== false
-                        || strpos($text, '"') !== false;
-        if ($is_string_literal) {
-            return $text;
-        }
-
-        $word = "[a-z_][a-z0-9_]+";
-
-        $find = "/(\\b)($word)\\.($word)(\\b)/i";
-
-        $repl = '$1`$2`.`$3`$4';
-
-        $text = preg_replace($find, $repl, $text);
-
-        return $text;
-    }
-    
-     // ---------- insert --------------
-    /**
-     * 设置 `table.column` 与 last-insert-id 的映射
-     * @param array $insert_id_names
-     */
-    public function setLastInsertIdNames(array $last_insert_id_names)
-    {
-        $this->last_insert_id_names = $last_insert_id_names;
-    }
-
-    /**
-     * insert into.
-     * @param string $into
-     * @return self
-     */
-    public function into($table)
-    {
-        $this->table = $this->quoteName($table);
-        return $this;
-    }
-
-    /**
-     * 生成INSERT 语句
-     * @return string
-     */
-    protected function buildINSERT()
-    {
-        return 'INSERT'
-            . $this->buildFlags()
-            . $this->buildInto()
-            . $this->buildValuesForInsert()
-            . $this->buildReturning();
-    }
-
-    /**
-     * 生成 INTO 语句
-     * @return string
-     */
-    protected function buildInto()
-    {
-        return " INTO " . $this->table;
-    }
-
-    /**
-     * PDO::lastInsertId()
-     * @param string $col 
-     * @return mixed 
-     */
-    public function getLastInsertIdName($col)
-    {
-        $key = str_replace('`', '', $this->table) . '.' . $col;
-        if (isset($this->last_insert_id_names[$key])) {
-            return $this->last_insert_id_names[$key];
-        }
-    }
-
-    /**
-     *
-     * 设置一列,如果有第二各参数,则把第二个参数绑定在占位符上
-     * @param string $col 
-     * @param mixed  $val
-     * @return self
-     */
-    public function col($col)
-    {
-        return call_user_func_array(array($this, 'addCol'), func_get_args());
-    }
-
-    /**
-     * 设置多列
-     * @param array $cols 
-     * @return self
-     */
-    public function cols(array $cols)
-    {
-        if($this->type == 'SELECT')
-        {
-            foreach ($cols as $key => $val) 
-            {
-                $this->addColSELECT($key, $val);
-            }
-            return $this;
-        }
-        return $this->addCols($cols);
-    }
-
-    /**
-     * 直接设置列的值
-     * @param string $col
-     * @param string $value
-     * @return self
-     */
-    public function set($col, $value)
-    {
-        return $this->setCol($col, $value);
-    }
-
-    /**
-     * 为INSERT语句绑定值
-     * @return string
-     */
-    protected function buildValuesForInsert()
-    {
-        return ' ('.$this->indentCsv(array_keys($this->col_values)).') VALUES (' . $this->indentCsv(array_values($this->col_values)) . ')';
-    }
-
-    // ------update-------
-    /**
-     * 更新哪个表
-     * @param string $table
-     * @return self
-     */
-    public function table($table)
-    {
-        $this->table = $this->quoteName($table);
-        return $this;
-    }
-
-    /**
-     * 生成完整SQL语句
-     * @return string
-     */
-    protected function build()
-    {
-        switch($this->type)
-        {
-           case 'DELETE':
-             return $this->buildDELETE();
-           case 'INSERT':
-             return $this->buildINSERT();
-           case 'UPDATE':
-             return $this->buildUPDATE();
-           case 'SELECT':
-             return $this->buildSELECT();
-        }
-        throw new \Exception("type empty");
-    }
-    
-    /**
-     * 生成更新的SQL语句
-     */
-    protected function buildUPDATE()
-    {
-        return 'UPDATE'
-            . $this->buildFlags()
-            . $this->buildTable()
-            . $this->buildValuesForUpdate()
-            . $this->buildWhere()
-            . $this->buildOrderBy()
-            . $this->buildLimit()
-            . $this->buildReturning();
-   
-    }
-
-    /**
-     * 哪个表
-     * @return null
-     */
-    protected function buildTable()
-    {
-        return " {$this->table}";
-    }
-
-    /**
-     * 为更新语句绑定值
-     * @return string
-     */
-    protected function buildValuesForUpdate()
-    {
-        $values = array();
-        foreach ($this->col_values as $col => $value) {
-            $values[] = "{$col} = {$value}";
-        }
-        return ' SET' . $this->indentCsv($values);
-    }
-   
-    // ----------Dml---------------
-    /**
-     * 获取绑定的值
-     * @return array
-     */
-    public function getBindValuesCOMMON()
-    {
-        $bind_values = $this->bind_values;
-        $i = 1;
-        foreach ($this->bind_where as $val) {
-            $bind_values[$i] = $val;
-            $i ++;
-        }
-        return $bind_values;
-    }
-
-    /**
-     * 设置列
-     * @param string $col
-     * @param mixed $val
-     * @return self
-     */
-    protected function addCol($col)
-    {
-        $key = $this->quoteName($col);
-        $this->col_values[$key] = ":$col";
-        $args = func_get_args();
-        if (count($args) > 1) {
-            $this->bindValue($col, $args[1]);
-        }
-        return $this;
-    }
-
-    /**
-     * 设置多个列
-     * @param array $cols
-     * @return self
-     */
-    protected function addCols(array $cols)
-    {
-        foreach ($cols as $key => $val) {
-            if (is_int($key)) {
-                $this->addCol($val);
-            } else {
-                $this->addCol($key, $val);
-            }
-        }
-        return $this;
-    }
-
-    /**
-     * 设置单列的值
-     * @param string $col .
-     * @param string $value
-     * @return self
-     */
-    protected function setCol($col, $value)
-    {
-        if ($value === null) {
-            $value = 'NULL';
-        }
-
-        $key = $this->quoteName($col);
-        $value = $this->quoteNamesIn($value);
-        $this->col_values[$key] = $value;
-        return $this;
-    }
-
-    /**
-     * 增加返回的列
-     * @param array $cols
-     * @return self
-     *
-     */
-    protected function addReturning(array $cols)
-    {
-        foreach ($cols as $col) {
-            $this->returning[] = $this->quoteNamesIn($col);
-        }
-        return $this;
-    }
-
-    /**
-     * 生成 RETURNING 语句
-     * @return string
-     */
-    protected function buildReturning()
-    {
-        if (! $this->returning) {
-            return '';
-        }
-        return ' RETURNING' . $this->indentCsv($this->returning);
-    }
-    
-    /**
-     * 构造函数
-     */
-    public function __construct($host, $port, $user, $password, $db_name, $charset = 'utf8')
-    {
-        $this->settings = array(
-            'host'          => $host,
-            'port'          => $port,
-            'user'          => $user,
-            'password' => $password,
-            'dbname'  => $db_name,
-            'charset'    => $charset,
-        );
-        $this->connect();
-    }
-    
-    /**
-     * 创建pdo实例
-     */
-    protected function connect()
-    {
-        $dsn = 'mysql:dbname='.$this->settings["dbname"].';host='.$this->settings["host"].';port='.$this->settings['port'];
-        $this->pdo = new \PDO($dsn, $this->settings["user"], $this->settings["password"], array(\PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . (!empty($this->settings['charset']) ? $this->settings['charset'] : 'utf8')));
-        $this->pdo->setAttribute(\PDO::ATTR_ERRMODE, \PDO::ERRMODE_EXCEPTION);
-        $this->pdo->setAttribute(\PDO::ATTR_EMULATE_PREPARES, false);
-    }
-   
-    /*
-    *   关闭连接
-    */
-    public function closeConnection()
-    {
-        $this->pdo = null;
-    }
-
-    /**
-     * 执行
-     * @param string $query
-     * @param string $parameters
-     */
-    protected function execute($query,$parameters = "")
-    {
-        try {
-            $this->sQuery = $this->pdo->prepare($query);
-            $this->bindMore($parameters);
-            if(!empty($this->parameters)) {
-                foreach($this->parameters as $param)
-                {
-                    $parameters = explode("\x7F",$param);
-                    $this->sQuery->bindParam($parameters[0],$parameters[1]);
-                }
-            }
-            $this->succes  = $this->sQuery->execute();
-        }
-        catch(\PDOException $e)
-        {
-            // 服务端断开时重连一次
-            if($e->errorInfo[1] == 2006 || $e->errorInfo[1] == 2013)
-            {
-                $this->closeConnection();
-                $this->connect();
-                
-                try {
-                    $this->sQuery = $this->pdo->prepare($query);
-                    $this->bindMore($parameters);
-                    if(!empty($this->parameters)) {
-                        foreach($this->parameters as $param)
-                        {
-                            $parameters = explode("\x7F",$param);
-                            $this->sQuery->bindParam($parameters[0],$parameters[1]);
-                        }
-                    }
-                    $this->succes  = $this->sQuery->execute();
-                }   
-                catch(\PDOException $ex)
-                {
-                    $this->rollBackTrans();
-                    throw $ex;
-                }
-            }
-            else
-            {
-                $this->rollBackTrans();
-                throw $e;
-            }
-        }
-        $this->parameters = array();
-    }
-    
-    /**
-    * 绑定
-    * @param string $para
-    * @param string $value
-    */
-    public function bind($para, $value)
-    {
-    	if(is_string($para))
-    	{
-        	$this->parameters[sizeof($this->parameters)] = ":" . $para . "\x7F" . $value;
-    	}
-    	else
-    	{
-    		$this->parameters[sizeof($this->parameters)] = $para . "\x7F" . $value;
-    	}
-    }
-    
-    /**
-     * 绑定多个
-     * @param array $parray
-     */
-    public function bindMore($parray)
-    {
-        if(empty($this->parameters) && is_array($parray)) {
-            $columns = array_keys($parray);
-            foreach($columns as $i => &$column)    {
-                $this->bind($column, $parray[$column]);
-            }
-        }
-    }
-    
-    /**
-     * 执行SQL
-     * @param  string $query
-     * @param  array  $params
-     * @param  int    $fetchmode
-     * @return mixed
-     */
-    public function query($query = '',$params = null, $fetchmode = \PDO::FETCH_ASSOC)
-    {
-        $query = trim($query);
-        if(empty($query))
-        {
-            $query = $this->build();
-            if(!$params)
-            {
-                $params = $this->getBindValues();
-            }
-        }
-        
-        $this->resetAll();
-        $this->lastSql = $query;
-        
-        $this->execute($query,$params);
-        
-        $rawStatement = explode(" ", $query);
-            
-        $statement = strtolower(trim($rawStatement[0]));
-        if ($statement === 'select' || $statement === 'show') {
-            return $this->sQuery->fetchAll($fetchmode);
-        }
-        elseif ( $statement === 'update' || $statement === 'delete' ) {
-            return $this->sQuery->rowCount();
-        }
-        elseif( $statement === 'insert' ){
-            if( $this->sQuery->rowCount() > 0 ){
-                return $this->lastInsertId();
-            }
-        }
-        else {
-            return NULL;
-        }
-    }
-    
-    /**
-    * 返回一列
-    * @param  string $query
-    * @param  array  $params
-    * @return array
-    */
-    public function column($query = '',$params = null)
-    {
-        $query = trim($query);
-        if(empty($query))
-        {
-            $query = $this->build();
-            if(!$params)
-            {
-                $params = $this->getBindValues();
-            }
-        }
-        
-        $this->resetAll();
-        $this->lastSql = $query;
-        
-        $this->execute($query,$params);
-        $columns = $this->sQuery->fetchAll(\PDO::FETCH_NUM);
-        $column = null;
-        foreach($columns as $cells) {
-            $column[] = $cells[0];
-        }
-        return $column;
-    }
-    
-    /**
-    * 返回一行
-    * @param  string $query
-    * @param  array  $params
-    * @param  int    $fetchmode
-    * @return array
-    */
-    public function row($query = '',$params = null, $fetchmode = \PDO::FETCH_ASSOC)
-    {
-        $query = trim($query);
-        if(empty($query))
-        {
-            $query = $this->build();
-            if(!$params)
-            {
-                $params = $this->getBindValues();
-            }
-        }
-        
-        $this->resetAll();
-        $this->lastSql = $query;
-        
-        $this->execute($query,$params);
-        return $this->sQuery->fetch($fetchmode);
-    }
-    
-    /**
-    * 返回单个值
-    * @param  string $query
-    * @param  array  $params
-    * @return string
-    */
-    public function single($query = '',$params = null)
-    {
-        $query = trim($query);
-        if(empty($query))
-        {
-            $query = $this->build();
-            if(!$params)
-            {
-                $params = $this->getBindValues();
-            }
-        }
-        
-        $this->resetAll();
-        $this->lastSql = $query;
-        
-        $this->execute($query,$params);
-        return $this->sQuery->fetchColumn();
-    }
-    
-    /**
-     * 返回lastInsertId
-     * @return string
-     */
-    public function lastInsertId() {
-        return $this->pdo->lastInsertId();
-    }
-    
-    /**
-     * 返回最后一条直行的sql
-     * @return  string
-     */
-    public function lastSQL()
-    {
-        return $this->lastSql;
-    }
-
-    /**
-     * 开始事务 
-     */
-
-    public function beginTrans()
-    {
-        $this->pdo->beginTransaction();
-    }
-
-    /**
-     * 提交事务 
-     */
-
-    public function commitTrans()
-    {
-        $this->pdo->commit();
-    }
-
-    /**
-     * 事务回滚 
-     */
-
-    public function rollBackTrans()
-    {
-        if ($this->pdo->inTransaction())
-        {
-            $this->pdo->rollBack();
-        }
-    }
-}
-

+ 0 - 314
GatewayWorker/Lib/Gateway.php

@@ -1,314 +0,0 @@
-<?php
-/**
- * This file is part of workerman.
- *
- * Licensed under The MIT License
- * For full copyright and license information, please see the MIT-LICENSE.txt
- * Redistributions of files must retain the above copyright notice.
- *
- * @author walkor<walkor@workerman.net>
- * @copyright walkor<walkor@workerman.net>
- * @link http://www.workerman.net/
- * @license http://www.opensource.org/licenses/mit-license.php MIT License
- */
-namespace GatewayWorker\Lib;
-
-/**
- * 数据发送相关
- */
-use \Workerman\Protocols\GatewayProtocol;
-use \GatewayWorker\Lib\Store;
-use \GatewayWorker\Lib\Context;
-
-class Gateway
-{
-    /**
-     * gateway实例
-     * @var object
-     */
-    protected static  $businessWorker = null;
-    
-   /**
-    * 向所有客户端(或者client_id_array指定的客户端)广播消息
-    * @param string $message 向客户端发送的消息
-    * @param array $client_id_array 客户端id数组
-    */
-   public static function sendToAll($message, $client_id_array = null)
-   {
-       $gateway_data = GatewayProtocol::$empty;
-       $gateway_data['cmd'] = GatewayProtocol::CMD_SEND_TO_ALL;
-       $gateway_data['body'] = $message;
-       
-       if($client_id_array)
-       {
-           $params = array_merge(array('N*'), $client_id_array);
-           $gateway_data['ext_data'] = call_user_func_array('pack', $params);
-       }
-       elseif(empty($client_id_array) && is_array($client_id_array))
-       {
-           return;
-       }
-       
-       // 如果有businessWorker实例,说明运行在workerman环境中,通过businessWorker中的长连接发送数据
-       if(self::$businessWorker)
-       {
-           foreach(self::$businessWorker->gatewayConnections as $gateway_connection)
-           {
-               $gateway_connection->send($gateway_data);
-           }
-       }
-       // 运行在其它环境中,使用udp向worker发送数据
-       else
-       {
-           $all_addresses = Store::instance('gateway')->get('GLOBAL_GATEWAY_ADDRESS');
-           if(!$all_addresses)
-           {
-               throw new \Exception('GLOBAL_GATEWAY_ADDRESS is ' . var_export($all_addresses, true));
-           }
-           foreach($all_addresses as $address)
-           {
-               self::sendToGateway($address, $gateway_data);
-           }
-       }
-   }
-   
-   /**
-    * 向某个客户端发消息
-    * @param int $client_id 
-    * @param string $message
-    */
-   public static function sendToClient($client_id, $message)
-   {
-       return self::sendCmdAndMessageToClient($client_id, GatewayProtocol::CMD_SEND_TO_ONE, $message);
-   } 
-   
-   /**
-    * 向当前客户端发送消息
-    * @param string $message
-    */
-   public static function sendToCurrentClient($message)
-   {
-       return self::sendCmdAndMessageToClient(null, GatewayProtocol::CMD_SEND_TO_ONE, $message);
-   }
-   
-   /**
-    * 判断某个客户端是否在线
-    * @param int $client_id
-    * @return 0/1
-    */
-   public static function isOnline($client_id)
-   {
-       $address = Store::instance('gateway')->get('client_id-'.$client_id);
-       if(!$address)
-       {
-           return 0;
-       }
-       $gateway_data = GatewayProtocol::$empty;
-       $gateway_data['cmd'] = GatewayProtocol::CMD_IS_ONLINE;
-       $gateway_data['client_id'] = $client_id;
-       return self::sendUdpAndRecv($address, $gateway_data);
-   }
-   
-   /**
-    * 获取在线状态,目前返回一个在线client_id数组
-    * @return array
-    */
-   public static function getOnlineStatus()
-   {
-       $gateway_data = GatewayProtocol::$empty;
-       $gateway_data['cmd'] = GatewayProtocol::CMD_GET_ONLINE_STATUS;
-       $gateway_buffer = GatewayProtocol::encode($gateway_data);
-       
-       $all_addresses = Store::instance('gateway')->get('GLOBAL_GATEWAY_ADDRESS');
-       $client_array = $status_data = array();
-       // 批量向所有gateway进程发送CMD_GET_ONLINE_STATUS命令
-       foreach($all_addresses as $address)
-       {
-           $client = stream_socket_client("udp://$address", $errno, $errmsg);
-           if(strlen($gateway_buffer) === stream_socket_sendto($client, $gateway_buffer))
-           {
-               $client_id = (int) $client;
-               $client_array[$client_id] = $client;
-           }
-       }
-       // 超时1秒
-       $time_out = 1;
-       $time_start = microtime(true);
-       // 批量接收请求
-       while(count($client_array) > 0)
-       {
-           $write = $except = array();
-           $read = $client_array;
-           if(@stream_select($read, $write, $except, $time_out))
-           {
-               foreach($read as $client)
-               {
-                   // udp
-                   $data = json_decode(fread($client, 65535), true);
-                   if($data)
-                   {
-                       $status_data = array_merge($status_data, $data);
-                   }
-                   unset($client_array[$client]);
-               }
-           }
-           if(microtime(true) - $time_start > $time_out)
-           {
-               break;
-           }
-       }
-       return $status_data;
-   }
-   
-   /**
-    * 关闭某个客户端
-    * @param int $client_id
-    * @param string $message
-    */
-   public static function closeClient($client_id)
-   {
-       if($client_id === Context::$client_id)
-       {
-           return self::closeCurrentClient();
-       }
-       // 不是发给当前用户则使用存储中的地址
-       else
-       {
-           $address = Store::instance('gateway')->get('client_id-'.$client_id);
-           if(!$address)
-           {
-               return false;
-           }
-           return self::kickAddress($address, $client_id);
-       }
-   }
-   
-   /**
-    * 踢掉当前客户端
-    * @param string $message
-    */
-   public static function closeCurrentClient()
-   {
-       return self::kickAddress(Context::$local_ip.':'.Context::$local_port, Context::$client_id);
-   }
-   
-   /**
-    * 更新session,框架自动调用,开发者不要调用
-    * @param int $client_id
-    * @param string $session_str
-    */
-   public static function updateSocketSession($client_id, $session_str)
-   {
-       $gateway_data = GatewayProtocol::$empty;
-       $gateway_data['cmd'] = GatewayProtocol::CMD_UPDATE_SESSION;
-       $gateway_data['client_id'] = $client_id;
-       $gateway_data['ext_data'] = $session_str;
-       return self::sendToGateway(Context::$local_ip . ':' . Context::$local_port, $gateway_data);
-   }
-   
-   /**
-    * 想某个用户网关发送命令和消息
-    * @param int $client_id
-    * @param int $cmd
-    * @param string $message
-    * @return boolean
-    */
-   protected static function sendCmdAndMessageToClient($client_id, $cmd , $message)
-   {
-       // 如果是发给当前用户则直接获取上下文中的地址
-       if($client_id === Context::$client_id || $client_id === null)
-       {
-           $address = Context::$local_ip.':'.Context::$local_port;
-       }
-       else
-       {
-           $address = Store::instance('gateway')->get('client_id-'.$client_id);
-           if(!$address)
-           {
-               return false;
-           }
-       }
-       $gateway_data = GatewayProtocol::$empty;
-       $gateway_data['cmd'] = $cmd;
-       $gateway_data['client_id'] = $client_id ? $client_id : Context::$client_id;
-       $gateway_data['body'] = $message;
-       
-       return self::sendToGateway($address, $gateway_data);
-   }
-   
-   /**
-    * 发送udp数据并返回
-    * @param int $address
-    * @param string $message
-    * @return boolean
-    */
-   protected static function sendUdpAndRecv($address , $data)
-   {
-       $buffer = GatewayProtocol::encode($data);
-       // 非workerman环境,使用udp发送数据
-       $client = stream_socket_client("udp://$address", $errno, $errmsg);
-       if(strlen($buffer) == stream_socket_sendto($client, $buffer))
-       {
-           // 阻塞读
-           stream_set_blocking($client, 1);
-           // 1秒超时
-           stream_set_timeout($client, 1);
-           // 读udp数据
-           $data = fread($client, 655350);
-           // 返回结果
-           return json_decode($data, true);
-       }
-       else
-       {
-           throw new \Exception("sendUdpAndRecv($address, \$bufer) fail ! Can not send UDP data!", 502);
-       }
-   }
-   
-   /**
-    * 发送数据到网关
-    * @param string $address
-    * @param string $buffer
-    */
-   protected static function sendToGateway($address, $gateway_data)
-   {
-       // 有$businessWorker说明是workerman环境,使用$businessWorker发送数据
-       if(self::$businessWorker)
-       {
-           if(!isset(self::$businessWorker->gatewayConnections[$address]))
-           {
-               return false;
-           }
-           return self::$businessWorker->gatewayConnections[$address]->send($gateway_data);
-       }
-       // 非workerman环境,使用udp发送数据
-       $gateway_buffer = GatewayProtocol::encode($gateway_data);
-       $client = stream_socket_client("udp://$address", $errno, $errmsg);
-       return strlen($gateway_buffer) == stream_socket_sendto($client, $gateway_buffer);
-   }
-   
-   /**
-    * 踢掉某个网关的socket
-    * @param string $local_ip
-    * @param int $local_port
-    * @param int $client_id
-    * @param string $message
-    * @param int $client_id
-    */
-   protected  static function kickAddress($address, $client_id)
-   {
-       $gateway_data = GatewayProtocol::$empty;
-       $gateway_data['cmd'] = GatewayProtocol::CMD_KICK;
-       $gateway_data['client_id'] = $client_id;
-       return self::sendToGateway($address, $gateway_data);
-   }
-   
-   /**
-    * 设置gateway实例
-    * @param Bootstrap/Gateway $gateway_instance
-    */
-   public static function setBusinessWorker($business_worker_instance)
-   {
-       self::$businessWorker = $business_worker_instance;
-   }
- 
-}

+ 0 - 67
GatewayWorker/Lib/Lock.php

@@ -1,67 +0,0 @@
-<?php
-/**
- * This file is part of workerman.
- *
- * Licensed under The MIT License
- * For full copyright and license information, please see the MIT-LICENSE.txt
- * Redistributions of files must retain the above copyright notice.
- *
- * @author walkor<walkor@workerman.net>
- * @copyright walkor<walkor@workerman.net>
- * @link http://www.workerman.net/
- * @license http://www.opensource.org/licenses/mit-license.php MIT License
- */
-namespace GatewayWorker\Lib;
-/**
- * 锁
- * 基于文件锁实现
- */
-class Lock
-{
-    /**
-     * handle
-     * @var resource
-     */
-    private static $fileHandle = null;
-    
-    /**
-     * 获取锁
-     * @param bool block
-     * @return bool
-     */
-    public static function get($block=true)
-    {
-        $operation = $block ? LOCK_EX : LOCK_EX | LOCK_NB;
-        if(self::getHandle())
-        {
-            return flock(self::$fileHandle, $operation);
-        }
-        return false;
-    }
-    
-    /**
-     * 释放锁
-     * @return true
-     */
-    public static function release()
-    {
-        if(self::getHandle())
-        {
-            return flock(self::$fileHandle, LOCK_UN);
-        }
-        return false;
-    }
-    
-    /**
-     * 获得文件句柄
-     * @return resource
-     */
-    protected static function getHandle()
-    {
-        if(!self::$fileHandle)
-        {
-            self::$fileHandle = fopen(__FILE__, 'r+');
-        }
-        return self::$fileHandle;
-    }
-}

+ 0 - 97
GatewayWorker/Lib/Store.php

@@ -1,97 +0,0 @@
-<?php
-/**
- * This file is part of workerman.
- *
- * Licensed under The MIT License
- * For full copyright and license information, please see the MIT-LICENSE.txt
- * Redistributions of files must retain the above copyright notice.
- *
- * @author walkor<walkor@workerman.net>
- * @copyright walkor<walkor@workerman.net>
- * @link http://www.workerman.net/
- * @license http://www.opensource.org/licenses/mit-license.php MIT License
- */
-namespace GatewayWorker\Lib;
-
-/**
- * 存储类
- * 这里用memcache实现
- */
-class Store
-{
-    /**
-     * 实例数组
-     * @var array
-     */
-    protected static $instance = array();
-    
-    /**
-     * 获取实例
-     * @param string $config_name
-     * @throws \Exception
-     */
-    public static function instance($config_name)
-    {
-        // memcached 驱动
-        if(\Config\Store::$driver == \Config\Store::DRIVER_MC)
-        {
-            if(!isset(\Config\Store::$$config_name))
-            {
-                echo "\\Config\\Store::$config_name not set\n";
-                throw new \Exception("\\Config\\Store::$config_name not set\n");
-            }
-            
-            if(!isset(self::$instance[$config_name]))
-            {
-                if(extension_loaded('Memcached'))
-                {
-                    self::$instance[$config_name] = new \Memcached;
-                }
-                elseif(extension_loaded('Memcache'))
-                {
-                    self::$instance[$config_name] = new \Memcache;
-                }
-                else
-                {
-                    sleep(2);
-                    exit("extension memcached is not installed\n");
-                }
-                foreach(\Config\Store::$$config_name as $address)
-                {
-                    list($ip, $port) = explode(':', $address);
-                    self::$instance[$config_name] ->addServer($ip, $port);
-                }
-            }
-            return self::$instance[$config_name];
-        }
-        // redis 驱动
-        elseif(\Config\Store::$driver == \Config\Store::DRIVER_REDIS)
-        {
-            if(!isset(\Config\Store::$$config_name))
-            {
-                echo "\\Config\\Store::$config_name not set\n";
-                throw new \Exception("\\Config\\Store::$config_name not set\n");
-            }
-            if(!isset(self::$instance[$config_name]))
-            {
-                self::$instance[$config_name] = new \GatewayWorker\Lib\StoreDriver\Redis();
-                // 只选择第一个ip作为服务端
-                $address = current(\Config\Store::$$config_name);
-                list($ip, $port) = explode(':', $address);
-                $timeout = 1;
-                self::$instance[$config_name]->connect($ip, $port, $timeout);
-                self::$instance[$config_name]->setOption(\Redis::OPT_SERIALIZER, \Redis::SERIALIZER_PHP);
-            }
-            return self::$instance[$config_name];
-        }
-        // 文件驱动
-        else 
-        {
-            if(!isset(self::$instance[$config_name]))
-            {
-                self::$instance[$config_name] = new \GatewayWorker\Lib\StoreDriver\File($config_name);
-            }
-            return self::$instance[$config_name];
-        }
-    }
-}

+ 0 - 111
GatewayWorker/Lib/StoreDriver/File.php

@@ -1,111 +0,0 @@
-<?php
-/**
- * This file is part of workerman.
- *
- * Licensed under The MIT License
- * For full copyright and license information, please see the MIT-LICENSE.txt
- * Redistributions of files must retain the above copyright notice.
- *
- * @author walkor<walkor@workerman.net>
- * @copyright walkor<walkor@workerman.net>
- * @link http://www.workerman.net/
- * @license http://www.opensource.org/licenses/mit-license.php MIT License
- */
-namespace GatewayWorker\Lib\StoreDriver;
-
-/**
- * 这里用php数组文件来存储数据,
- * 为了获取高性能需要用类似memcache的存储
- */
-
-class File
-{
-    // 为了避免频繁读取磁盘,增加了缓存机制
-    protected $dataCache = array();
-    // 上次缓存时间
-    protected $lastCacheTime = 0;
-    // 打开文件的句柄
-    protected $dataFileHandle = null;
-    
-    /**
-     * 构造函数
-     * @param 配置名 $config_name
-     */
-    public function __construct($config_name)
-    {
-        if(!is_dir(\Config\Store::$storePath) && !@mkdir(\Config\Store::$storePath, 0777, true))
-        {
-            // 可能目录已经被其它进程创建
-            clearstatcache();
-            if(!is_dir(\Config\Store::$storePath))
-            {
-                // 避免狂刷日志
-                sleep(1);
-                throw new \Exception('cant not mkdir('.\Config\Store::$storePath.')');
-            }
-        }
-        $this->dataFileHandle = fopen(__FILE__, 'r');
-        if(!$this->dataFileHandle)
-        {
-            throw new \Exception("can not fopen dataFileHandle");
-        }
-    }
-    
-    /**
-     * 设置
-     * @param string $key
-     * @param mixed $value
-     * @param int $ttl
-     * @return number
-     */
-    public function set($key, $value, $ttl = 0)
-    {
-        return file_put_contents(\Config\Store::$storePath.'/'.$key, serialize($value), LOCK_EX);
-    }
-    
-    /**
-     * 读取
-     * @param string $key
-     * @param bool $use_cache
-     * @return Ambigous <NULL, multitype:>
-     */
-    public function get($key, $use_cache = true)
-    {
-        $ret = @file_get_contents(\Config\Store::$storePath.'/'.$key);
-        return $ret ? unserialize($ret) : null;
-    }
-   
-    /**
-     * 删除
-     * @param string $key
-     * @return number
-     */
-    public function delete($key)
-    {
-        return @unlink(\Config\Store::$storePath.'/'.$key);
-    }
-    
-    /**
-     * 自增
-     * @param string $key
-     * @return boolean|multitype:
-     */
-    public function increment($key)
-    {
-        flock($this->dataFileHandle, LOCK_EX);
-        $val = $this->get($key);
-        $val = !$val ? 1 : ++$val;
-        file_put_contents(\Config\Store::$storePath.'/'.$key, serialize($val));
-        flock($this->dataFileHandle, LOCK_UN);
-        return $val;
-    }
-    
-    /**
-     * 清零销毁存储数据
-     */
-    public function destroy()
-    {
-        
-    }
-    
-}

+ 0 - 26
GatewayWorker/Lib/StoreDriver/Redis.php

@@ -1,26 +0,0 @@
-<?php
-/**
- * This file is part of workerman.
- *
- * Licensed under The MIT License
- * For full copyright and license information, please see the MIT-LICENSE.txt
- * Redistributions of files must retain the above copyright notice.
- *
- * @author walkor<walkor@workerman.net>
- * @copyright walkor<walkor@workerman.net>
- * @link http://www.workerman.net/
- * @license http://www.opensource.org/licenses/mit-license.php MIT License
- */
-namespace GatewayWorker\Lib\StoreDriver;
-
-/**
- * Redis
- */
-
-class Redis extends \Redis
-{
-    public function increment($key)
-    {
-        return parent::incr($key);
-    }
-}