* 使用示例: *
*
* Master::run();
*
*
*
*/
class Master
{
/**
* 版本
* @var string
*/
const VERSION = '2.0.1';
/**
* 服务名
* @var string
*/
const NAME = 'WorkerMan';
/**
* 服务状态 启动中
* @var integer
*/
const STATUS_STARTING = 1;
/**
* 服务状态 运行中
* @var integer
*/
const STATUS_RUNNING = 2;
/**
* 服务状态 关闭中
* @var integer
*/
const STATUS_SHUTDOWN = 4;
/**
* 服务状态 平滑重启中
* @var integer
*/
const STATUS_RESTARTING_WORKERS = 8;
/**
* 整个服务能够启动的最大进程数
* @var integer
*/
const SERVER_MAX_WORKER_COUNT = 5000;
/**
* 单个进程打开文件数限制
* @var integer
*/
const MIN_SOFT_OPEN_FILES = 10000;
/**
* 单个进程打开文件数限制 硬性限制
* @var integer
*/
const MIN_HARD_OPEN_FILES = 10000;
/**
* 共享内存中用于存储主进程统计信息的变量id
* @var integer
*/
const STATUS_VAR_ID = 1;
/**
* 发送停止命令多久后worker没退出则发送sigkill信号
* @var integer
*/
const KILL_WORKER_TIME_LONG = 4;
/**
* 用于保存所有子进程pid ['worker_name1'=>[pid1=>pid1,pid2=>pid2,..], 'worker_name2'=>[pid3,..], ...]
* @var array
*/
protected static $workerPids = array();
/**
* 服务的状态,默认是启动中
* @var integer
*/
protected static $serverStatus = self::STATUS_STARTING;
/**
* 用来监听端口的Socket数组,用来fork worker使用
* @var array
*/
protected static $listenedSockets = array();
/**
* 要重启的worker的pid数组 [pid1=>time_stamp, pid2=>time_stamp, ..]
* @var array
*/
protected static $workerToRestart = array();
/**
* 共享内存resource id
* @var resource
*/
protected static $shmId = 0;
/**
* 消息队列 resource id
* @var resource
*/
protected static $queueId = 0;
/**
* master进程pid
* @var integer
*/
protected static $masterPid = 0;
/**
* server统计信息 ['start_time'=>time_stamp, 'worker_exit_code'=>['worker_name1'=>[code1=>count1, code2=>count2,..], 'worker_name2'=>[code3=>count3,...], ..] ]
* @var array
*/
protected static $serverStatusInfo = array(
'start_time' => 0,
'worker_exit_code' => array(),
);
/**
* 服务运行
* @return void
*/
public static function run()
{
// 输出信息
self::notice("Server is starting ...", true);
// 初始化
self::init();
// 检查环境
self::checkEnv();
// 变成守护进程
self::daemonize();
// 保存进程pid
self::savePid();
// 安装信号
self::installSignal();
// 创建监听套接字
self::createSocketsAndListen();
// 创建worker进程
self::createWorkers();
// 输出信息
self::notice("Server start success ...", true);
// 标记sever状态为运行中...
self::$serverStatus = self::STATUS_RUNNING;
// 关闭标准输出
self::resetStdFd();
// 主循环
self::loop();
}
/**
* 初始化 配置、进程名、共享内存、消息队列等
* @return void
*/
public static function init()
{
// 获取配置文件
$config_path = Lib\Config::$filename;
// 设置进程名称,如果支持的话
self::setProcessTitle(self::NAME.':master with-config:' . $config_path);
// 初始化共享内存消息队列
if(extension_loaded('sysvmsg') && extension_loaded('sysvshm'))
{
self::$shmId = shm_attach(IPC_KEY, DEFAULT_SHM_SIZE, 0666);
self::$queueId = msg_get_queue(IPC_KEY, 0666);
msg_set_queue(self::$queueId,array('msg_qbytes'=>65535));
}
}
/**
* 检查环境配置
* @return void
*/
public static function checkEnv()
{
// 检查PID文件
Lib\Checker::checkPidFile();
// 检查扩展支持情况
Lib\Checker::checkExtension();
// 检查函数禁用情况
Lib\Checker::checkDisableFunction();
// 检查log目录是否可读
Lib\Log::init();
// 检查配置和语法错误等
Lib\Checker::checkWorkersConfig();
// 检查文件限制
Lib\Checker::checkLimit();
}
/**
* 使之脱离终端,变为守护进程
* @return void
*/
protected static function daemonize()
{
// 设置umask
umask(0);
// fork一次
$pid = pcntl_fork();
if(-1 == $pid)
{
// 出错退出
exit("Daemonize fail ,can not fork");
}
elseif($pid > 0)
{
// 父进程,退出
exit(0);
}
// 子进程使之成为session leader
if(-1 == posix_setsid())
{
// 出错退出
exit("Daemonize fail ,setsid fail");
}
// 再fork一次
$pid2 = pcntl_fork();
if(-1 == $pid2)
{
// 出错退出
exit("Daemonize fail ,can not fork");
}
elseif(0 !== $pid2)
{
// 结束第一子进程,用来禁止进程重新打开控制终端
exit(0);
}
// 记录server启动时间
self::$serverStatusInfo['start_time'] = time();
}
/**
* 保存主进程pid
* @return void
*/
public static function savePid()
{
// 保存在变量中
self::$masterPid = posix_getpid();
// 保存到文件中,用于实现停止、重启
if(false === @file_put_contents(WORKERMAN_PID_FILE, self::$masterPid))
{
exit("\033[31;40mCan not save pid to pid-file(" . WORKERMAN_PID_FILE . ")\033[0m\n\n\033[31;40mServer start fail\033[0m\n\n");
}
// 更改权限
chmod(WORKERMAN_PID_FILE, 0644);
}
/**
* 获取主进程pid
* @return int
*/
public static function getMasterPid()
{
return self::$masterPid;
}
/**
* 根据配置文件,创建监听套接字
* @return void
*/
protected static function createSocketsAndListen()
{
// 循环读取配置创建socket
foreach (Lib\Config::getAllWorkers() as $worker_name=>$config)
{
if(isset($config['listen']))
{
$flags = substr($config['listen'], 0, 3) == 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
$error_no = 0;
$error_msg = '';
// 创建监听socket
self::$listenedSockets[$worker_name] = stream_socket_server($config['listen'], $error_no, $error_msg, $flags);
if(!self::$listenedSockets[$worker_name])
{
Lib\Log::add("can not create socket {$config['listen']} info:{$error_no} {$error_msg}\tServer start fail");
exit("\n\033[31;40mcan not create socket {{$config['listen']} info:{$error_no} {$error_msg}\033[0m\n\n\033[31;40mServer start fail\033[0m\n\n");
}
}
}
}
/**
* 根据配置文件创建Workers
* @return void
*/
protected static function createWorkers()
{
// 循环读取配置创建一定量的worker进程
foreach (Lib\Config::getAllWorkers() as $worker_name=>$config)
{
// 初始化
if(empty(self::$workerPids[$worker_name]))
{
self::$workerPids[$worker_name] = array();
}
while(count(self::$workerPids[$worker_name]) < $config['start_workers'])
{
$pid = self::forkOneWorker($worker_name);
// 子进程退出
if($pid == 0)
{
self::notice("CHILD EXIT ERR");
}
}
}
}
/**
* 创建一个worker进程
* @param string $worker_name worker的名称
* @return int 父进程:>0得到新worker的pid ;<0 出错; 子进程:始终为0
*/
protected static function forkOneWorker($worker_name)
{
// 创建子进程
$pid = pcntl_fork();
// 先处理收到的信号
pcntl_signal_dispatch();
// 父进程
if($pid > 0)
{
// 初始化master的一些东东
self::$workerPids[$worker_name][$pid] = $pid;
// 更新进程信息到共享内存
self::updateStatusToShm();
return $pid;
}
// 子进程
elseif($pid === 0)
{
// 忽略信号
self::ignoreSignal();
// 清空任务
Lib\Task::delAll();
// 关闭不用的监听socket
foreach(self::$listenedSockets as $tmp_worker_name => $tmp_socket)
{
if($tmp_worker_name != $worker_name)
{
fclose($tmp_socket);
}
}
// 尝试以指定用户运行worker
if($worker_user = Lib\Config::get($worker_name . '.user'))
{
self::setWorkerUser($worker_user);
}
// 关闭输出
self::resetStdFd();
// 尝试设置子进程进程名称
self::setWorkerProcessTitle($worker_name);
// 查找worker文件
if($worker_file = \Man\Core\Lib\Config::get($worker_name.'.worker_file'))
{
include_once $worker_file;
$class_name = basename($worker_file, '.php');
}
else
{
$class_name = $worker_name;
include_once WORKERMAN_ROOT_DIR . "workers/$worker_name.php";
}
// 创建实例
$worker = new $class_name($worker_name);
// 如果该worker有配置监听端口,则将监听端口的socket传递给子进程
if(isset(self::$listenedSockets[$worker_name]))
{
$worker->setListendSocket(self::$listenedSockets[$worker_name]);
}
// 使worker开始服务
$worker->start();
return 0;
}
// 出错
else
{
self::notice("create worker fail worker_name:$worker_name detail:pcntl_fork fail");
return $pid;
}
}
/**
* 安装相关信号控制器
* @return void
*/
protected static function installSignal()
{
// 设置终止信号处理函数
pcntl_signal(SIGINT, array('\Man\Core\Master', 'signalHandler'), false);
// 设置SIGUSR1信号处理函数,测试用
pcntl_signal(SIGUSR1, array('\Man\Core\Master', 'signalHandler'), false);
// 设置SIGUSR2信号处理函数,平滑重启Server
pcntl_signal(SIGHUP, array('\Man\Core\Master', 'signalHandler'), false);
// 设置子进程退出信号处理函数
pcntl_signal(SIGCHLD, array('\Man\Core\Master', 'signalHandler'), false);
// 设置忽略信号
pcntl_signal(SIGPIPE, SIG_IGN);
pcntl_signal(SIGTTIN, SIG_IGN);
pcntl_signal(SIGTTOU, SIG_IGN);
pcntl_signal(SIGQUIT, SIG_IGN);
pcntl_signal(SIGALRM, SIG_IGN);
}
/**
* 忽略信号
* @return void
*/
protected static function ignoreSignal()
{
// 设置忽略信号
pcntl_signal(SIGPIPE, SIG_IGN);
pcntl_signal(SIGTTIN, SIG_IGN);
pcntl_signal(SIGTTOU, SIG_IGN);
pcntl_signal(SIGQUIT, SIG_IGN);
pcntl_signal(SIGALRM, SIG_IGN);
pcntl_signal(SIGINT, SIG_IGN);
pcntl_signal(SIGUSR1, SIG_IGN);
pcntl_signal(SIGHUP, SIG_IGN);
pcntl_signal(SIGCHLD, SIG_IGN);
}
/**
* 设置server信号处理函数
* @param null $null
* @param int $signal
* @return void
*/
public static function signalHandler($signal)
{
switch($signal)
{
// 停止server信号
case SIGINT:
self::notice("Server is shutting down");
self::stop();
break;
// 测试用
case SIGUSR1:
break;
// worker退出信号
case SIGCHLD:
// 不要在这里fork,fork出来的子进程无法收到信号
// self::checkWorkerExit();
break;
// 平滑重启server信号
case SIGHUP:
Lib\Config::reload();
self::notice("Server reloading");
self::addToRestartWorkers(array_keys(self::getPidWorkerNameMap()));
self::restartWorkers();
break;
}
}
/**
* 设置子进程进程名称
* @param string $worker_name
* @return void
*/
public static function setWorkerProcessTitle($worker_name)
{
if(isset(self::$listenedSockets[$worker_name]))
{
// 获得socket的信息
$sock_name = stream_socket_get_name(self::$listenedSockets[$worker_name], false);
// 更改进程名,如果支持的话
$mata_data = stream_get_meta_data(self::$listenedSockets[$worker_name]);
$protocol = substr($mata_data['stream_type'], 0, 3);
self::setProcessTitle(self::NAME.":worker $worker_name {$protocol}://$sock_name");
}
else
{
self::setProcessTitle(self::NAME.":worker $worker_name");
}
}
/**
* 主进程主循环 主要是监听子进程退出、服务终止、平滑重启信号
* @return void
*/
public static function loop()
{
$siginfo = array();
while(1)
{
@pcntl_sigtimedwait(array(SIGCHLD), $siginfo, 1);
// 初始化任务系统
Lib\Task::tick();
// 检查是否有进程退出
self::checkWorkerExit();
// 触发信号处理
pcntl_signal_dispatch();
}
}
/**
* 监控worker进程状态,退出重启
* @param resource $channel
* @param int $flag
* @param int $pid 退出的进程id
* @return mixed
*/
public static function checkWorkerExit()
{
// 由于SIGCHLD信号可能重叠导致信号丢失,所以这里要循环获取所有退出的进程id
while(($pid = pcntl_waitpid(-1, $status, WUNTRACED | WNOHANG)) != 0)
{
// 如果是重启的进程,则继续重启进程
if(isset(self::$workerToRestart[$pid]) && self::$serverStatus != self::STATUS_SHUTDOWN)
{
unset(self::$workerToRestart[$pid]);
self::restartWorkers();
}
// 出错
if($pid == -1)
{
// 没有子进程了,可能是出现Fatal Err 了
if(pcntl_get_last_error() == 10)
{
self::notice('Server has no workers now');
}
return -1;
}
// 查找子进程对应的woker_name
$pid_workname_map = self::getPidWorkerNameMap();
$worker_name = isset($pid_workname_map[$pid]) ? $pid_workname_map[$pid] : '';
// 没找到worker_name说明出错了 哪里来的野孩子?
if(empty($worker_name))
{
self::notice("child exist but not found worker_name pid:$pid");
break;
}
// 进程退出状态不是0,说明有问题了
if($status !== 0)
{
self::notice("worker exit status $status pid:$pid worker:$worker_name");
}
// 记录进程退出状态
self::$serverStatusInfo['worker_exit_code'][$worker_name][$status] = isset(self::$serverStatusInfo['worker_exit_code'][$worker_name][$status]) ? self::$serverStatusInfo['worker_exit_code'][$worker_name][$status] + 1 : 1;
// 更新状态到共享内存
self::updateStatusToShm();
// 清理这个进程的数据
self::clearWorker($worker_name, $pid);
// 如果服务是不是关闭中
if(self::$serverStatus != self::STATUS_SHUTDOWN)
{
// 重新创建worker
self::createWorkers();
}
// 判断是否都重启完毕
else
{
$all_worker_pid = self::getPidWorkerNameMap();
if(empty($all_worker_pid))
{
// 删除共享内存
self::removeShmAndQueue();
// 发送提示
self::notice("Server stoped");
// 删除pid文件
@unlink(WORKERMAN_PID_FILE);
exit(0);
}
}//end if
}//end while
}
/**
* 获取pid 到 worker_name 的映射
* @return array ['pid1'=>'worker_name1','pid2'=>'worker_name2', ...]
*/
public static function getPidWorkerNameMap()
{
$all_pid = array();
foreach(self::$workerPids as $worker_name=>$pid_array)
{
foreach($pid_array as $pid)
{
$all_pid[$pid] = $worker_name;
}
}
return $all_pid;
}
/**
* 放入重启队列中
* @param array $restart_pids
* @return void
*/
public static function addToRestartWorkers($restart_pids)
{
if(!is_array($restart_pids))
{
self::notice("addToRestartWorkers(".var_export($restart_pids, true).") \$restart_pids not array");
return false;
}
// 将pid放入重启队列
foreach($restart_pids as $pid)
{
if(!isset(self::$workerToRestart[$pid]))
{
// 重启时间=0
self::$workerToRestart[$pid] = 0;
}
}
}
/**
* 重启workers
* @return void
*/
public static function restartWorkers()
{
// 标记server状态
if(self::$serverStatus != self::STATUS_RESTARTING_WORKERS && self::$serverStatus != self::STATUS_SHUTDOWN)
{
self::$serverStatus = self::STATUS_RESTARTING_WORKERS;
}
// 没有要重启的进程了
if(empty(self::$workerToRestart))
{
self::$serverStatus = self::STATUS_RUNNING;
self::notice("\nWorker Restart Success");
return true;
}
// 遍历要重启的进程 标记它们重启时间
foreach(self::$workerToRestart as $pid => $stop_time)
{
if($stop_time == 0)
{
self::$workerToRestart[$pid] = time();
posix_kill($pid, SIGHUP);
Lib\Task::add(self::KILL_WORKER_TIME_LONG, array('\Man\Core\Master', 'forceKillWorker'), array($pid), false);
break;
}
}
}
/**
* worker进程退出时,master进程的一些清理工作
* @param string $worker_name
* @param int $pid
* @return void
*/
protected static function clearWorker($worker_name, $pid)
{
// 释放一些不用了的数据
unset(self::$workerToRestart[$pid], self::$workerPids[$worker_name][$pid]);
}
/**
* 停止服务
* @return void
*/
public static function stop()
{
// 如果没有子进程则直接退出
$all_worker_pid = self::getPidWorkerNameMap();
if(empty($all_worker_pid))
{
exit(0);
}
// 标记server开始关闭
self::$serverStatus = self::STATUS_SHUTDOWN;
// killWorkerTimeLong 秒后如果还没停止则强制杀死所有进程
Lib\Task::add(self::KILL_WORKER_TIME_LONG, array('\Man\Core\Master', 'stopAllWorker'), array(true), false);
// 停止所有worker
self::stopAllWorker();
}
/**
* 停止所有worker
* @param bool $force 是否强制退出
* @return void
*/
public static function stopAllWorker($force = false)
{
// 获得所有pid
$all_worker_pid = self::getPidWorkerNameMap();
// 强行杀死?
if($force)
{
// 杀死所有子进程
foreach($all_worker_pid as $pid=>$worker_name)
{
// 发送kill信号
self::forceKillWorker($pid);
self::notice("Kill workers($worker_name) force!");
}
}
else
{
// 向所有子进程发送终止信号
foreach($all_worker_pid as $pid=>$worker_name)
{
// 发送SIGINT信号
posix_kill($pid, SIGINT);
}
}
}
/**
* 强制杀死进程
* @param int $pid
* @return void
*/
public static function forceKillWorker($pid)
{
if(posix_kill($pid, 0))
{
self::notice("Kill workers $pid force!");
posix_kill($pid, SIGKILL);
}
}
/**
* 设置运行用户
* @param string $worker_user
* @return void
*/
protected static function setWorkerUser($worker_user)
{
$user_info = posix_getpwnam($worker_user);
// 尝试设置gid uid
if(!posix_setgid($user_info['gid']) || !posix_setuid($user_info['uid']))
{
$notice = 'Notice : Can not run woker as '.$worker_user." , You shuld be root\n";
self::notice($notice, true);
}
}
/**
* 获取共享内存资源id
* @return resource
*/
public static function getShmId()
{
return self::$shmId;
}
/**
* 获取消息队列资源id
* @return resource
*/
public static function getQueueId()
{
return self::$queueId;
}
/**
* 关闭标准输入输出
* @return void
*/
protected static function resetStdFd()
{
// 开发环境不关闭标准输出,用于调试
if(Lib\Config::get('workerman.debug') == 1 && posix_ttyname(STDOUT))
{
return;
}
global $STDOUT, $STDERR;
@fclose(STDOUT);
@fclose(STDERR);
// 将标准输出重定向到/dev/null
$STDOUT = fopen('/dev/null',"rw+");
$STDERR = fopen('/dev/null',"rw+");
}
/**
* 更新主进程收集的状态信息到共享内存
* @return bool
*/
protected static function updateStatusToShm()
{
if(!self::$shmId)
{
return true;
}
return shm_put_var(self::$shmId, self::STATUS_VAR_ID, array_merge(self::$serverStatusInfo, array('pid_map'=>self::$workerPids)));
}
/**
* 销毁共享内存以及消息队列
* @return void
*/
protected static function removeShmAndQueue()
{
if(self::$shmId)
{
shm_remove(self::$shmId);
}
if(self::$queueId)
{
msg_remove_queue(self::$queueId);
}
}
/**
* 设置进程名称,需要proctitle支持 或者php>=5.5
* @param string $title
* @return void
*/
protected static function setProcessTitle($title)
{
// >=php 5.5
if (version_compare(phpversion(), "5.5", "ge") && function_exists('cli_set_process_title'))
{
cli_set_process_title($title);
}
// 需要扩展
elseif(extension_loaded('proctitle') && function_exists('setproctitle'))
{
setproctitle($title);
}
}
/**
* notice,记录到日志
* @param string $msg
* @param bool $display
* @return void
*/
public static function notice($msg, $display = false)
{
Lib\Log::add("Server:".$msg);
if($display)
{
if(self::$serverStatus == self::STATUS_STARTING)
{
echo($msg."\n");
}
}
}
}