Просмотр исходного кода

心跳机制、onReload接口 及一些优化

walkor 11 лет назад
Родитель
Сommit
e1d0d4db3d

+ 30 - 10
README.md

@@ -3,9 +3,13 @@ workerman
 
 workerman 是一个高性能的PHP socket服务框架,开发者可以在这个框架下开发各种网络应用,例如Rpc服务、聊天室、游戏等。
 workerman 具有以下特性
- * 多进程
+ * 支持HHVM,将PHP性能提高9倍左右
+ * 多进程/多线程(多线程版本)
  * 支持TCP/UDP
+ * 支持多端口监听
  * 支持各种应用层协议
+ * 标准输入输出重定向
+ * 守护进程化
  * 使用libevent事件轮询库,支持高并发
  * 支持文件更新检测及自动加载
  * 支持服务平滑重启
@@ -13,6 +17,7 @@ workerman 具有以下特性
  * 支持异常监控及告警
  * 支持长连接
  * 支持以指定用户运行worker进程
+ * 支持请求数上限配置
 
  [更多请访问www.workerman.net](http://www.workerman.net/workerman)
 
@@ -131,19 +136,34 @@ pid     memory      listening        timestamp  worker_name    total_request pac
 =============
 
 ###测试环境:
-系统:ubuntu 12.04 LTS 64位  
-内存:8G  
-cpu:Intel® Core™ i3-3220 CPU @ 3.30GHz × 4  
+系统:debian 6.0 64位  
+内存:64G  
+cpu:Intel(R) Xeon(R) CPU E5-2420 0 @ 1.90GHz (2颗物理cpu,6核心,2线程)
+Workerman:开启200个Benchark进程
+压测脚本:benchmark
+业务:发送并返回hello字符串
+
+###普通PHP(版本5.3.10)压测
+    短链接(每次请求完成后关闭链接,下次请求建立新的链接):
+        条件: 压测脚本开500个并发线程模拟500个并发用户,每个线程链接Workerman 10W次,每次链接发送1个请求
+        结果: 吞吐量:1.9W/S , cpu利用率:32% 
+
+    长链接(每次请求后不关闭链接,下次请求继续复用这个链接):
+        条件: 压测脚本开2000个并发线程模拟2000个并发用户,每个线程链接Workerman 1次,每个链接发送10W请求
+        结果: 吞吐量:36.7W/S , cpu利用率:69% 
 
-###结果
+    内存:每个进程内存稳定在6444K,无内存泄漏
 
-    Workerman开启4个worker进程(worker进程业务逻辑只是将收到的包写回客户端)
 
+###HHVM环境压测
     短链接(每次请求完成后关闭链接,下次请求建立新的链接):
-        条件: 压测脚本开500个线程,每个线程链接Workerman 10W次,每次链接发送1个请求
-        结果: 吞吐量:3W/S , cpu:60% , 内存占用:4*8M = 32M
+        条件: 压测脚本开1000个并发线程模拟1000个并发用户,每个线程链接Workerman 10W次,每次链接发送1个请求
+        结果: 吞吐量:3.5W/S , cpu利用率:35% 
 
     长链接(每次请求后不关闭链接,下次请求继续复用这个链接):
-        条件: 压测脚本开1000个线程,每个线程链接Workerman 1次,每个链接发送10W请求
-        结果: 吞吐量:9.7W/S , cpu:68% , 内存占用:4*8M = 32M
+        条件: 压测脚本开6000个并发线程模拟6000个并发用户,每个线程链接Workerman 1次,每个链接发送10W请求
+        结果: 吞吐量:45W/S , cpu利用率:67% 
+
+    内存:HHVM环境每个进程内存稳定在46M,无内存泄漏
+
 

+ 3 - 2
applications/Benchmark/Benchmark.php

@@ -25,9 +25,10 @@ class Benchmark extends Man\Core\SocketWorker
     public function dealProcess($buffer)
     {
         // 是HTTP协议
-        if('H' == $buffer[0] )
+        if('G' == $buffer[0] )
         {
-            return $this->sendToClient("HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello");
+            $this->sendToClient("HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello");
+            return $this->closeClient($this->currentDealFd);
         }
         // 是benchmark脚本
         return $this->sendToClient($buffer);

+ 1 - 1
applications/ChatDemo/Bootstrap/Worker.php → applications/ChatDemo/Bootstrap/BusinessWorker.php

@@ -10,7 +10,7 @@ define('ROOT_DIR', realpath(__DIR__.'/../'));
 require_once ROOT_DIR . '/Protocols/GatewayProtocol.php';
 require_once ROOT_DIR . '/Event.php';
 
-class Worker extends Man\Core\SocketWorker
+class BusinessWorker extends Man\Core\SocketWorker
 {
     public function dealInput($recv_str)
     {

+ 87 - 12
applications/ChatDemo/Bootstrap/Gateway.php

@@ -15,10 +15,16 @@ require_once ROOT_DIR . '/Lib/Store.php';
 class Gateway extends Man\Core\SocketWorker
 {
     /**
-     * 内部通信socket
+     * 内部通信socket udp
      * @var resouce
      */
-    protected $innerMainSocket_udp = null;
+    protected $innerMainSocketUdp = null;
+    
+    /**
+     * 内部通信socket tcp
+     * @var resouce
+     */
+    protected $innerMainSocketTcp = null;
     
     /**
      * 内网ip
@@ -53,6 +59,21 @@ class Gateway extends Man\Core\SocketWorker
     protected $workerAddresses = array();
     
     /**
+     * gateway 发送心跳时间间隔 单位:秒 ,0表示不发送心跳,在配置中设置
+     * @var integer
+     */
+    protected $pingInterval = 0;
+    
+    /**
+     * 心跳数据
+     * 可以是字符串(在配置中直接设置字符串如 ping_data=ping),
+     * 可以是二进制数据(二进制数据保存在文件中,在配置中设置ping数据文件路径 如 ping_data=/yourpath/ping.bin)
+     * ping数据应该是客户端能够识别的数据格式,只是检测连接的连通性,客户端收到心跳数据可以选择忽略此数据包
+     * @var string
+     */
+    protected $pingData = '';
+    
+    /**
      * 进程启动
      */
     public function start()
@@ -74,9 +95,9 @@ class Gateway extends Man\Core\SocketWorker
         }
         $error_no_udp = $error_no_tcp = 0;
         $error_msg_udp = $error_msg_tcp = '';
-        $this->innerMainSocket_udp = stream_socket_server("udp://".$this->lanIp.':'.$this->lanPort, $error_no_udp, $error_msg_udp, STREAM_SERVER_BIND);
-        $this->innerMainSocket_tcp = stream_socket_server("tcp://".$this->lanIp.':'.$this->lanPort, $error_no_tcp, $error_msg_tcp, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
-        if(!$this->innerMainSocket_udp || !$this->innerMainSocket_tcp)
+        $this->innerMainSocketUdp = stream_socket_server("udp://".$this->lanIp.':'.$this->lanPort, $error_no_udp, $error_msg_udp, STREAM_SERVER_BIND);
+        $this->innerMainSocketTcp = stream_socket_server("tcp://".$this->lanIp.':'.$this->lanPort, $error_no_tcp, $error_msg_tcp, STREAM_SERVER_BIND | STREAM_SERVER_LISTEN);
+        if(!$this->innerMainSocketUdp || !$this->innerMainSocketTcp)
         {
             $this->notice('create innerMainSocket udp or tcp fail and exit '.$error_msg_udp.$error_msg_tcp);
             sleep(1);
@@ -84,20 +105,46 @@ class Gateway extends Man\Core\SocketWorker
         }
         else
         {
-            stream_set_blocking($this->innerMainSocket_udp , 0);
-            stream_set_blocking($this->innerMainSocket_tcp , 0);
+            stream_set_blocking($this->innerMainSocketUdp , 0);
+            stream_set_blocking($this->innerMainSocketTcp , 0);
         }
         
+        // 注册套接字
         $this->registerAddress("udp://".$this->lanIp.':'.$this->lanPort, 'udp');
         $this->registerAddress("tcp://".$this->lanIp.':'.$this->lanPort, 'tcp');
         
         // 添加读udp事件
-        $this->event->add($this->innerMainSocket_udp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
-        $this->event->add($this->innerMainSocket_tcp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'acceptTCP'));
+        $this->event->add($this->innerMainSocketUdp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'recvUdp'));
+        $this->event->add($this->innerMainSocketTcp,  Man\Core\Events\BaseEvent::EV_READ, array($this, 'acceptTCP'));
         
         // 初始化到worker的通信地址
         $this->initWorkerAddresses();
         
+        // 初始化心跳包时间间隔
+        $ping_interval = \Man\Core\Lib\Config::get($this->workerName.'.ping_interval');
+        if((int)$ping_interval > 0)
+        {
+            $this->pingInterval = (int)$ping_interval;
+        }
+        
+        // 获取心跳包数据
+        $ping_data_or_path = \Man\Core\Lib\Config::get($this->workerName.'.ping_data');
+        if(is_file($ping_data_or_path))
+        {
+            $this->pingData = file_get_contents($ping_data_or_path);
+        }
+        else
+        {
+            $this->pingData = $ping_data_or_path;
+        }
+        
+        // 设置定时任务,发送心跳
+        if($this->pingInterval > 0 && $this->pingData)
+        {
+            \Man\Core\Lib\Task::init($this->event);
+            \Man\Core\Lib\Task::add($this->pingInterval, array($this, 'ping'));
+        }
+        
         // 主体循环,整个子进程会阻塞在这个函数上
         $ret = $this->event->loop();
         $this->notice('worker loop exit');
@@ -123,6 +170,24 @@ class Gateway extends Man\Core\SocketWorker
     }
     
     /**
+     * 删除全局的通信地址
+     * @param string $address
+     */
+    protected function unregisterAddress($address, $protocol)
+    {
+        \Man\Core\Lib\Mutex::get();
+        $key = 'GLOBAL_GATEWAY_ADDRESS-' . $protocol;
+        $addresses_list = Store::get($key);
+        if(empty($addresses_list))
+        {
+            $addresses_list = array();
+        }
+        unset($addresses_list[$address]);
+        Store::set($key, $addresses_list);
+        \Man\Core\Lib\Mutex::release();
+    }
+    
+    /**
      * 接收Udp数据
      * 如果数据超过一个udp包长,需要业务自己解析包体,判断数据是否全部到达
      * @param resource $socket
@@ -253,10 +318,10 @@ class Gateway extends Man\Core\SocketWorker
     
     protected function initWorkerAddresses()
     {
-        $this->workerAddresses = Man\Core\Lib\Config::get($this->workerName.'.game_worker');
+        $this->workerAddresses = Man\Core\Lib\Config::get($this->workerName.'.business_worker');
         if(!$this->workerAddresses)
         {
-            $this->notice($this->workerName.'game_worker not set');
+            $this->notice($this->workerName.'business_worker not set');
         }
     }
     
@@ -404,6 +469,16 @@ class Gateway extends Man\Core\SocketWorker
     
     public function onStop()
     {
-        Store::deleteAll();
+        $this->unregisterAddress("udp://".$this->lanIp.':'.$this->lanPort, 'udp');
+        $this->unregisterAddress("tcp://".$this->lanIp.':'.$this->lanPort, 'tcp');
+        foreach($this->connUidMap as $uid)
+        {
+            Store::delete($uid);
+        }
+    }
+    
+    public function ping()
+    {
+        $this->broadCast($this->pingData);
     }
 }

+ 14 - 0
applications/ChatDemo/Lib/Gateway.php

@@ -28,6 +28,10 @@ class GateWay
        $pack->header['uid'] = Context::$uid;
        $pack->body = (string)$message;
        $buffer = $pack->getBuffer();
+       if(empty(Context::$protocol))
+       {
+           Context::$protocol = 'tcp';
+       }
        $all_addresses = Store::get('GLOBAL_GATEWAY_ADDRESS-' . Context::$protocol);
        foreach($all_addresses as $address)
        {
@@ -122,6 +126,11 @@ class GateWay
        $pack->header['client_port'] = Context::$client_port;
        $pack->header['uid'] = empty($uid) ? 0 : $uid;
        $pack->body = (string)$message;
+       
+       if(empty(Context::$protocol))
+       {
+           Context::$protocol = 'tcp';
+       }
         
        return self::sendToGateway(Context::$protocol."://{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
    }
@@ -151,6 +160,11 @@ class GateWay
        $pack->header['uid'] = $uid ? $uid : 0;
        $pack->body = (string)$message;
        
+       if(empty(Context::$protocol))
+       {
+           Context::$protocol = 'tcp';
+       }
+       
        return self::sendToGateway(Context::$protocol."://{$pack->header['local_ip']}:{$pack->header['local_port']}", $pack->getBuffer());
    }
    

+ 8 - 0
applications/ChatDemo/Tests/Chat.php

@@ -42,7 +42,15 @@ while(1)
           {
               $ret = fgets($fd, 102400);
               if(!$ret){continue;exit("connection closed\n ");}
+              
+              // 是服务端发来的心跳,只是检测联通性,不用回复
+              if("#ping#" == $ret)
+              {
+                  continue;
+              }
+              
               $ret = json_decode(trim($ret),true);
+              
               if($ret['to_uid'] == $MYUID)
               {
                   echo $ret['from_uid'] , ' say to YOU:', $ret['message'], "\n";

+ 9 - 2
workerman/Common/Monitor.php

@@ -552,8 +552,15 @@ class Monitor extends Man\Core\SocketWorker
         }
     
         $ip = $this->getIp();
-    
-        $this->sendSms('告警消息 WorkerMan框架监控 '.$ip.' '.$worker_name.'进程频繁退出 退出次数'.$exit_count.' 退出状态码:'.$status);
+        
+        if(65280 == $status || 30720 == $status)
+        {
+            $this->sendSms('告警消息 Workerman框架监控 '.$ip.' '.$worker_name.'5分钟内出现 FatalError '.$exit_count.'次 时间:'.date('Y-m-d H:i:s'));
+        }
+        else
+        {
+            $this->sendSms('告警消息 Workerman框架监控 '.$ip.' '.$worker_name.' 进程频繁退出 退出次数'.$exit_count.' 退出状态码:'.$status .' 时间:'.date('Y-m-d H:i:s'));
+        }
     
         // 记录这次告警时间
         self::$lastWarningTimeMap[self::WARNING_TOO_MANY_WORKERS_EXIT] = $time_now;

+ 2 - 0
workerman/Core/Master.php

@@ -515,6 +515,8 @@ class Master
                     // 如果对应进程配置了不热启动则不重启对应进程
                     if(Lib\Config::get($worker_name.'.no_reload'))
                     {
+                        // 发送reload信号,以便触发onReload方法
+                        posix_kill($pid, SIGHUP);
                         continue;
                     }
                     $pids_to_restart[] = $pid;

+ 17 - 14
workerman/Core/SocketWorker.php

@@ -201,10 +201,7 @@ abstract class SocketWorker extends AbstractWorker
         $this->installSignal();
         
         // 触发该worker进程onStart事件,该进程整个生命周期只触发一次
-        if($this->onStart())
-        {
-            return;
-        }
+        $this->onStart();
 
         if($this->protocol == 'udp')
         {
@@ -230,11 +227,8 @@ abstract class SocketWorker extends AbstractWorker
     public function stop()
     {
         // 触发该worker进程onStop事件
-        if($this->onStop())
-        {
-            return;
-        }
-        
+        $this->onStop();
+       
         // 标记这个worker开始停止服务
         if($this->workerStatus != self::STATUS_SHUTDOWN)
         {
@@ -499,6 +493,7 @@ abstract class SocketWorker extends AbstractWorker
                 break;
             // 平滑重启
             case SIGHUP:
+                $this->onReload();
                 // 如果配置了no_reload则不重启该进程
                 if(\Man\Core\Lib\Config::get($this->workerName.'.no_reload'))
                 {
@@ -674,11 +669,6 @@ abstract class SocketWorker extends AbstractWorker
             $ip = $tmp[0];
         }
         
-        if(empty($ip) || '127.0.0.1' == $ip)
-        {
-            $ip = gethostbyname(trim(`hostname`));
-        }
-        
         return $ip;
     }
     
@@ -773,4 +763,17 @@ abstract class SocketWorker extends AbstractWorker
         return false;
     }
     
+    /**
+     * 该worker进程收到reload信号时触发
+     * 以下情况会收到reload信号
+     * 1、运行 workermand reload,全部进程都会收到reload信号
+     * 2、开启workerman.conf.debug=1,并且磁盘文件有更新,全部进程会收到reload信号
+     * 3、telnet远程控制workerman,运行 reload 命令,全部进程会收到reload信号
+     * 4、telnet远程控制workerman,运行 kill pid 命令,pid对应进程会收到reload信号
+     * 5、当前进程内存占用大于Monitor.conf.max_mem_limit 时当前进程会收到reload信号
+     */
+    protected function onReload()
+    {
+        return false;
+    }
 }

+ 8 - 1
workerman/conf/conf.d/Benchmark.conf

@@ -1,5 +1,12 @@
+;进程入口文件
 worker_file = ../applications/Benchmark/Benchmark.php
+;监听的ip端口
 listen = tcp://0.0.0.0:56789
+;是否是长连接
+persistent_connection = 1
+;启动多少服务进程,压测时启动cpu核数的8倍数左右
 start_workers = 48
+;以哪个用户运行该进程,为了安全请使用较低权限的用户,例如www-data
 user = root
-preread_length = 65535
+;每个请求预读长度
+preread_length = 65535

+ 12 - 0
workerman/conf/conf.d/BusinessWorker.conf

@@ -0,0 +1,12 @@
+;业务进程入口文件
+worker_file = ../applications/ChatDemo/Bootstrap/BusinessWorker.php
+;传输层协议 ip 及端口
+listen = tcp://0.0.0.0:8483
+;启动多少服务进程
+start_workers = 5
+;以哪个用户运行该进程,为了安全请使用权限较低的用户,例如www-data nobody
+user = root
+;请求到来时预读长度,这里固定27
+preread_length = 27
+;设置最大请求数,超过这个请求数后会安全重启该进程(主要是避免因业务代码不规范导致的内存泄露)
+max_requests=10000

+ 37 - 6
workerman/conf/conf.d/Gateway.conf

@@ -1,14 +1,45 @@
+;进程入口文件
 worker_file = ../applications/ChatDemo/Bootstrap/Gateway.php
+
+;传输层协议及监听的ip端口
 listen = tcp://0.0.0.0:8480
+
+;是否是长连接
 persistent_connection = 1
+
+;开多少服务进程
 start_workers = 5
+
+;以哪个用户运行,为了安全,应该使用权限较低的用户,例如www-data nobody
 user = root
+
+;每个请求预读长度,避免读取数据超过一个协议包,
+;一般设置为协议头的长度,当请求到来时在dealInput中根据头部标识的数据包长度计算还有多少数据没接收完毕,并返回这个值
+;这个demo传输的是以\n结尾的json数据,没有协议头,所以写了个较大的值
 preread_length = 65535
-lan_ip = 127.0.0.1
-lan_port_start = 40000
-game_worker[] = tcp://127.0.0.1:8483
-game_worker[] = tcp://127.0.0.1:8483
-;不reload
+
+;不reload,当有reload命令时是否安全重启这个进程
 no_reload = 1
-;不打印
+
+;workerman.conf.debug=1 时有效。echo var_dump 等输出是否打印到终端
 no_debug = 1
+
+
+;;;;;;;;;以上是workerman子进程通用配置;;;;;;;;;;;;;;
+;;;;;;;;;以下是gateway进程私有配置;;;;;;;;;;;;
+
+;内部通讯的局域网ip,worker进程会向这个ip发送数据
+lan_ip = 127.0.0.1
+
+;内部通讯端口起始值,假如开启5个gateway进程,则每个进程会监听一个端口,40001 40002 40003 40004 40005
+lan_port_start = 40000
+
+;业务进程通讯传输层协议、ip、端口
+business_worker[] = tcp://127.0.0.1:8483
+business_worker[] = tcp://127.0.0.1:8483
+
+;此gateway进程向客户端发送心跳时间间隔 单位:秒
+ping_interval = 10
+
+;发送的心跳数据,可以是字符串或者二进制数据,二进制数据需要配置成 文件路径 如ping_data=/yourpath/ping.bin
+ping_data = #ping#

+ 2 - 2
workerman/conf/conf.d/Monitor.conf

@@ -5,7 +5,7 @@
 ;④监控每个worker进程内存是否大于设定值,大于设定值则安全重启对应进程
 ;worker_file
 worker_file = Common/Monitor.php
-;监听ip及端口
+;监听ip及端口,不使用的情况为了安全请绑定到127.0.0.1只限本机访问,如果绑定0.0.0.0,则记得更改下面telnet密码配置
 listen = tcp://127.0.0.1:2009
 ;telnet需要长连接
 persistent_connection = 1
@@ -19,7 +19,7 @@ preread_length=64
 ;==以下是自定义的配置==
 ;如果worker进程1分钟内退出max_worker_exit_count次则触发告警
 max_worker_exit_count=2000
-;每个worker进程最大内存阈值,超过这个值安全重启这个进程
+;worker进程最大内存阈值(单位KByte),超过这个值安全重启(reload)这个进程
 max_mem_limit=124000
 ;telnet密码
 password=yourpassword

+ 0 - 6
workerman/conf/conf.d/Worker.conf

@@ -1,6 +0,0 @@
-worker_file = ../applications/ChatDemo/Bootstrap/Worker.php
-listen = tcp://0.0.0.0:8483
-start_workers = 5
-user = root
-preread_length = 23
-max_requests=10000