Browse Source

3.0.5 onWebSocketConnect && Gateway/Worker onXX support

walkor 10 years ago
parent
commit
66540ce0e9

+ 26 - 6
GatewayWorker/BusinessWorker.php

@@ -35,7 +35,13 @@ class BusinessWorker extends Worker
      * 连接失败gateway内部通讯地址
      * @var array
      */
-    public $badGatewayAddress = array();
+    protected $_badGatewayAddress = array();
+    
+    /**
+     * 保存用户设置的worker启动回调
+     * @var callback
+     */
+    protected $_onWorkerStart = null;
     
     /**
      * 构造函数
@@ -44,13 +50,23 @@ class BusinessWorker extends Worker
      */
     public function __construct($socket_name = '', $context_option = array())
     {
-        $this->onWorkerStart = array($this, 'onWorkerStart');
         parent::__construct($socket_name, $context_option);
         $backrace = debug_backtrace();
         $this->_appInitPath = dirname($backrace[0]['file']);
     }
     
     /**
+     * 运行
+     * @see Workerman.Worker::run()
+     */
+    public function run()
+    {
+        $this->_onWorkerStart = $this->onWorkerStart;
+        $this->onWorkerStart = array($this, 'onWorkerStart');
+        parent::run();
+    }
+    
+    /**
      * 当进程启动时一些初始化工作
      * @return void
      */
@@ -59,6 +75,10 @@ class BusinessWorker extends Worker
         Timer::add(1, array($this, 'checkGatewayConnections'));
         $this->checkGatewayConnections();
         \GatewayWorker\Lib\Gateway::setBusinessWorker($this);
