Master.php 30 KB


  1. <?php
  2. namespace Man\Core;
  3. use \Man\Core\Lib\Task;
  4. use \Man\Core\Lib\Config;
  5. use \Man\Core\Lib\Checker;
  6. use \Man\Core\Lib\Log;
  7. if(!defined('WORKERMAN_ROOT_DIR'))
  8. {
  9. define('WORKERMAN_ROOT_DIR', realpath(__DIR__."/../../")."/");
  10. }
  11. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Checker.php';
  12. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Config.php';
  13. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Task.php';
  14. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Log.php';
  15. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Mutex.php';
  16. require_once WORKERMAN_ROOT_DIR . 'Core/Events/interfaces.php';
  17. require_once WORKERMAN_ROOT_DIR . 'Core/Events/Select.php';
  18. require_once WORKERMAN_ROOT_DIR . 'Core/Events/Libevent.php';
  19. require_once WORKERMAN_ROOT_DIR . 'Core/AbstractWorker.php';
  20. require_once WORKERMAN_ROOT_DIR . 'Core/SocketWorker.php';
  21. /**
  22. *
  23. * 主进程
  24. *
  25. * @package Core
  26. *
  27. * @author walkor <walkor@workerman.net>
  28. * <b>使用示例:</b>
  29. * <pre>
  30. * <code>
  31. * Man\Core\Master::run();
  32. * <code>
  33. * </pre>
  34. *
  35. */
  36. class Master
  37. {
  38. /**
  39. * 版本
  40. * @var string
  41. */
  42. const VERSION = '2.1.6';
  43. /**
  44. * 服务名
  45. * @var string
  46. */
  47. const NAME = 'WorkerMan';
  48. /**
  49. * 服务状态 启动中
  50. * @var integer
  51. */
  52. const STATUS_STARTING = 1;
  53. /**
  54. * 服务状态 运行中
  55. * @var integer
  56. */
  57. const STATUS_RUNNING = 2;
  58. /**
  59. * 服务状态 关闭中
  60. * @var integer
  61. */
  62. const STATUS_SHUTDOWN = 4;
  63. /**
  64. * 服务状态 平滑重启中
  65. * @var integer
  66. */
  67. const STATUS_RESTARTING_WORKERS = 8;
  68. /**
  69. * 整个服务能够启动的最大进程数
  70. * @var integer
  71. */
  72. const SERVER_MAX_WORKER_COUNT = 5000;
  73. /**
  74. * 单个进程打开文件数限制
  75. * @var integer
  76. */
  77. const MIN_SOFT_OPEN_FILES = 10000;
  78. /**
  79. * 单个进程打开文件数限制 硬性限制
  80. * @var integer
  81. */
  82. const MIN_HARD_OPEN_FILES = 10000;
  83. /**
  84. * 共享内存中用于存储主进程统计信息的变量id
  85. * @var integer
  86. */
  87. const STATUS_VAR_ID = 1;
  88. /**
  89. * 发送停止命令多久后worker没退出则发送sigkill信号
  90. * @var integer
  91. */
  92. const KILL_WORKER_TIME_LONG = 4;
  93. /**
  94. * 默认listen的backlog,如果没配置backlog,则使用此值
  95. * @var integer
  96. */
  97. const DEFAULT_BACKLOG= 1024;
  98. /**
  99. * 用于保存所有子进程pid ['worker_name1'=>[pid1=>pid1,pid2=>pid2,..], 'worker_name2'=>[pid7,..], ...]
  100. * @var array
  101. */
  102. protected static $workerPidMap = array();
  103. /**
  104. * 服务的状态,默认是启动中
  105. * @var integer
  106. */
  107. protected static $serviceStatus = self::STATUS_STARTING;
  108. /**
  109. * 用来监听端口的Socket数组,用来fork worker使用
  110. * @var array
  111. */
  112. protected static $listenedSocketsArray = array();
  113. /**
  114. * 要重启r的pid数组 [pid1=>time_stamp, pid2=>time_stamp, ..]
  115. * @var array
  116. */
  117. protected static $pidsToRestart = array();
  118. /**
  119. * 共享内存resource id
  120. * @var resource
  121. */
  122. protected static $shmId = 0;
  123. /**
  124. * 消息队列 resource id
  125. * @var resource
  126. */
  127. protected static $queueId = 0;
  128. /**
  129. * master进程pid
  130. * @var integer
  131. */
  132. protected static $masterPid = 0;
  133. /**
  134. * 终端是否关闭
  135. * @var bool
  136. */
  137. protected static $terminalClosed = false;
  138. /**
  139. * 是否已经重定向了标准输出
  140. * @var bool
  141. */
  142. protected static $hasResetStd = false;
  143. /**
  144. * server统计信息 ['start_time'=>time_stamp, 'worker_exit_code'=>['worker_name1'=>[code1=>count1, code2=>count2,..], 'worker_name2'=>[code3=>count3,...], ..] ]
  145. * @var array
  146. */
  147. protected static $serviceStatusInfo = array(
  148. 'start_time' => 0,
  149. 'worker_exit_code' => array(),
  150. );
  151. /**
  152. * 服务运行
  153. * @return void
  154. */
  155. public static function run()
  156. {
  157. // 输出信息
  158. self::notice("Workerman is starting ...", true);
  159. // 检查环境
  160. self::checkEnv();
  161. // 初始化
  162. self::init();
  163. // 变成守护进程
  164. self::daemonize();
  165. // 保存进程pid
  166. self::savePid();
  167. // 安装信号
  168. self::installSignal();
  169. // 创建监听套接字
  170. self::createListeningSockets();
  171. // 创建worker进程
  172. self::spawnWorkers();
  173. // 输出信息
  174. self::notice("\033[1A\n\033[KWorkerman start success ...\033[0m", true);
  175. // 标记服务状态为运行中
  176. self::$serviceStatus = self::STATUS_RUNNING;
  177. // 初始化任务
  178. Task::init();
  179. // 关闭标准输出
  180. self::resetStdFd();
  181. // 主循环
  182. self::loop();
  183. }
  184. /**
  185. * 初始化 配置、进程名、共享内存、消息队列等
  186. * @return void
  187. */
  188. public static function init()
  189. {
  190. // 因为子进程要更换用户、开低端口等,必须是root启动
  191. if($user_info = posix_getpwuid(posix_getuid()))
  192. {
  193. if($user_info['name'] !== 'root')
  194. {
  195. exit("\033[31;40mYou should run workerman as root . Permission denied\033[0m\n");
  196. }
  197. }
  198. // 获取配置文件
  199. $config_path = Config::$configFile;
  200. // 设置进程名称,如果支持的话
  201. self::setProcTitle(self::NAME.':master with-config:' . $config_path);
  202. // 初始化共享内存消息队列
  203. if(extension_loaded('sysvmsg') && extension_loaded('sysvshm'))
  204. {
  205. self::$shmId = shm_attach(IPC_KEY, DEFAULT_SHM_SIZE);
  206. self::$queueId = msg_get_queue(IPC_KEY);
  207. msg_set_queue(self::$queueId,array('msg_qbytes'=>DEFAULT_MSG_QBYTES));
  208. }
  209. }
  210. /**
  211. * 检查环境配置
  212. * @return void
  213. */
  214. public static function checkEnv()
  215. {
  216. // 检查PID文件
  217. Checker::checkPidFile();
  218. // 检查扩展支持情况
  219. Checker::checkExtension();
  220. // 检查函数禁用情况
  221. Checker::checkDisableFunction();
  222. // 检查log目录是否可读
  223. Log::init();
  224. // 检查配置和语法错误等
  225. Checker::checkWorkersConfig();
  226. // 检查文件限制
  227. Checker::checkLimit();
  228. }
  229. /**
  230. * 使之脱离终端,变为守护进程
  231. * @return void
  232. */
  233. protected static function daemonize()
  234. {
  235. // 设置umask
  236. umask(0);
  237. // fork一次
  238. $pid = pcntl_fork();
  239. if(-1 == $pid)
  240. {
  241. // 出错退出
  242. exit("Can not fork");
  243. }
  244. elseif($pid > 0)
  245. {
  246. // 父进程,退出
  247. exit(0);
  248. }
  249. // 成为session leader
  250. if(-1 == posix_setsid())
  251. {
  252. // 出错退出
  253. exit("Setsid fail");
  254. }
  255. // 再fork一次,防止在符合SVR4标准的系统下进程再次获得终端
  256. $pid2 = pcntl_fork();
  257. if(-1 == $pid2)
  258. {
  259. // 出错退出
  260. exit("Can not fork");
  261. }
  262. elseif(0 !== $pid2)
  263. {
  264. // 禁止进程重新打开控制终端
  265. exit(0);
  266. }
  267. // 记录服务启动时间
  268. self::$serviceStatusInfo['start_time'] = time();
  269. }
  270. /**
  271. * 保存主进程pid
  272. * @return void
  273. */
  274. public static function savePid()
  275. {
  276. // 保存在变量中
  277. self::$masterPid = posix_getpid();
  278. // 保存到文件中,用于实现停止、重启
  279. if(false === @file_put_contents(WORKERMAN_PID_FILE, self::$masterPid))
  280. {
  281. exit("\033[31;40mCan not save pid to pid-file(" . WORKERMAN_PID_FILE . ")\033[0m\n\n\033[31;40mServer start fail\033[0m\n\n");
  282. }
  283. // 更改权限
  284. chmod(WORKERMAN_PID_FILE, 0644);
  285. }
  286. /**
  287. * 获取主进程pid
  288. * @return int
  289. */
  290. public static function getMasterPid()
  291. {
  292. return self::$masterPid;
  293. }
  294. /**
  295. * 根据配置文件,创建监听套接字
  296. * @return void
  297. */
  298. protected static function createListeningSockets()
  299. {
  300. // 循环读取配置创建socket
  301. foreach (Config::getAllWorkers() as $worker_name=>$config)
  302. {
  303. if(isset($config['listen']))
  304. {
  305. $context = self::getSocketContext($worker_name);
  306. $flags = substr($config['listen'], 0, 3) == 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
  307. $error_no = 0;
  308. $error_msg = '';
  309. // 创建监听socket
  310. if($context)
  311. {
  312. self::$listenedSocketsArray[$worker_name] = stream_socket_server($config['listen'], $error_no, $error_msg, $flags, $context);
  313. }
  314. else
  315. {
  316. self::$listenedSocketsArray[$worker_name] = stream_socket_server($config['listen'], $error_no, $error_msg, $flags);
  317. }
  318. if(!self::$listenedSocketsArray[$worker_name])
  319. {
  320. Log::add("can not create socket {$config['listen']} info:{$error_no} {$error_msg}\tServer start fail");
  321. exit("\n\033[31;40mCan not create socket {$config['listen']} {$error_msg}\033[0m\n\n\033[31;40mWorkerman start fail\033[0m\n\n");
  322. }
  323. }
  324. }
  325. }
  326. /**
  327. * 根据配置文件创建Workers
  328. * @return void
  329. */
  330. protected static function spawnWorkers()
  331. {
  332. // 生成一定量的worker进程
  333. foreach (Config::getAllWorkers() as $worker_name=>$config)
  334. {
  335. // 初始化
  336. if(empty(self::$workerPidMap[$worker_name]))
  337. {
  338. self::$workerPidMap[$worker_name] = array();
  339. }
  340. while(count(self::$workerPidMap[$worker_name]) < $config['start_workers'])
  341. {
  342. // 子进程退出
  343. if(self::createOneWorker($worker_name) == 0)
  344. {
  345. self::notice("Worker exit unexpected");
  346. exit(500);
  347. }
  348. }
  349. }
  350. }
  351. /**
  352. * 创建一个worker进程
  353. * @param string $worker_name 服务名
  354. * @return int 父进程:>0得到新worker的pid ;<0 出错; 子进程:始终为0
  355. */
  356. protected static function createOneWorker($worker_name)
  357. {
  358. // 创建子进程
  359. $pid = pcntl_fork();
  360. // 先处理收到的信号
  361. pcntl_signal_dispatch();
  362. // 父进程
  363. if($pid > 0)
  364. {
  365. // 初始化master的一些东东
  366. self::$workerPidMap[$worker_name][$pid] = $pid;
  367. // 更新进程信息到共享内存
  368. self::updateStatusToShm();
  369. return $pid;
  370. }
  371. // 子进程
  372. elseif($pid === 0)
  373. {
  374. if($chdir = Config::get($worker_name.".chdir"))
  375. {
  376. chdir($chdir);
  377. }
  378. // 忽略信号
  379. self::ignoreSignal();
  380. // 清空任务
  381. Task::delAll();
  382. // 关闭不用的监听socket
  383. foreach(self::$listenedSocketsArray as $tmp_worker_name => $tmp_socket)
  384. {
  385. if($tmp_worker_name != $worker_name)
  386. {
  387. fclose($tmp_socket);
  388. }
  389. }
  390. // 尝试以指定用户运行worker进程
  391. if($worker_user = Config::get($worker_name . '.user'))
  392. {
  393. self::setProcUser($worker_user);
  394. }
  395. // 关闭输出
  396. self::resetStdFd(Config::get($worker_name.'.no_debug'));
  397. // 尝试设置子进程进程名称
  398. self::setWorkerProcTitle($worker_name);
  399. // 查找worker文件
  400. $worker_file = Config::get($worker_name.'.worker_file');
  401. $class_name = basename($worker_file, '.php');
  402. // 如果有语法错误 sleep 5秒 避免狂刷日志
  403. if(Checker::checkSyntaxError($worker_file, $class_name))
  404. {
  405. sleep(5);
  406. }
  407. require_once $worker_file;
  408. // 创建实例
  409. $worker = new $class_name($worker_name);
  410. // 如果该worker有配置监听端口,则将监听端口的socket传递给子进程
  411. if(isset(self::$listenedSocketsArray[$worker_name]))
  412. {
  413. $worker->setListendSocket(self::$listenedSocketsArray[$worker_name]);
  414. }
  415. // 使worker开始服务
  416. $worker->start();
  417. return 0;
  418. }
  419. // 出错
  420. else
  421. {
  422. self::notice("create worker fail worker_name:$worker_name detail:pcntl_fork fail");
  423. return $pid;
  424. }
  425. }
  426. /**
  427. * 安装相关信号控制器
  428. * @return void
  429. */
  430. protected static function installSignal()
  431. {
  432. // 设置终止信号处理函数
  433. pcntl_signal(SIGINT, array('\Man\Core\Master', 'signalHandler'), false);
  434. // 设置SIGUSR1信号处理函数,测试用
  435. pcntl_signal(SIGUSR1, array('\Man\Core\Master', 'signalHandler'), false);
  436. // 设置SIGUSR2信号处理函数,平滑重启Server
  437. pcntl_signal(SIGHUP, array('\Man\Core\Master', 'signalHandler'), false);
  438. // 设置子进程退出信号处理函数
  439. pcntl_signal(SIGCHLD, array('\Man\Core\Master', 'signalHandler'), false);
  440. // 设置忽略信号
  441. pcntl_signal(SIGPIPE, SIG_IGN);
  442. pcntl_signal(SIGTTIN, SIG_IGN);
  443. pcntl_signal(SIGTTOU, SIG_IGN);
  444. pcntl_signal(SIGQUIT, SIG_IGN);
  445. pcntl_signal(SIGALRM, SIG_IGN);
  446. }
  447. /**
  448. * 忽略信号
  449. * @return void
  450. */
  451. protected static function ignoreSignal()
  452. {
  453. // 设置忽略信号
  454. pcntl_signal(SIGPIPE, SIG_IGN);
  455. pcntl_signal(SIGTTIN, SIG_IGN);
  456. pcntl_signal(SIGTTOU, SIG_IGN);
  457. pcntl_signal(SIGQUIT, SIG_IGN);
  458. pcntl_signal(SIGALRM, SIG_IGN);
  459. pcntl_signal(SIGINT, SIG_IGN);
  460. pcntl_signal(SIGUSR1, SIG_IGN);
  461. pcntl_signal(SIGUSR2, SIG_IGN);
  462. pcntl_signal(SIGHUP, SIG_IGN);
  463. }
  464. /**
  465. * 设置server信号处理函数
  466. * @param null $null
  467. * @param int $signal
  468. * @return void
  469. */
  470. public static function signalHandler($signal)
  471. {
  472. switch($signal)
  473. {
  474. // 停止服务信号
  475. case SIGINT:
  476. self::notice("Workerman is shutting down");
  477. self::stop();
  478. break;
  479. // 测试用
  480. case SIGUSR1:
  481. break;
  482. // worker退出信号
  483. case SIGCHLD:
  484. // 这里什么也不做
  485. // self::checkWorkerExit();
  486. break;
  487. // 平滑重启server信号
  488. case SIGHUP:
  489. Config::reload();
  490. self::notice("Workerman reloading");
  491. $pid_worker_name_map = self::getPidWorkerNameMap();
  492. $pids_to_restart = array();
  493. foreach($pid_worker_name_map as $pid=>$worker_name)
  494. {
  495. // 如果对应进程配置了不热启动则不重启对应进程
  496. if(Config::get($worker_name.'.no_reload'))
  497. {
  498. // 发送reload信号,以便触发onReload方法
  499. posix_kill($pid, SIGHUP);
  500. continue;
  501. }
  502. $pids_to_restart[] = $pid;
  503. }
  504. self::addToRestartPids($pids_to_restart);
  505. self::restartPids();
  506. break;
  507. }
  508. }
  509. /**
  510. * 设置子进程进程名称
  511. * @param string $worker_name
  512. * @return void
  513. */
  514. public static function setWorkerProcTitle($worker_name)
  515. {
  516. if(isset(self::$listenedSocketsArray[$worker_name]))
  517. {
  518. // 获得socket的信息
  519. $sock_name = stream_socket_get_name(self::$listenedSocketsArray[$worker_name], false);
  520. // 更改进程名,如果支持的话
  521. $mata_data = stream_get_meta_data(self::$listenedSocketsArray[$worker_name]);
  522. $protocol = substr($mata_data['stream_type'], 0, 3);
  523. self::setProcTitle(self::NAME.":worker $worker_name {$protocol}://$sock_name");
  524. }
  525. else
  526. {
  527. self::setProcTitle(self::NAME.":worker $worker_name");
  528. }
  529. }
  530. /**
  531. * 主进程主循环 主要是监听子进程退出、服务终止、平滑重启信号
  532. * @return void
  533. */
  534. public static function loop()
  535. {
  536. while(1)
  537. {
  538. sleep(1);
  539. // 检查是否有进程退出
  540. self::checkWorkerExit();
  541. // 检查终端是否关闭
  542. self::checkTty();
  543. // 触发信号处理
  544. pcntl_signal_dispatch();
  545. }
  546. }
  547. /**
  548. * 监控worker进程状态,退出重启
  549. * @param resource $channel
  550. * @param int $flag
  551. * @param int $pid 退出的进程id
  552. * @return mixed
  553. */
  554. public static function checkWorkerExit()
  555. {
  556. // 由于SIGCHLD信号可能重叠导致信号丢失,所以这里要循环获取所有退出的进程id
  557. while(($pid = pcntl_waitpid(-1, $status, WUNTRACED | WNOHANG)) != 0)
  558. {
  559. // 如果是重启的进程,则继续重启进程
  560. if(isset(self::$pidsToRestart[$pid]) && self::$serviceStatus != self::STATUS_SHUTDOWN)
  561. {
  562. unset(self::$pidsToRestart[$pid]);
  563. self::restartPids();
  564. }
  565. // 出错
  566. if($pid < 0)
  567. {
  568. $last_error = function_exists('pcntl_get_last_error') ? pcntl_get_last_error() : 'function pcntl_get_last_error not exists';
  569. self::notice('pcntl_waitpid return '.$pid.' and pcntl_get_last_error = ' . $last_error);
  570. return $pid;
  571. }
  572. // 查找子进程对应的woker_name
  573. $pid_workname_map = self::getPidWorkerNameMap();
  574. $worker_name = isset($pid_workname_map[$pid]) ? $pid_workname_map[$pid] : '';
  575. // 没找到worker_name说明出错了
  576. if(empty($worker_name))
  577. {
  578. self::notice("child exist but not found worker_name pid:$pid");
  579. break;
  580. }
  581. // 进程退出状态不是0,说明有问题了
  582. if($status !== 0)
  583. {
  584. self::notice("worker[$pid:$worker_name] exit with status $status");
  585. }
  586. // 记录进程退出状态
  587. self::$serviceStatusInfo['worker_exit_code'][$worker_name][$status] = isset(self::$serviceStatusInfo['worker_exit_code'][$worker_name][$status]) ? self::$serviceStatusInfo['worker_exit_code'][$worker_name][$status] + 1 : 1;
  588. // 更新状态到共享内存
  589. self::updateStatusToShm();
  590. // 清理这个进程的数据
  591. self::clearWorker($worker_name, $pid);
  592. // 如果服务是不是关闭中
  593. if(self::$serviceStatus != self::STATUS_SHUTDOWN)
  594. {
  595. // 重新创建worker
  596. self::spawnWorkers();
  597. }
  598. // 判断是否都重启完毕
  599. else
  600. {
  601. $all_worker_pid = self::getPidWorkerNameMap();
  602. if(empty($all_worker_pid))
  603. {
  604. // 删除共享内存
  605. self::removeShmAndQueue();
  606. // 发送提示
  607. self::notice("Workerman stoped");
  608. // 删除pid文件
  609. @unlink(WORKERMAN_PID_FILE);
  610. exit(0);
  611. }
  612. }//end if
  613. }//end while
  614. }
  615. /**
  616. * 获取pid 到 worker_name 的映射
  617. * @return array ['pid1'=>'worker_name1','pid2'=>'worker_name2', ...]
  618. */
  619. public static function getPidWorkerNameMap()
  620. {
  621. $all_pid = array();
  622. foreach(self::$workerPidMap as $worker_name=>$pid_array)
  623. {
  624. foreach($pid_array as $pid)
  625. {
  626. $all_pid[$pid] = $worker_name;
  627. }
  628. }
  629. return $all_pid;
  630. }
  631. /**
  632. * 放入重启队列中
  633. * @param array $restart_pids
  634. * @return void
  635. */
  636. public static function addToRestartPids($restart_pids)
  637. {
  638. if(!is_array($restart_pids))
  639. {
  640. self::notice("addToRestartPids(".var_export($restart_pids, true).") \$restart_pids not array");
  641. return false;
  642. }
  643. // 将pid放入重启队列
  644. foreach($restart_pids as $pid)
  645. {
  646. if(!isset(self::$pidsToRestart[$pid]))
  647. {
  648. // 重启时间=0
  649. self::$pidsToRestart[$pid] = 0;
  650. }
  651. }
  652. }
  653. /**
  654. * 重启workers
  655. * @return void
  656. */
  657. public static function restartPids()
  658. {
  659. // 标记server状态
  660. if(self::$serviceStatus != self::STATUS_RESTARTING_WORKERS && self::$serviceStatus != self::STATUS_SHUTDOWN)
  661. {
  662. self::$serviceStatus = self::STATUS_RESTARTING_WORKERS;
  663. }
  664. // 没有要重启的进程了
  665. if(empty(self::$pidsToRestart))
  666. {
  667. self::$serviceStatus = self::STATUS_RUNNING;
  668. self::notice("\nWorker Restart Success");
  669. return true;
  670. }
  671. // 遍历要重启的进程 标记它们重启时间
  672. foreach(self::$pidsToRestart as $pid => $stop_time)
  673. {
  674. if($stop_time == 0)
  675. {
  676. self::$pidsToRestart[$pid] = time();
  677. posix_kill($pid, SIGHUP);
  678. Task::add(self::KILL_WORKER_TIME_LONG, array('\Man\Core\Master', 'forceKillWorker'), array($pid), false);
  679. break;
  680. }
  681. }
  682. }
  683. /**
  684. * worker进程退出时,master进程的一些清理工作
  685. * @param string $worker_name
  686. * @param int $pid
  687. * @return void
  688. */
  689. protected static function clearWorker($worker_name, $pid)
  690. {
  691. // 释放一些不用了的数据
  692. unset(self::$pidsToRestart[$pid], self::$workerPidMap[$worker_name][$pid]);
  693. }
  694. /**
  695. * 停止服务
  696. * @return void
  697. */
  698. public static function stop()
  699. {
  700. // 如果没有子进程则直接退出
  701. $all_worker_pid = self::getPidWorkerNameMap();
  702. if(empty($all_worker_pid))
  703. {
  704. exit(0);
  705. }
  706. // 标记server开始关闭
  707. self::$serviceStatus = self::STATUS_SHUTDOWN;
  708. // killWorkerTimeLong 秒后如果还没停止则强制杀死所有进程
  709. Task::add(self::KILL_WORKER_TIME_LONG, array('\Man\Core\Master', 'stopAllWorker'), array(true), false);
  710. // 停止所有worker
  711. self::stopAllWorker();
  712. }
  713. /**
  714. * 停止所有worker
  715. * @param bool $force 是否强制退出
  716. * @return void
  717. */
  718. public static function stopAllWorker($force = false)
  719. {
  720. // 获得所有pid
  721. $all_worker_pid = self::getPidWorkerNameMap();
  722. // 强行杀死
  723. if($force)
  724. {
  725. // 杀死所有子进程
  726. foreach($all_worker_pid as $pid=>$worker_name)
  727. {
  728. // 发送SIGKILL信号
  729. self::forceKillWorker($pid);
  730. self::notice("Kill workers[$worker_name] force");
  731. }
  732. }
  733. else
  734. {
  735. // 向所有子进程发送终止信号
  736. foreach($all_worker_pid as $pid=>$worker_name)
  737. {
  738. // 发送SIGINT信号
  739. posix_kill($pid, SIGINT);
  740. }
  741. }
  742. }
  743. /**
  744. * 强制杀死进程
  745. * @param int $pid
  746. * @return void
  747. */
  748. public static function forceKillWorker($pid)
  749. {
  750. if(posix_kill($pid, 0))
  751. {
  752. self::notice("Kill workers $pid force!");
  753. posix_kill($pid, SIGKILL);
  754. }
  755. }
  756. /**
  757. * 设置运行用户
  758. * @param string $worker_user
  759. * @return void
  760. */
  761. protected static function setProcUser($worker_user)
  762. {
  763. $user_info = posix_getpwnam($worker_user);
  764. if($user_info['uid'] != posix_getuid() || $user_info['gid'] != posix_getgid())
  765. {
  766. // 尝试设置gid uid
  767. if(!posix_setgid($user_info['gid']) || !posix_setuid($user_info['uid']))
  768. {
  769. self::notice( 'Notice : Can not run woker as '.$worker_user." , You shuld be root\n", true);
  770. }
  771. }
  772. }
  773. /**
  774. * 获取共享内存资源id
  775. * @return resource
  776. */
  777. public static function getShmId()
  778. {
  779. return self::$shmId;
  780. }
  781. /**
  782. * 获取消息队列资源id
  783. * @return resource
  784. */
  785. public static function getQueueId()
  786. {
  787. return self::$queueId;
  788. }
  789. /**
  790. * 关闭标准输入输出
  791. * @return void
  792. */
  793. protected static function resetStdFd($force = false)
  794. {
  795. // 已经重定向了标准输出
  796. if(self::$hasResetStd)
  797. {
  798. return;
  799. }
  800. // 将标准输出重定向到$stdout_file
  801. global $STDOUT, $STDERR;
  802. $stdout_file = Config::get('workerman.stdout_file');
  803. if($stdout_file)
  804. {
  805. $handle = fopen($stdout_file,"a");
  806. if($handle)
  807. {
  808. unset($handle);
  809. @fclose(STDOUT);
  810. @fclose(STDERR);
  811. $STDOUT = fopen($stdout_file,"a");
  812. $STDERR = fopen($stdout_file,"a");
  813. self::$hasResetStd = true;
  814. return;
  815. }
  816. }
  817. // 如果此进程配置是no_debug,则关闭输出
  818. if(!$force)
  819. {
  820. // 没有设置 stdout_file并且debug开启并且终端没有关闭则不关闭标准输出,用于调试
  821. if(Config::get('workerman.debug') == 1 && @posix_ttyname(STDOUT))
  822. {
  823. return ;
  824. }
  825. }
  826. @fclose(STDOUT);
  827. @fclose(STDERR);
  828. $STDOUT = fopen('/dev/null',"a");
  829. $STDERR = fopen('/dev/null',"a");
  830. self::$hasResetStd = true;
  831. }
  832. /**
  833. * 是否已经重置了fd
  834. * @return boolean
  835. */
  836. public static function hasResetFd()
  837. {
  838. return self::$hasResetStd;
  839. }
  840. /**
  841. * 更新主进程收集的状态信息到共享内存
  842. * @return bool
  843. */
  844. protected static function updateStatusToShm()
  845. {
  846. if(!self::$shmId)
  847. {
  848. return true;
  849. }
  850. return shm_put_var(self::$shmId, self::STATUS_VAR_ID, array_merge(self::$serviceStatusInfo, array('pid_map'=>self::$workerPidMap)));
  851. }
  852. /**
  853. * 销毁共享内存以及消息队列
  854. * @return void
  855. */
  856. protected static function removeShmAndQueue()
  857. {
  858. if(self::$shmId)
  859. {
  860. shm_remove(self::$shmId);
  861. }
  862. if(self::$queueId)
  863. {
  864. msg_remove_queue(self::$queueId);
  865. }
  866. }
  867. /**
  868. * 设置进程名称,需要proctitle支持 或者php>=5.5
  869. * @param string $title
  870. * @return void
  871. */
  872. protected static function setProcTitle($title)
  873. {
  874. // >=php 5.5
  875. if (function_exists('cli_set_process_title'))
  876. {
  877. @cli_set_process_title($title);
  878. }
  879. // 需要扩展
  880. elseif(extension_loaded('proctitle') && function_exists('setproctitle'))
  881. {
  882. @setproctitle($title);
  883. }
  884. }
  885. /**
  886. * 获得socket的上下文选项
  887. * @param string $worker_name
  888. * @return resource
  889. */
  890. protected static function getSocketContext($worker_name)
  891. {
  892. $context = null;
  893. // 根据手册5.3.3之前版本stream_socket_server 不支持 backlog 选项
  894. if(version_compare(PHP_VERSION, '5.3.3') < 0)
  895. {
  896. return $context;
  897. }
  898. // 读取worker的backlog
  899. $backlog = (int)Config::get($worker_name . '.backlog');
  900. // 没有设置或者不合法则尝试使用workerman.conf中的backlog设置
  901. if($backlog <= 0)
  902. {
  903. $backlog = (int)Config::get('workerman.backlog');
  904. }
  905. // 都没设置backlog,使用默认值
  906. if($backlog <= 0)
  907. {
  908. $backlog = self::DEFAULT_BACKLOG;
  909. }
  910. // backlog选项
  911. $opts = array(
  912. 'socket' => array(
  913. 'backlog' => $backlog,
  914. ),
  915. );
  916. // 返回上下文
  917. $context = stream_context_create($opts);
  918. return $context;
  919. }
  920. /**
  921. * 检查控制终端是否已经关闭, 如果控制终端关闭,则停止打印数据到终端
  922. * @return void
  923. */
  924. public static function checkTty()
  925. {
  926. // 已经重置了fd,就不检查终端是否关闭,因为不会打印东西到屏幕了
  927. if(self::$hasResetStd)
  928. {
  929. return;
  930. }
  931. // 没开启debug不打印数据到屏幕,不检测终端是否关闭
  932. if(!Config::get('workerman.debug'))
  933. {
  934. return;
  935. }
  936. // 检测终端是否关闭
  937. if(!self::$terminalClosed && !@posix_ttyname(STDOUT))
  938. {
  939. self::resetStdFd(true);
  940. // 日志
  941. self::notice("terminal closed and reset workers fd");
  942. // 获取所有子进程pid
  943. $all_worker_pid = self::getPidWorkerNameMap();
  944. // 向所有子进程发送重置标准输入输出信号
  945. foreach($all_worker_pid as $pid=>$worker_name)
  946. {
  947. // 发送SIGTTOU信号
  948. posix_kill($pid, SIGTTOU);
  949. }
  950. // 设置标记
  951. self::$terminalClosed = true;
  952. }
  953. }
  954. /**
  955. * notice,记录到日志
  956. * @param string $msg
  957. * @param bool $display
  958. * @return void
  959. */
  960. public static function notice($msg, $display = false)
  961. {
  962. Log::add("Server:".trim($msg));
  963. if($display)
  964. {
  965. if(self::$serviceStatus == self::STATUS_STARTING && @posix_ttyname(STDOUT))
  966. {
  967. echo($msg."\n");
  968. }
  969. }
  970. }
  971. }