|
|
@@ -18,7 +18,7 @@ require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Mutex.php';
|
|
|
*
|
|
|
* @package Core
|
|
|
*
|
|
|
-* @author walkor <worker-man@qq.com>
|
|
|
+* @author walkor <workerman.net>
|
|
|
* <b>使用示例:</b>
|
|
|
* <pre>
|
|
|
* <code>
|
|
|
@@ -96,28 +96,28 @@ class Master
|
|
|
const KILL_WORKER_TIME_LONG = 4;
|
|
|
|
|
|
/**
|
|
|
- * 用于保存所有子进程pid ['worker_name1'=>[pid1=>pid1,pid2=>pid2,..], 'worker_name2'=>[pid3,..], ...]
|
|
|
+ * 用于保存所有子进程pid ['worker_name1'=>[pid1=>pid1,pid2=>pid2,..], 'worker_name2'=>[pid7,..], ...]
|
|
|
* @var array
|
|
|
*/
|
|
|
- protected static $workerPids = array();
|
|
|
+ protected static $workerPidMap = array();
|
|
|
|
|
|
/**
|
|
|
* 服务的状态,默认是启动中
|
|
|
* @var integer
|
|
|
*/
|
|
|
- protected static $serverStatus = self::STATUS_STARTING;
|
|
|
+ protected static $serviceStatus = self::STATUS_STARTING;
|
|
|
|
|
|
/**
|
|
|
* 用来监听端口的Socket数组,用来fork worker使用
|
|
|
* @var array
|
|
|
*/
|
|
|
- protected static $listenedSockets = array();
|
|
|
+ protected static $listenedSocketsArray = array();
|
|
|
|
|
|
/**
|
|
|
- * 要重启的worker的pid数组 [pid1=>time_stamp, pid2=>time_stamp, ..]
|
|
|
+ * 要重启r的pid数组 [pid1=>time_stamp, pid2=>time_stamp, ..]
|
|
|
* @var array
|
|
|
*/
|
|
|
- protected static $workerToRestart = array();
|
|
|
+ protected static $pidsToRestart = array();
|
|
|
|
|
|
/**
|
|
|
* 共享内存resource id
|
|
|
@@ -141,7 +141,7 @@ class Master
|
|
|
* server统计信息 ['start_time'=>time_stamp, 'worker_exit_code'=>['worker_name1'=>[code1=>count1, code2=>count2,..], 'worker_name2'=>[code3=>count3,...], ..] ]
|
|
|
* @var array
|
|
|
*/
|
|
|
- protected static $serverStatusInfo = array(
|
|
|
+ protected static $serviceStatusInfo = array(
|
|
|
'start_time' => 0,
|
|
|
'worker_exit_code' => array(),
|
|
|
);
|
|
|
@@ -165,13 +165,13 @@ class Master
|
|
|
// 安装信号
|
|
|
self::installSignal();
|
|
|
// 创建监听套接字
|
|
|
- self::createSocketsAndListen();
|
|
|
+ self::createListeningSockets();
|
|
|
// 创建worker进程
|
|
|
- self::createWorkers();
|
|
|
+ self::spawnWorkers();
|
|
|
// 输出信息
|
|
|
self::notice("\033[1A\n\033[KWorkerman start success ...\033[0m", true);
|
|
|
- // 标记sever状态为运行中...
|
|
|
- self::$serverStatus = self::STATUS_RUNNING;
|
|
|
+ // 标记服务状态为运行中
|
|
|
+ self::$serviceStatus = self::STATUS_RUNNING;
|
|
|
// 关闭标准输出
|
|
|
self::resetStdFd();
|
|
|
// 主循环
|
|
|
@@ -186,10 +186,10 @@ class Master
|
|
|
public static function init()
|
|
|
{
|
|
|
// 获取配置文件
|
|
|
- $config_path = Lib\Config::$filename;
|
|
|
+ $config_path = Lib\Config::$configFile;
|
|
|
|
|
|
// 设置进程名称,如果支持的话
|
|
|
- self::setProcessTitle(self::NAME.':master with-config:' . $config_path);
|
|
|
+ self::setProcTitle(self::NAME.':master with-config:' . $config_path);
|
|
|
|
|
|
// 初始化共享内存消息队列
|
|
|
if(extension_loaded('sysvmsg') && extension_loaded('sysvshm'))
|
|
|
@@ -238,18 +238,18 @@ class Master
|
|
|
if(-1 == $pid)
|
|
|
{
|
|
|
// 出错退出
|
|
|
- exit("Daemonize fail ,can not fork");
|
|
|
+ exit("Can not fork");
|
|
|
}
|
|
|
elseif($pid > 0)
|
|
|
{
|
|
|
// 父进程,退出
|
|
|
exit(0);
|
|
|
}
|
|
|
- // 子进程使之成为session leader
|
|
|
+ // 成为session leader
|
|
|
if(-1 == posix_setsid())
|
|
|
{
|
|
|
// 出错退出
|
|
|
- exit("Daemonize fail ,setsid fail");
|
|
|
+ exit("Setsid fail");
|
|
|
}
|
|
|
|
|
|
// 再fork一次
|
|
|
@@ -257,16 +257,16 @@ class Master
|
|
|
if(-1 == $pid2)
|
|
|
{
|
|
|
// 出错退出
|
|
|
- exit("Daemonize fail ,can not fork");
|
|
|
+ exit("Can not fork");
|
|
|
}
|
|
|
elseif(0 !== $pid2)
|
|
|
{
|
|
|
- // 结束第一子进程,用来禁止进程重新打开控制终端
|
|
|
+ // 禁止进程重新打开控制终端
|
|
|
exit(0);
|
|
|
}
|
|
|
|
|
|
- // 记录server启动时间
|
|
|
- self::$serverStatusInfo['start_time'] = time();
|
|
|
+ // 记录服务启动时间
|
|
|
+ self::$serviceStatusInfo['start_time'] = time();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -301,7 +301,7 @@ class Master
|
|
|
* 根据配置文件,创建监听套接字
|
|
|
* @return void
|
|
|
*/
|
|
|
- protected static function createSocketsAndListen()
|
|
|
+ protected static function createListeningSockets()
|
|
|
{
|
|
|
// 循环读取配置创建socket
|
|
|
foreach (Lib\Config::getAllWorkers() as $worker_name=>$config)
|
|
|
@@ -312,11 +312,11 @@ class Master
|
|
|
$error_no = 0;
|
|
|
$error_msg = '';
|
|
|
// 创建监听socket
|
|
|
- self::$listenedSockets[$worker_name] = stream_socket_server($config['listen'], $error_no, $error_msg, $flags);
|
|
|
- if(!self::$listenedSockets[$worker_name])
|
|
|
+ self::$listenedSocketsArray[$worker_name] = stream_socket_server($config['listen'], $error_no, $error_msg, $flags);
|
|
|
+ if(!self::$listenedSocketsArray[$worker_name])
|
|
|
{
|
|
|
Lib\Log::add("can not create socket {$config['listen']} info:{$error_no} {$error_msg}\tServer start fail");
|
|
|
- exit("\n\033[31;40mcan not create socket {$config['listen']} info:{$error_no} {$error_msg}\033[0m\n\n\033[31;40mServer start fail\033[0m\n\n");
|
|
|
+ exit("\n\033[31;40mCan not create socket {$config['listen']} {$error_msg}\033[0m\n\n\033[31;40mWorkerman start fail\033[0m\n\n");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -327,24 +327,23 @@ class Master
|
|
|
* 根据配置文件创建Workers
|
|
|
* @return void
|
|
|
*/
|
|
|
- protected static function createWorkers()
|
|
|
+ protected static function spawnWorkers()
|
|
|
{
|
|
|
- // 循环读取配置创建一定量的worker进程
|
|
|
+ // 生成一定量的worker进程
|
|
|
foreach (Lib\Config::getAllWorkers() as $worker_name=>$config)
|
|
|
{
|
|
|
// 初始化
|
|
|
- if(empty(self::$workerPids[$worker_name]))
|
|
|
+ if(empty(self::$workerPidMap[$worker_name]))
|
|
|
{
|
|
|
- self::$workerPids[$worker_name] = array();
|
|
|
+ self::$workerPidMap[$worker_name] = array();
|
|
|
}
|
|
|
|
|
|
- while(count(self::$workerPids[$worker_name]) < $config['start_workers'])
|
|
|
+ while(count(self::$workerPidMap[$worker_name]) < $config['start_workers'])
|
|
|
{
|
|
|
- $pid = self::forkOneWorker($worker_name);
|
|
|
// 子进程退出
|
|
|
- if($pid == 0)
|
|
|
+ if(self::createOneWorker($worker_name) == 0)
|
|
|
{
|
|
|
- self::notice("CHILD EXIT ERR");
|
|
|
+ self::notice("Worker exit unexpected");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -352,10 +351,10 @@ class Master
|
|
|
|
|
|
/**
|
|
|
* 创建一个worker进程
|
|
|
- * @param string $worker_name worker的名称
|
|
|
+ * @param string $worker_name 服务名
|
|
|
* @return int 父进程:>0得到新worker的pid ;<0 出错; 子进程:始终为0
|
|
|
*/
|
|
|
- protected static function forkOneWorker($worker_name)
|
|
|
+ protected static function createOneWorker($worker_name)
|
|
|
{
|
|
|
// 创建子进程
|
|
|
$pid = pcntl_fork();
|
|
|
@@ -367,7 +366,7 @@ class Master
|
|
|
if($pid > 0)
|
|
|
{
|
|
|
// 初始化master的一些东东
|
|
|
- self::$workerPids[$worker_name][$pid] = $pid;
|
|
|
+ self::$workerPidMap[$worker_name][$pid] = $pid;
|
|
|
// 更新进程信息到共享内存
|
|
|
self::updateStatusToShm();
|
|
|
|
|
|
@@ -383,7 +382,7 @@ class Master
|
|
|
Lib\Task::delAll();
|
|
|
|
|
|
// 关闭不用的监听socket
|
|
|
- foreach(self::$listenedSockets as $tmp_worker_name => $tmp_socket)
|
|
|
+ foreach(self::$listenedSocketsArray as $tmp_worker_name => $tmp_socket)
|
|
|
{
|
|
|
if($tmp_worker_name != $worker_name)
|
|
|
{
|
|
|
@@ -391,30 +390,24 @@ class Master
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // 尝试以指定用户运行worker
|
|
|
+ // 尝试以指定用户运行worker进程
|
|
|
if($worker_user = Lib\Config::get($worker_name . '.user'))
|
|
|
{
|
|
|
- self::setWorkerUser($worker_user);
|
|
|
+ self::setProcUser($worker_user);
|
|
|
}
|
|
|
|
|
|
// 关闭输出
|
|
|
self::resetStdFd();
|
|
|
|
|
|
// 尝试设置子进程进程名称
|
|
|
- self::setWorkerProcessTitle($worker_name);
|
|
|
+ self::setWorkerProcTitle($worker_name);
|
|
|
|
|
|
+ // 包含必要文件
|
|
|
require_once WORKERMAN_ROOT_DIR . 'Core/SocketWorker.php';
|
|
|
|
|
|
// 查找worker文件
|
|
|
- if($worker_file = \Man\Core\Lib\Config::get($worker_name.'.worker_file'))
|
|
|
- {
|
|
|
- $class_name = basename($worker_file, '.php');
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- $class_name = $worker_name;
|
|
|
- $worker_file = WORKERMAN_ROOT_DIR . "workers/$worker_name.php";
|
|
|
- }
|
|
|
+ $worker_file = \Man\Core\Lib\Config::get($worker_name.'.worker_file');
|
|
|
+ $class_name = basename($worker_file, '.php');
|
|
|
|
|
|
// 如果有语法错误 sleep 5秒 避免狂刷日志
|
|
|
if(\Man\Core\Lib\Checker::checkSyntaxError($worker_file, $class_name))
|
|
|
@@ -427,9 +420,9 @@ class Master
|
|
|
$worker = new $class_name($worker_name);
|
|
|
|
|
|
// 如果该worker有配置监听端口,则将监听端口的socket传递给子进程
|
|
|
- if(isset(self::$listenedSockets[$worker_name]))
|
|
|
+ if(isset(self::$listenedSocketsArray[$worker_name]))
|
|
|
{
|
|
|
- $worker->setListendSocket(self::$listenedSockets[$worker_name]);
|
|
|
+ $worker->setListendSocket(self::$listenedSocketsArray[$worker_name]);
|
|
|
}
|
|
|
|
|
|
// 使worker开始服务
|
|
|
@@ -495,9 +488,9 @@ class Master
|
|
|
{
|
|
|
switch($signal)
|
|
|
{
|
|
|
- // 停止server信号
|
|
|
+ // 停止服务信号
|
|
|
case SIGINT:
|
|
|
- self::notice("Server is shutting down");
|
|
|
+ self::notice("Workerman is shutting down");
|
|
|
self::stop();
|
|
|
break;
|
|
|
// 测试用
|
|
|
@@ -505,13 +498,13 @@ class Master
|
|
|
break;
|
|
|
// worker退出信号
|
|
|
case SIGCHLD:
|
|
|
- // 不要在这里fork,fork出来的子进程无法收到信号
|
|
|
+ // 这里什么也不做
|
|
|
// self::checkWorkerExit();
|
|
|
break;
|
|
|
// 平滑重启server信号
|
|
|
case SIGHUP:
|
|
|
Lib\Config::reload();
|
|
|
- self::notice("Server reloading");
|
|
|
+ self::notice("Workerman reloading");
|
|
|
self::addToRestartWorkers(array_keys(self::getPidWorkerNameMap()));
|
|
|
self::restartWorkers();
|
|
|
break;
|
|
|
@@ -523,21 +516,21 @@ class Master
|
|
|
* @param string $worker_name
|
|
|
* @return void
|
|
|
*/
|
|
|
- public static function setWorkerProcessTitle($worker_name)
|
|
|
+ public static function setWorkerProcTitle($worker_name)
|
|
|
{
|
|
|
- if(isset(self::$listenedSockets[$worker_name]))
|
|
|
+ if(isset(self::$listenedSocketsArray[$worker_name]))
|
|
|
{
|
|
|
// 获得socket的信息
|
|
|
- $sock_name = stream_socket_get_name(self::$listenedSockets[$worker_name], false);
|
|
|
+ $sock_name = stream_socket_get_name(self::$listenedSocketsArray[$worker_name], false);
|
|
|
|
|
|
// 更改进程名,如果支持的话
|
|
|
- $mata_data = stream_get_meta_data(self::$listenedSockets[$worker_name]);
|
|
|
+ $mata_data = stream_get_meta_data(self::$listenedSocketsArray[$worker_name]);
|
|
|
$protocol = substr($mata_data['stream_type'], 0, 3);
|
|
|
- self::setProcessTitle(self::NAME.":worker $worker_name {$protocol}://$sock_name");
|
|
|
+ self::setProcTitle(self::NAME.":worker $worker_name {$protocol}://$sock_name");
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- self::setProcessTitle(self::NAME.":worker $worker_name");
|
|
|
+ self::setProcTitle(self::NAME.":worker $worker_name");
|
|
|
}
|
|
|
|
|
|
}
|
|
|
@@ -575,7 +568,7 @@ class Master
|
|
|
while(($pid = pcntl_waitpid(-1, $status, WUNTRACED | WNOHANG)) != 0)
|
|
|
{
|
|
|
// 如果是重启的进程,则继续重启进程
|
|
|
- if(isset(self::$workerToRestart[$pid]) && self::$serverStatus != self::STATUS_SHUTDOWN)
|
|
|
+ if(isset(self::$workerToRestart[$pid]) && self::$serviceStatus != self::STATUS_SHUTDOWN)
|
|
|
{
|
|
|
unset(self::$workerToRestart[$pid]);
|
|
|
self::restartWorkers();
|
|
|
@@ -604,7 +597,7 @@ class Master
|
|
|
self::notice("worker[$pid:$worker_name] exit width status $status");
|
|
|
}
|
|
|
// 记录进程退出状态
|
|
|
- self::$serverStatusInfo['worker_exit_code'][$worker_name][$status] = isset(self::$serverStatusInfo['worker_exit_code'][$worker_name][$status]) ? self::$serverStatusInfo['worker_exit_code'][$worker_name][$status] + 1 : 1;
|
|
|
+ self::$serviceStatusInfo['worker_exit_code'][$worker_name][$status] = isset(self::$serviceStatusInfo['worker_exit_code'][$worker_name][$status]) ? self::$serviceStatusInfo['worker_exit_code'][$worker_name][$status] + 1 : 1;
|
|
|
// 更新状态到共享内存
|
|
|
self::updateStatusToShm();
|
|
|
|
|
|
@@ -612,10 +605,10 @@ class Master
|
|
|
self::clearWorker($worker_name, $pid);
|
|
|
|
|
|
// 如果服务是不是关闭中
|
|
|
- if(self::$serverStatus != self::STATUS_SHUTDOWN)
|
|
|
+ if(self::$serviceStatus != self::STATUS_SHUTDOWN)
|
|
|
{
|
|
|
// 重新创建worker
|
|
|
- self::createWorkers();
|
|
|
+ self::spawnWorkers();
|
|
|
}
|
|
|
// 判断是否都重启完毕
|
|
|
else
|
|
|
@@ -642,7 +635,7 @@ class Master
|
|
|
public static function getPidWorkerNameMap()
|
|
|
{
|
|
|
$all_pid = array();
|
|
|
- foreach(self::$workerPids as $worker_name=>$pid_array)
|
|
|
+ foreach(self::$workerPidMap as $worker_name=>$pid_array)
|
|
|
{
|
|
|
foreach($pid_array as $pid)
|
|
|
{
|
|
|
@@ -683,21 +676,21 @@ class Master
|
|
|
public static function restartWorkers()
|
|
|
{
|
|
|
// 标记server状态
|
|
|
- if(self::$serverStatus != self::STATUS_RESTARTING_WORKERS && self::$serverStatus != self::STATUS_SHUTDOWN)
|
|
|
+ if(self::$serviceStatus != self::STATUS_RESTARTING_WORKERS && self::$serviceStatus != self::STATUS_SHUTDOWN)
|
|
|
{
|
|
|
- self::$serverStatus = self::STATUS_RESTARTING_WORKERS;
|
|
|
+ self::$serviceStatus = self::STATUS_RESTARTING_WORKERS;
|
|
|
}
|
|
|
|
|
|
// 没有要重启的进程了
|
|
|
if(empty(self::$workerToRestart))
|
|
|
{
|
|
|
- self::$serverStatus = self::STATUS_RUNNING;
|
|
|
+ self::$serviceStatus = self::STATUS_RUNNING;
|
|
|
self::notice("\nWorker Restart Success");
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
// 遍历要重启的进程 标记它们重启时间
|
|
|
- foreach(self::$workerToRestart as $pid => $stop_time)
|
|
|
+ foreach(self::$pidsToRestart as $pid => $stop_time)
|
|
|
{
|
|
|
if($stop_time == 0)
|
|
|
{
|
|
|
@@ -718,7 +711,7 @@ class Master
|
|
|
protected static function clearWorker($worker_name, $pid)
|
|
|
{
|
|
|
// 释放一些不用了的数据
|
|
|
- unset(self::$workerToRestart[$pid], self::$workerPids[$worker_name][$pid]);
|
|
|
+ unset(self::$workerToRestart[$pid], self::$workerPidMap[$worker_name][$pid]);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -736,7 +729,7 @@ class Master
|
|
|
}
|
|
|
|
|
|
// 标记server开始关闭
|
|
|
- self::$serverStatus = self::STATUS_SHUTDOWN;
|
|
|
+ self::$serviceStatus = self::STATUS_SHUTDOWN;
|
|
|
|
|
|
// killWorkerTimeLong 秒后如果还没停止则强制杀死所有进程
|
|
|
Lib\Task::add(self::KILL_WORKER_TIME_LONG, array('\Man\Core\Master', 'stopAllWorker'), array(true), false);
|
|
|
@@ -798,13 +791,13 @@ class Master
|
|
|
* @param string $worker_user
|
|
|
* @return void
|
|
|
*/
|
|
|
- protected static function setWorkerUser($worker_user)
|
|
|
+ protected static function setProcUser($worker_user)
|
|
|
{
|
|
|
$user_info = posix_getpwnam($worker_user);
|
|
|
// 尝试设置gid uid
|
|
|
if(!posix_setgid($user_info['gid']) || !posix_setuid($user_info['uid']))
|
|
|
{
|
|
|
- self::notice( 'Notice : Can not run woker as '.$worker_user." , You shuld be login as root\n", true);
|
|
|
+ self::notice( 'Notice : Can not run woker as '.$worker_user." , You shuld be root\n", true);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -856,7 +849,7 @@ class Master
|
|
|
{
|
|
|
return true;
|
|
|
}
|
|
|
- return shm_put_var(self::$shmId, self::STATUS_VAR_ID, array_merge(self::$serverStatusInfo, array('pid_map'=>self::$workerPids)));
|
|
|
+ return shm_put_var(self::$shmId, self::STATUS_VAR_ID, array_merge(self::$serviceStatusInfo, array('pid_map'=>self::$workerPidMap)));
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -880,17 +873,17 @@ class Master
|
|
|
* @param string $title
|
|
|
* @return void
|
|
|
*/
|
|
|
- protected static function setProcessTitle($title)
|
|
|
+ protected static function setProcTitle($title)
|
|
|
{
|
|
|
// >=php 5.5
|
|
|
- if (version_compare(phpversion(), "5.5", "ge") && function_exists('cli_set_process_title'))
|
|
|
+ if (function_exists('cli_set_process_title'))
|
|
|
{
|
|
|
- cli_set_process_title($title);
|
|
|
+ @cli_set_process_title($title);
|
|
|
}
|
|
|
// 需要扩展
|
|
|
elseif(extension_loaded('proctitle') && function_exists('setproctitle'))
|
|
|
{
|
|
|
- setproctitle($title);
|
|
|
+ @setproctitle($title);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -905,7 +898,7 @@ class Master
|
|
|
Lib\Log::add("Server:".$msg);
|
|
|
if($display)
|
|
|
{
|
|
|
- if(self::$serverStatus == self::STATUS_STARTING)
|
|
|
+ if(self::$serviceStatus == self::STATUS_STARTING)
|
|
|
{
|
|
|
echo($msg."\n");
|
|
|
}
|