+        if($this->_onWorkerStart)
+        {
+            call_user_func($this->_onWorkerStart, $this);
+        }
     }
     
     /**
@@ -172,7 +192,7 @@ class BusinessWorker extends Worker
     public function onConnectGateway($connection)
     {
         $this->gatewayConnections[$connection->remoteAddress] = $connection;
-        unset($this->badGatewayAddress[$connection->remoteAddress]);
+        unset($this->_badGatewayAddress[$connection->remoteAddress]);
     }
     
     /**
@@ -194,12 +214,12 @@ class BusinessWorker extends Worker
     public function tryToDeleteGatewayAddress($addr, $errstr)
     {
         $key = 'GLOBAL_GATEWAY_ADDRESS';
-        if(!isset($this->badGatewayAddress[$addr]))
+        if(!isset($this->_badGatewayAddress[$addr]))
         {
-            $this->badGatewayAddress[$addr] = 0;
+            $this->_badGatewayAddress[$addr] = 0;
         }
         // 删除连不上的端口
-        if($this->badGatewayAddress[$addr]++ > self::MAX_RETRY_COUNT)
+        if($this->_badGatewayAddress[$addr]++ > self::MAX_RETRY_COUNT)
         {
             Lock::get();
             $addresses_list = Store::instance('gateway')->get($key);

+ 75 - 4
GatewayWorker/Gateway.php

@@ -78,21 +78,71 @@ class Gateway extends Worker
     protected $_innerUdpWorker = null;
     
     /**
+     * 当worker启动时
+     * @var callback
+     */
+    protected $_onWorkerStart = null;
+    
+    /**
+     * 当有客户端连接时
+     * @var callback
+     */
+    protected $_onConnect = null;
+    
+    /**
+     * 当客户端发来消息时
+     * @var callback
+     */
+    protected $_onMessage = null;
+    
+    /**
+     * 当客户端连接关闭时
+     * @var callback
+     */
+    protected $_onClose = null;
+    
+    /**
+     * 当worker停止时
+     * @var callback
+     */
+    protected $_onWorkerStop = null;
+    
+    /**
      * 构造函数
      * @param string $socket_name
      * @param array $context_option
      */
     public function __construct($socket_name, $context_option = array())
     {
+        parent::__construct($socket_name, $context_option);
+        
+        $backrace = debug_backtrace();
+        $this->_appInitPath = dirname($backrace[0]['file']);
+    }
+    
+    /**
+     * 运行
+     * @see Workerman.Worker::run()
+     */
+    public function run()
+    {
+        // 保存用户的回调,当对应的事件发生时触发
+        $this->_onWorkerStart = $this->onWorkerStart;
         $this->onWorkerStart = array($this, 'onWorkerStart');
+        // 保存用户的回调,当对应的事件发生时触发
+        $this->_onConnect = $this->onConnect;
         $this->onConnect = array($this, 'onClientConnect');
+        
+        // onMessage禁止用户设置回调
         $this->onMessage = array($this, 'onClientMessage');
+        
+        // 保存用户的回调,当对应的事件发生时触发
+        $this->_onClose = $this->onClose;
         $this->onClose = array($this, 'onClientClose');
+        // 保存用户的回调,当对应的事件发生时触发
+        $this->_onWorkerStop = $this->onWorkerStop;
         $this->onWorkerStop = array($this, 'onWorkerStop');
-        parent::__construct($socket_name, $context_option);
-        
-        $backrace = debug_backtrace();
-        $this->_appInitPath = dirname($backrace[0]['file']);
+        parent::run();
     }
     
     /**
@@ -132,6 +182,13 @@ class Gateway extends Worker
         // 保存该连接的内部gateway通讯地址
         $address = $this->lanIp.':'.$this->lanPort;
         $this->storeClientAddress($connection->globalClientId, $address);
+        
+        // 如果用户有自定义onConnect回调,则执行
+        if($this->_onConnect)
+        {
+            call_user_func($this->_onConnect, $connection);
+        }
+        
         // 如果设置了Event::onConnect,则通知worker进程,让worker执行onConnect
         if(method_exists('Event','onConnect'))
         {
@@ -217,6 +274,10 @@ class Gateway extends Worker
         // 清理连接的数据
         $this->delClientAddress($connection->globalClientId);
         unset($this->_clientConnections[$connection->globalClientId]);
+        if($this->_onClose)
+        {
+            call_user_func($this->_onClose, $connection);
+        }
     }
     
     /**
@@ -286,6 +347,11 @@ class Gateway extends Worker
             $this->log('registerAddress fail and exit');
             Worker::stopAll();
         }
+        
+        if($this->_onWorkerStart)
+        {
+            call_user_func($this->_onWorkerStart, $this);
+        }
     }
     
     
@@ -492,5 +558,10 @@ class Gateway extends Worker
         {
             $this->delClientAddress($connection->globalClientId);
         }
+        // 尝试触发用户设置的回调
+        if($this->_onWorkerStop)
+        {
+            call_user_func($this->_onWorkerStop, $this);
+        }
     }
 }

+ 11 - 3
Workerman/Protocols/Http.php

@@ -170,9 +170,15 @@ class Http implements \Workerman\Protocols\ProtocolInterface
         
         // QUERY_STRING
         $_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY);
-        
-        // GET
-        parse_str($_SERVER['QUERY_STRING'], $_GET);
+        if($_SERVER['QUERY_STRING'])
+        {
+            // $GET
+            parse_str($_SERVER['QUERY_STRING'], $_GET);
+        }
+        else
+        {
+            $_SERVER['QUERY_STRING'] = '';
+        }
         
         // REQUEST
         $_REQUEST = array_merge($_GET, $_POST);
@@ -180,6 +186,8 @@ class Http implements \Workerman\Protocols\ProtocolInterface
         // REMOTE_ADDR REMOTE_PORT
         $_SERVER['REMOTE_ADDR'] = $connection->getRemoteIp();
         $_SERVER['REMOTE_PORT'] = $connection->getRemotePort();
+        
+        return $recv_buffer;
     }
     
     /**

+ 133 - 50
Workerman/Protocols/Websocket.php

@@ -44,56 +44,7 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
         // 还没有握手
         if(empty($connection->handshake))
         {
-            // 握手阶段客户端发送HTTP协议
-            if(0 === strpos($buffer, 'GET'))
-            {
-                // 判断\r\n\r\n边界
-                $heder_end_pos = strpos($buffer, "\r\n\r\n");
-                if(!$heder_end_pos)
-                {
-                    return 0;
-                }
-                // 解析Sec-WebSocket-Key
-                $Sec_WebSocket_Key = '';
-                if(preg_match("/Sec-WebSocket-Key: *(.*?)\r\n/", $buffer, $match))
-                {
-                    $Sec_WebSocket_Key = $match[1];
-                }
-                $new_key = base64_encode(sha1($Sec_WebSocket_Key."258EAFA5-E914-47DA-95CA-C5AB0DC85B11",true));
-                // 握手返回的数据
-                $new_message = "HTTP/1.1 101 Switching Protocols\r\n";
-                $new_message .= "Upgrade: websocket\r\n";
-                $new_message .= "Sec-WebSocket-Version: 13\r\n";
-                $new_message .= "Connection: Upgrade\r\n";
-                $new_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n\r\n";
-                $connection->handshake = true;
-                $connection->consumeRecvBuffer(strlen($buffer));
-                $connection->send($new_message, true);
-                $connection->protocolData = array(
-                    'binaryType' => self::BINARY_TYPE_BLOB, // blob or arraybuffer
-                );
-                // 如果有设置onWebSocketConnect回调,尝试执行
-                if(isset($connection->onWebSocketConnect))
-                {
-                    call_user_func(array($connection, 'onWebSocketConnect'), $connection);
-                }
-                return 0;
-            }
-            // 如果是flash的policy-file-request
-            elseif(0 === strpos($buffer,'<polic'))
-            {
-                if('>' != $buffer[strlen($buffer) - 1])
-                {
-                    return 0;
-                }
-                $policy_xml = '<?xml version="1.0"?><cross-domain-policy><site-control permitted-cross-domain-policies="all"/><allow-access-from domain="*" to-ports="*"/></cross-domain-policy>'."\0";
-                $connection->send($policy_xml, true);
-                $connection->consumeRecvBuffer(strlen($buffer));
-                return 0;
-            }
-            // 出错
-            $connection->close();
-            return 0;
+            return self::dealHandshake($buffer, $connection);
         }
         
         $data_len = ord($buffer[1]) & 127;
@@ -223,4 +174,136 @@ class Websocket implements \Workerman\Protocols\ProtocolInterface
         }
         return $decoded;
     }
+    
+    /**
+     * 处理websocket握手
+     * @param string $buffer
+     * @param TcpConnection $connection
+     * @return int
+     */
+    protected static function dealHandshake($buffer, $connection)
+    {
+        // 握手阶段客户端发送HTTP协议
+        if(0 === strpos($buffer, 'GET'))
+        {
+            // 判断\r\n\r\n边界
+            $heder_end_pos = strpos($buffer, "\r\n\r\n");
+            if(!$heder_end_pos)
+            {
+                return 0;
+            }
+            
+            // 解析Sec-WebSocket-Key
+            $Sec_WebSocket_Key = '';
+            if(preg_match("/Sec-WebSocket-Key: *(.*?)\r\n/", $buffer, $match))
+            {
+                $Sec_WebSocket_Key = $match[1];
+            }
+            $new_key = base64_encode(sha1($Sec_WebSocket_Key."258EAFA5-E914-47DA-95CA-C5AB0DC85B11",true));
+            // 握手返回的数据
+            $new_message = "HTTP/1.1 101 Switching Protocols\r\n";
+            $new_message .= "Upgrade: websocket\r\n";
+            $new_message .= "Sec-WebSocket-Version: 13\r\n";
+            $new_message .= "Connection: Upgrade\r\n";
+            $new_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n\r\n";
+            $connection->handshake = true;
+            $connection->consumeRecvBuffer(strlen($buffer));
+            $connection->send($new_message, true);
+            $connection->protocolData = array(
+                    'binaryType' => self::BINARY_TYPE_BLOB, // blob or arraybuffer
+            );
+            // 如果有设置onWebSocketConnect回调,尝试执行
+            if(isset($connection->onWebSocketConnect))
+            {
+                self::parseHttpHeader($buffer);
+                try
+                {
+                    call_user_func($connection->onWebSocketConnect, $connection, $buffer);
+                }
+                catch(\Exception $e)
+                {
+                    echo $e;
+                }
+                $_GET = $_COOKIE = $_SERVER = array();
+            }
+            return 0;
+        }
+        // 如果是flash的policy-file-request
+        elseif(0 === strpos($buffer,'<polic'))
+        {
+            if('>' != $buffer[strlen($buffer) - 1])
+            {
+                return 0;
+            }
+            $policy_xml = '<?xml version="1.0"?><cross-domain-policy><site-control permitted-cross-domain-policies="all"/><allow-access-from domain="*" to-ports="*"/></cross-domain-policy>'."\0";
+            $connection->send($policy_xml, true);
+            $connection->consumeRecvBuffer(strlen($buffer));
+            return 0;
+        }
+        // 出错
+        $connection->close();
+        return 0;
+    }
+    
+    /**
+     * 从header中获取
+     * @param string $buffer
+     * @return void
+     */
+    protected function parseHttpHeader($buffer)
+    {
+        $header_data = explode("\r\n", $buffer);
+        $_SERVER = array();
+        list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]);
+        unset($header_data[0]);
+        foreach($header_data as $content)
+        {
+            // \r\n\r\n
+            if(empty($content))
+            {
+                continue;
+            }
+            list($key, $value) = explode(':', $content, 2);
+            $key = strtolower($key);
+            $value = trim($value);
+            switch($key)
+            {
+                // HTTP_HOST
+                case 'host':
+                    $_SERVER['HTTP_HOST'] = $value;
+                    $tmp = explode(':', $value);
+                    $_SERVER['SERVER_NAME'] = $tmp[0];
+                    if(isset($tmp[1]))
+                    {
+                        $_SERVER['SERVER_PORT'] = $tmp[1];
+                    }
+                    break;
+                // HTTP_COOKIE
+                case 'cookie':
+                    $_SERVER['HTTP_COOKIE'] = $value;
+                    parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE);
+                    break;
+                // HTTP_USER_AGENT
+                case 'user-agent':
+                    $_SERVER['HTTP_USER_AGENT'] = $value;
+                    break;
+                // HTTP_REFERER
+                case 'referer':
+                    $_SERVER['HTTP_REFERER'] = $value;
+                    break;
+            }
+        }
+        
+        // QUERY_STRING
+        $_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY);
+        if($_SERVER['QUERY_STRING'])
+        {
+            // $GET
+            parse_str($_SERVER['QUERY_STRING'], $_GET);
+        }
+        else
+        {
+            $_SERVER['QUERY_STRING'] = '';
+        }
+    }
 }

+ 1 - 1
Workerman/Worker.php

@@ -21,7 +21,7 @@ class Worker
      * 版本号
      * @var string
      */
-    const VERSION = '3.0.4';
+    const VERSION = '3.0.5';
     
     /**
      * 状态 启动中