Master.php 30 KB


  1. <?php
  2. namespace Man\Core;
  3. use \Man\Core\Lib\Task;
  4. use \Man\Core\Lib\Config;
  5. use \Man\Core\Lib\Checker;
  6. use \Man\Core\Lib\Log;
  7. if(!defined('WORKERMAN_ROOT_DIR'))
  8. {
  9. define('WORKERMAN_ROOT_DIR', realpath(__DIR__."/../../")."/");
  10. }
  11. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Checker.php';
  12. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Config.php';
  13. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Task.php';
  14. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Log.php';
  15. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Mutex.php';
  16. require_once WORKERMAN_ROOT_DIR . 'Core/Events/interfaces.php';
  17. require_once WORKERMAN_ROOT_DIR . 'Core/Events/Select.php';
  18. require_once WORKERMAN_ROOT_DIR . 'Core/Events/Libevent.php';
  19. require_once WORKERMAN_ROOT_DIR . 'Core/AbstractWorker.php';
  20. require_once WORKERMAN_ROOT_DIR . 'Core/SocketWorker.php';
  21. /**
  22. *
  23. * 主进程
  24. *
  25. * @package Core
  26. *
  27. * @author walkor <walkor@workerman.net>
  28. * <b>使用示例:</b>
  29. * <pre>
  30. * <code>
  31. * Man\Core\Master::run();
  32. * <code>
  33. * </pre>
  34. *
  35. */
  36. class Master
  37. {
  38. /**
  39. * 版本
  40. * @var string
  41. */
  42. const VERSION = '2.1.6';
  43. /**
  44. * 服务名
  45. * @var string
  46. */
  47. const NAME = 'WorkerMan';
  48. /**
  49. * 服务状态 启动中
  50. * @var integer
  51. */
  52. const STATUS_STARTING = 1;
  53. /**
  54. * 服务状态 运行中
  55. * @var integer
  56. */
  57. const STATUS_RUNNING = 2;
  58. /**
  59. * 服务状态 关闭中
  60. * @var integer
  61. */
  62. const STATUS_SHUTDOWN = 4;
  63. /**
  64. * 服务状态 平滑重启中
  65. * @var integer
  66. */
  67. const STATUS_RESTARTING_WORKERS = 8;
  68. /**
  69. * 整个服务能够启动的最大进程数
  70. * @var integer
  71. */
  72. const SERVER_MAX_WORKER_COUNT = 5000;
  73. /**
  74. * 单个进程打开文件数限制
  75. * @var integer
  76. */
  77. const MIN_SOFT_OPEN_FILES = 10000;
  78. /**
  79. * 单个进程打开文件数限制 硬性限制
  80. * @var integer
  81. */
  82. const MIN_HARD_OPEN_FILES = 10000;
  83. /**
  84. * 共享内存中用于存储主进程统计信息的变量id
  85. * @var integer
  86. */
  87. const STATUS_VAR_ID = 1;
  88. /**
  89. * 发送停止命令多久后worker没退出则发送sigkill信号
  90. * @var integer
  91. */
  92. const KILL_WORKER_TIME_LONG = 4;
  93. /**
  94. * 默认listen的backlog,如果没配置backlog,则使用此值
  95. * @var integer
  96. */
  97. const DEFAULT_BACKLOG= 1024;
  98. /**
  99. * 用于保存所有子进程pid ['worker_name1'=>[pid1=>pid1,pid2=>pid2,..], 'worker_name2'=>[pid7,..], ...]
  100. * @var array
  101. */
  102. protected static $workerPidMap = array();
  103. /**
  104. * 服务的状态,默认是启动中
  105. * @var integer
  106. */
  107. protected static $serviceStatus = self::STATUS_STARTING;
  108. /**
  109. * 用来监听端口的Socket数组,用来fork worker使用
  110. * @var array
  111. */
  112. protected static $listenedSocketsArray = array();
  113. /**
  114. * 要重启r的pid数组 [pid1=>time_stamp, pid2=>time_stamp, ..]
  115. * @var array
  116. */
  117. protected static $pidsToRestart = array();
  118. /**
  119. * 共享内存resource id
  120. * @var resource
  121. */
  122. protected static $shmId = 0;
  123. /**
  124. * 消息队列 resource id
  125. * @var resource
  126. */
  127. protected static $queueId = 0;
  128. /**
  129. * master进程pid
  130. * @var integer
  131. */
  132. protected static $masterPid = 0;
  133. /**
  134. * 终端是否关闭
  135. * @var bool
  136. */
  137. protected static $terminalClosed = false;
  138. /**
  139. * 是否已经重定向了标准输出
  140. * @var bool
  141. */
  142. protected static $hasResetStd = false;
  143. /**
  144. * server统计信息 ['start_time'=>time_stamp, 'worker_exit_code'=>['worker_name1'=>[code1=>count1, code2=>count2,..], 'worker_name2'=>[code3=>count3,...], ..] ]
  145. * @var array
  146. */
  147. protected static $serviceStatusInfo = array(
  148. 'start_time' => 0,
  149. 'worker_exit_code' => array(),
  150. );
  151. /**
  152. * 服务运行
  153. * @return void
  154. */
  155. public static function run()
  156. {
  157. // 输出信息
  158. self::notice("Workerman is starting ...", true);
  159. // 检查环境
  160. self::checkEnv();
  161. // 初始化
  162. self::init();
  163. // 变成守护进程
  164. self::daemonize();
  165. // 保存进程pid
  166. self::savePid();
  167. // 安装信号
  168. self::installSignal();
  169. // 创建监听套接字
  170. self::createListeningSockets();
  171. // 创建worker进程
  172. self::spawnWorkers();
  173. // 输出信息
  174. self::notice("\033[1A\n\033[KWorkerman start success ...\033[0m", true);
  175. // 标记服务状态为运行中
  176. self::$serviceStatus = self::STATUS_RUNNING;
  177. // 初始化任务
  178. Task::init();
  179. // 关闭标准输出
  180. self::resetStdFd();
  181. // 主循环
  182. self::loop();
  183. }
  184. /**
  185. * 初始化 配置、进程名、共享内存、消息队列等
  186. * @return void
  187. */
  188. public static function init()
  189. {
  190. // 因为子进程要更换用户、开低端口等,必须是root启动
  191. if($user_info = posix_getpwuid(posix_getuid()))
  192. {
  193. if($user_info['name'] !== 'root')
  194. {
  195. exit("\033[31;40mYou should run workerman as root . Permission denied\033[0m\n");
  196. }
  197. }
  198. // 获取配置文件
  199. $config_path = Config::$configFile;
  200. // 设置进程名称,如果支持的话
  201. self::setProcTitle(self::NAME.':master with-config:' . $config_path);
  202. // 初始化共享内存消息队列
  203. if(extension_loaded('sysvmsg') && extension_loaded('sysvshm'))
  204. {
  205. self::$shmId = shm_attach(IPC_KEY, DEFAULT_SHM_SIZE);
  206. self::$queueId = msg_get_queue(IPC_KEY);
  207. msg_set_queue(self::$queueId,array('msg_qbytes'=>DEFAULT_MSG_QBYTES));
  208. }
  209. }
  210. /**
  211. * 检查环境配置
  212. * @return void
  213. */
  214. public static function checkEnv()
  215. {
  216. // 检查PID文件
  217. Checker::checkPidFile();
  218. // 检查扩展支持情况
  219. Checker::checkExtension();
  220. // 检查函数禁用情况
  221. Checker::checkDisableFunction();
  222. // 检查log目录是否可读
  223. Log::init();
  224. // 检查配置和语法错误等
  225. Checker::checkWorkersConfig();
  226. // 检查文件限制
  227. Checker::checkLimit();
  228. }
  229. /**
  230. * 使之脱离终端,变为守护进程
  231. * @return void
  232. */
  233. protected static function daemonize()
  234. {
  235. // 设置umask
  236. umask(0);
  237. // fork一次
  238. $pid = pcntl_fork();
  239. if(-1 == $pid)
  240. {
  241. // 出错退出
  242. exit("Can not fork");
  243. }
  244. elseif($pid > 0)
  245. {
  246. // 父进程,退出
  247. exit(0);
  248. }
  249. // 成为session leader
  250. if(-1 == posix_setsid())
  251. {
  252. // 出错退出
  253. exit("Setsid fail");
  254. }
  255. // 再fork一次,防止在符合SVR4标准的系统下进程再次获得终端
  256. $pid2 = pcntl_fork();
  257. if(-1 == $pid2)
  258. {
  259. // 出错退出
  260. exit("Can not fork");
  261. }
  262. elseif(0 !== $pid2)
  263. {
  264. // 禁止进程重新打开控制终端
  265. exit(0);
  266. }
  267. // 记录服务启动时间
  268. self::$serviceStatusInfo['start_time'] = time();
  269. }
  270. /**
  271. * 保存主进程pid
  272. * @return void
  273. */
  274. public static function savePid()
  275. {
  276. // 保存在变量中
  277. self::$masterPid = posix_getpid();
  278. // 保存到文件中,用于实现停止、重启
  279. if(false === @file_put_contents(WORKERMAN_PID_FILE, self::$masterPid))
  280. {
  281. exit("\033[31;40mCan not save pid to pid-file(" . WORKERMAN_PID_FILE . ")\033[0m\n\n\033[31;40mServer start fail\033[0m\n\n");
  282. }
  283. // 更改权限
  284. chmod(WORKERMAN_PID_FILE, 0644);
  285. }
  286. /**
  287. * 获取主进程pid
  288. * @return int
  289. */
  290. public static function getMasterPid()
  291. {
  292. return self::$masterPid;
  293. }
  294. /**
  295. * 根据配置文件,创建监听套接字
  296. * @return void
  297. */
  298. protected static function createListeningSockets()
  299. {
  300. // 循环读取配置创建socket
  301. foreach (Config::getAllWorkers() as $worker_name=>$config)
  302. {
  303. if(isset($config['listen']))
  304. {
  305. $context = self::getSocketContext($worker_name);
  306. $flags = substr($config['listen'], 0, 3) == 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
  307. $error_no = 0;
  308. $error_msg = '';
  309. // 创建监听socket
  310. if($context)
  311. {
  312. self::$listenedSocketsArray[$worker_name] = stream_socket_server($config['listen'], $error_no, $error_msg, $flags, $context);
  313. }
  314. else
  315. {
  316. self::$listenedSocketsArray[$worker_name] = stream_socket_server($config['listen'], $error_no, $error_msg, $flags);
  317. }
  318. if(!self::$listenedSocketsArray[$worker_name])
  319. {
  320. Log::add("can not create socket {$config['listen']} info:{$error_no} {$error_msg}\tServer start fail");
  321. exit("\n\033[31;40mCan not create socket {$config['listen']} {$error_msg}\033[0m\n\n\033[31;40mWorkerman start fail\033[0m\n\n");
  322. }
  323. }
  324. }
  325. }
  326. /**
  327. * 根据配置文件创建Workers
  328. * @return void
  329. */
  330. protected static function spawnWorkers()
  331. {
  332. // 生成一定量的worker进程
  333. foreach (Config::getAllWorkers() as $worker_name=>$config)
  334. {
  335. // 初始化
  336. if(empty(self::$workerPidMap[$worker_name]))
  337. {
  338. self::$workerPidMap[$worker_name] = array();
  339. }
  340. while(count(self::$workerPidMap[$worker_name]) < $config['start_workers'])
  341. {
  342. // 子进程退出
  343. if(self::createOneWorker($worker_name) == 0)
  344. {
  345. self::notice("Worker exit unexpected");
  346. exit(500);
  347. }
  348. }
  349. }
  350. }
  351. /**
  352. * 创建一个worker进程
  353. * @param string $worker_name 服务名
  354. * @return int 父进程:>0得到新worker的pid ;<0 出错; 子进程:始终为0
  355. */
  356. protected static function createOneWorker($worker_name)
  357. {
  358. // 创建子进程
  359. $pid = pcntl_fork();
  360. // 先处理收到的信号
  361. pcntl_signal_dispatch();
  362. // 父进程
  363. if($pid > 0)
  364. {
  365. // 初始化master的一些东东
  366. self::$workerPidMap[$worker_name][$pid] = $pid;
  367. // 更新进程信息到共享内存
  368. self::updateStatusToShm();
  369. return $pid;
  370. }
  371. // 子进程
  372. elseif($pid === 0)
  373. {
  374. // 忽略信号
  375. self::ignoreSignal();
  376. // 清空任务
  377. Task::delAll();
  378. // 关闭不用的监听socket
  379. foreach(self::$listenedSocketsArray as $tmp_worker_name => $tmp_socket)
  380. {
  381. if($tmp_worker_name != $worker_name)
  382. {
  383. fclose($tmp_socket);
  384. }
  385. }
  386. // 尝试以指定用户运行worker进程
  387. if($worker_user = Config::get($worker_name . '.user'))
  388. {
  389. self::setProcUser($worker_user);
  390. }
  391. // 关闭输出
  392. self::resetStdFd(Config::get($worker_name.'.no_debug'));
  393. // 尝试设置子进程进程名称
  394. self::setWorkerProcTitle($worker_name);
  395. // 查找worker文件
  396. $worker_file = Config::get($worker_name.'.worker_file');
  397. $class_name = basename($worker_file, '.php');
  398. // 如果有语法错误 sleep 5秒 避免狂刷日志
  399. if(Checker::checkSyntaxError($worker_file, $class_name))
  400. {
  401. sleep(5);
  402. }
  403. require_once $worker_file;
  404. // 创建实例
  405. $worker = new $class_name($worker_name);
  406. // 如果该worker有配置监听端口,则将监听端口的socket传递给子进程
  407. if(isset(self::$listenedSocketsArray[$worker_name]))
  408. {
  409. $worker->setListendSocket(self::$listenedSocketsArray[$worker_name]);
  410. }
  411. // 使worker开始服务
  412. $worker->start();
  413. return 0;
  414. }
  415. // 出错
  416. else
  417. {
  418. self::notice("create worker fail worker_name:$worker_name detail:pcntl_fork fail");
  419. return $pid;
  420. }
  421. }
  422. /**
  423. * 安装相关信号控制器
  424. * @return void
  425. */
  426. protected static function installSignal()
  427. {
  428. // 设置终止信号处理函数
  429. pcntl_signal(SIGINT, array('\Man\Core\Master', 'signalHandler'), false);
  430. // 设置SIGUSR1信号处理函数,测试用
  431. pcntl_signal(SIGUSR1, array('\Man\Core\Master', 'signalHandler'), false);
  432. // 设置SIGUSR2信号处理函数,平滑重启Server
  433. pcntl_signal(SIGHUP, array('\Man\Core\Master', 'signalHandler'), false);
  434. // 设置子进程退出信号处理函数
  435. pcntl_signal(SIGCHLD, array('\Man\Core\Master', 'signalHandler'), false);
  436. // 设置忽略信号
  437. pcntl_signal(SIGPIPE, SIG_IGN);
  438. pcntl_signal(SIGTTIN, SIG_IGN);
  439. pcntl_signal(SIGTTOU, SIG_IGN);
  440. pcntl_signal(SIGQUIT, SIG_IGN);
  441. pcntl_signal(SIGALRM, SIG_IGN);
  442. }
  443. /**
  444. * 忽略信号
  445. * @return void
  446. */
  447. protected static function ignoreSignal()
  448. {
  449. // 设置忽略信号
  450. pcntl_signal(SIGPIPE, SIG_IGN);
  451. pcntl_signal(SIGTTIN, SIG_IGN);
  452. pcntl_signal(SIGTTOU, SIG_IGN);
  453. pcntl_signal(SIGQUIT, SIG_IGN);
  454. pcntl_signal(SIGALRM, SIG_IGN);
  455. pcntl_signal(SIGINT, SIG_IGN);
  456. pcntl_signal(SIGUSR1, SIG_IGN);
  457. pcntl_signal(SIGUSR2, SIG_IGN);
  458. pcntl_signal(SIGHUP, SIG_IGN);
  459. }
  460. /**
  461. * 设置server信号处理函数
  462. * @param null $null
  463. * @param int $signal
  464. * @return void
  465. */
  466. public static function signalHandler($signal)
  467. {
  468. switch($signal)
  469. {
  470. // 停止服务信号
  471. case SIGINT:
  472. self::notice("Workerman is shutting down");
  473. self::stop();
  474. break;
  475. // 测试用
  476. case SIGUSR1:
  477. break;
  478. // worker退出信号
  479. case SIGCHLD:
  480. // 这里什么也不做
  481. // self::checkWorkerExit();
  482. break;
  483. // 平滑重启server信号
  484. case SIGHUP:
  485. Config::reload();
  486. self::notice("Workerman reloading");
  487. $pid_worker_name_map = self::getPidWorkerNameMap();
  488. $pids_to_restart = array();
  489. foreach($pid_worker_name_map as $pid=>$worker_name)
  490. {
  491. // 如果对应进程配置了不热启动则不重启对应进程
  492. if(Config::get($worker_name.'.no_reload'))
  493. {
  494. // 发送reload信号,以便触发onReload方法
  495. posix_kill($pid, SIGHUP);
  496. continue;
  497. }
  498. $pids_to_restart[] = $pid;
  499. }
  500. self::addToRestartPids($pids_to_restart);
  501. self::restartPids();
  502. break;
  503. }
  504. }
  505. /**
  506. * 设置子进程进程名称
  507. * @param string $worker_name
  508. * @return void
  509. */
  510. public static function setWorkerProcTitle($worker_name)
  511. {
  512. if(isset(self::$listenedSocketsArray[$worker_name]))
  513. {
  514. // 获得socket的信息
  515. $sock_name = stream_socket_get_name(self::$listenedSocketsArray[$worker_name], false);
  516. // 更改进程名,如果支持的话
  517. $mata_data = stream_get_meta_data(self::$listenedSocketsArray[$worker_name]);
  518. $protocol = substr($mata_data['stream_type'], 0, 3);
  519. self::setProcTitle(self::NAME.":worker $worker_name {$protocol}://$sock_name");
  520. }
  521. else
  522. {
  523. self::setProcTitle(self::NAME.":worker $worker_name");
  524. }
  525. }
  526. /**
  527. * 主进程主循环 主要是监听子进程退出、服务终止、平滑重启信号
  528. * @return void
  529. */
  530. public static function loop()
  531. {
  532. while(1)
  533. {
  534. sleep(1);
  535. // 检查是否有进程退出
  536. self::checkWorkerExit();
  537. // 检查终端是否关闭
  538. self::checkTty();
  539. // 触发信号处理
  540. pcntl_signal_dispatch();
  541. }
  542. }
  543. /**
  544. * 监控worker进程状态,退出重启
  545. * @param resource $channel
  546. * @param int $flag
  547. * @param int $pid 退出的进程id
  548. * @return mixed
  549. */
  550. public static function checkWorkerExit()
  551. {
  552. // 由于SIGCHLD信号可能重叠导致信号丢失,所以这里要循环获取所有退出的进程id
  553. while(($pid = pcntl_waitpid(-1, $status, WUNTRACED | WNOHANG)) != 0)
  554. {
  555. // 如果是重启的进程,则继续重启进程
  556. if(isset(self::$pidsToRestart[$pid]) && self::$serviceStatus != self::STATUS_SHUTDOWN)
  557. {
  558. unset(self::$pidsToRestart[$pid]);
  559. self::restartPids();
  560. }
  561. // 出错
  562. if($pid < 0)
  563. {
  564. $last_error = function_exists('pcntl_get_last_error') ? pcntl_get_last_error() : 'function pcntl_get_last_error not exists';
  565. self::notice('pcntl_waitpid return '.$pid.' and pcntl_get_last_error = ' . $last_error);
  566. return $pid;
  567. }
  568. // 查找子进程对应的woker_name
  569. $pid_workname_map = self::getPidWorkerNameMap();
  570. $worker_name = isset($pid_workname_map[$pid]) ? $pid_workname_map[$pid] : '';
  571. // 没找到worker_name说明出错了
  572. if(empty($worker_name))
  573. {
  574. self::notice("child exist but not found worker_name pid:$pid");
  575. break;
  576. }
  577. // 进程退出状态不是0,说明有问题了
  578. if($status !== 0)
  579. {
  580. self::notice("worker[$pid:$worker_name] exit with status $status");
  581. }
  582. // 记录进程退出状态
  583. self::$serviceStatusInfo['worker_exit_code'][$worker_name][$status] = isset(self::$serviceStatusInfo['worker_exit_code'][$worker_name][$status]) ? self::$serviceStatusInfo['worker_exit_code'][$worker_name][$status] + 1 : 1;
  584. // 更新状态到共享内存
  585. self::updateStatusToShm();
  586. // 清理这个进程的数据
  587. self::clearWorker($worker_name, $pid);
  588. // 如果服务是不是关闭中
  589. if(self::$serviceStatus != self::STATUS_SHUTDOWN)
  590. {
  591. // 重新创建worker
  592. self::spawnWorkers();
  593. }
  594. // 判断是否都重启完毕
  595. else
  596. {
  597. $all_worker_pid = self::getPidWorkerNameMap();
  598. if(empty($all_worker_pid))
  599. {
  600. // 删除共享内存
  601. self::removeShmAndQueue();
  602. // 发送提示
  603. self::notice("Workerman stoped");
  604. // 删除pid文件
  605. @unlink(WORKERMAN_PID_FILE);
  606. exit(0);
  607. }
  608. }//end if
  609. }//end while
  610. }
  611. /**
  612. * 获取pid 到 worker_name 的映射
  613. * @return array ['pid1'=>'worker_name1','pid2'=>'worker_name2', ...]
  614. */
  615. public static function getPidWorkerNameMap()
  616. {
  617. $all_pid = array();
  618. foreach(self::$workerPidMap as $worker_name=>$pid_array)
  619. {
  620. foreach($pid_array as $pid)
  621. {
  622. $all_pid[$pid] = $worker_name;
  623. }
  624. }
  625. return $all_pid;
  626. }
  627. /**
  628. * 放入重启队列中
  629. * @param array $restart_pids
  630. * @return void
  631. */
  632. public static function addToRestartPids($restart_pids)
  633. {
  634. if(!is_array($restart_pids))
  635. {
  636. self::notice("addToRestartPids(".var_export($restart_pids, true).") \$restart_pids not array");
  637. return false;
  638. }
  639. // 将pid放入重启队列
  640. foreach($restart_pids as $pid)
  641. {
  642. if(!isset(self::$pidsToRestart[$pid]))
  643. {
  644. // 重启时间=0
  645. self::$pidsToRestart[$pid] = 0;
  646. }
  647. }
  648. }
  649. /**
  650. * 重启workers
  651. * @return void
  652. */
  653. public static function restartPids()
  654. {
  655. // 标记server状态
  656. if(self::$serviceStatus != self::STATUS_RESTARTING_WORKERS && self::$serviceStatus != self::STATUS_SHUTDOWN)
  657. {
  658. self::$serviceStatus = self::STATUS_RESTARTING_WORKERS;
  659. }
  660. // 没有要重启的进程了
  661. if(empty(self::$pidsToRestart))
  662. {
  663. self::$serviceStatus = self::STATUS_RUNNING;
  664. self::notice("\nWorker Restart Success");
  665. return true;
  666. }
  667. // 遍历要重启的进程 标记它们重启时间
  668. foreach(self::$pidsToRestart as $pid => $stop_time)
  669. {
  670. if($stop_time == 0)
  671. {
  672. self::$pidsToRestart[$pid] = time();
  673. posix_kill($pid, SIGHUP);
  674. Task::add(self::KILL_WORKER_TIME_LONG, array('\Man\Core\Master', 'forceKillWorker'), array($pid), false);
  675. break;
  676. }
  677. }
  678. }
  679. /**
  680. * worker进程退出时,master进程的一些清理工作
  681. * @param string $worker_name
  682. * @param int $pid
  683. * @return void
  684. */
  685. protected static function clearWorker($worker_name, $pid)
  686. {
  687. // 释放一些不用了的数据
  688. unset(self::$pidsToRestart[$pid], self::$workerPidMap[$worker_name][$pid]);
  689. }
  690. /**
  691. * 停止服务
  692. * @return void
  693. */
  694. public static function stop()
  695. {
  696. // 如果没有子进程则直接退出
  697. $all_worker_pid = self::getPidWorkerNameMap();
  698. if(empty($all_worker_pid))
  699. {
  700. exit(0);
  701. }
  702. // 标记server开始关闭
  703. self::$serviceStatus = self::STATUS_SHUTDOWN;
  704. // killWorkerTimeLong 秒后如果还没停止则强制杀死所有进程
  705. Task::add(self::KILL_WORKER_TIME_LONG, array('\Man\Core\Master', 'stopAllWorker'), array(true), false);
  706. // 停止所有worker
  707. self::stopAllWorker();
  708. }
  709. /**
  710. * 停止所有worker
  711. * @param bool $force 是否强制退出
  712. * @return void
  713. */
  714. public static function stopAllWorker($force = false)
  715. {
  716. // 获得所有pid
  717. $all_worker_pid = self::getPidWorkerNameMap();
  718. // 强行杀死
  719. if($force)
  720. {
  721. // 杀死所有子进程
  722. foreach($all_worker_pid as $pid=>$worker_name)
  723. {
  724. // 发送SIGKILL信号
  725. self::forceKillWorker($pid);
  726. self::notice("Kill workers[$worker_name] force");
  727. }
  728. }
  729. else
  730. {
  731. // 向所有子进程发送终止信号
  732. foreach($all_worker_pid as $pid=>$worker_name)
  733. {
  734. // 发送SIGINT信号
  735. posix_kill($pid, SIGINT);
  736. }
  737. }
  738. }
  739. /**
  740. * 强制杀死进程
  741. * @param int $pid
  742. * @return void
  743. */
  744. public static function forceKillWorker($pid)
  745. {
  746. if(posix_kill($pid, 0))
  747. {
  748. self::notice("Kill workers $pid force!");
  749. posix_kill($pid, SIGKILL);
  750. }
  751. }
  752. /**
  753. * 设置运行用户
  754. * @param string $worker_user
  755. * @return void
  756. */
  757. protected static function setProcUser($worker_user)
  758. {
  759. $user_info = posix_getpwnam($worker_user);
  760. if($user_info['uid'] != posix_getuid() || $user_info['gid'] != posix_getgid())
  761. {
  762. // 尝试设置gid uid
  763. if(!posix_setgid($user_info['gid']) || !posix_setuid($user_info['uid']))
  764. {
  765. self::notice( 'Notice : Can not run woker as '.$worker_user." , You shuld be root\n", true);
  766. }
  767. }
  768. }
  769. /**
  770. * 获取共享内存资源id
  771. * @return resource
  772. */
  773. public static function getShmId()
  774. {
  775. return self::$shmId;
  776. }
  777. /**
  778. * 获取消息队列资源id
  779. * @return resource
  780. */
  781. public static function getQueueId()
  782. {
  783. return self::$queueId;
  784. }
  785. /**
  786. * 关闭标准输入输出
  787. * @return void
  788. */
  789. protected static function resetStdFd($force = false)
  790. {
  791. // 已经重定向了标准输出
  792. if(self::$hasResetStd)
  793. {
  794. return;
  795. }
  796. // 将标准输出重定向到$stdout_file
  797. global $STDOUT, $STDERR;
  798. $stdout_file = Config::get('workerman.stdout_file');
  799. if($stdout_file)
  800. {
  801. $handle = fopen($stdout_file,"a");
  802. if($handle)
  803. {
  804. unset($handle);
  805. @fclose(STDOUT);
  806. @fclose(STDERR);
  807. $STDOUT = fopen($stdout_file,"a");
  808. $STDERR = fopen($stdout_file,"a");
  809. self::$hasResetStd = true;
  810. return;
  811. }
  812. }
  813. // 如果此进程配置是no_debug,则关闭输出
  814. if(!$force)
  815. {
  816. // 没有设置 stdout_file并且debug开启并且终端没有关闭则不关闭标准输出,用于调试
  817. if(Config::get('workerman.debug') == 1 && @posix_ttyname(STDOUT))
  818. {
  819. return ;
  820. }
  821. }
  822. @fclose(STDOUT);
  823. @fclose(STDERR);
  824. $STDOUT = fopen('/dev/null',"a");
  825. $STDERR = fopen('/dev/null',"a");
  826. self::$hasResetStd = true;
  827. }
  828. /**
  829. * 是否已经重置了fd
  830. * @return boolean
  831. */
  832. public static function hasResetFd()
  833. {
  834. return self::$hasResetStd;
  835. }
  836. /**
  837. * 更新主进程收集的状态信息到共享内存
  838. * @return bool
  839. */
  840. protected static function updateStatusToShm()
  841. {
  842. if(!self::$shmId)
  843. {
  844. return true;
  845. }
  846. return shm_put_var(self::$shmId, self::STATUS_VAR_ID, array_merge(self::$serviceStatusInfo, array('pid_map'=>self::$workerPidMap)));
  847. }
  848. /**
  849. * 销毁共享内存以及消息队列
  850. * @return void
  851. */
  852. protected static function removeShmAndQueue()
  853. {
  854. if(self::$shmId)
  855. {
  856. shm_remove(self::$shmId);
  857. }
  858. if(self::$queueId)
  859. {
  860. msg_remove_queue(self::$queueId);
  861. }
  862. }
  863. /**
  864. * 设置进程名称,需要proctitle支持 或者php>=5.5
  865. * @param string $title
  866. * @return void
  867. */
  868. protected static function setProcTitle($title)
  869. {
  870. // >=php 5.5
  871. if (function_exists('cli_set_process_title'))
  872. {
  873. @cli_set_process_title($title);
  874. }
  875. // 需要扩展
  876. elseif(extension_loaded('proctitle') && function_exists('setproctitle'))
  877. {
  878. @setproctitle($title);
  879. }
  880. }
  881. /**
  882. * 获得socket的上下文选项
  883. * @param string $worker_name
  884. * @return resource
  885. */
  886. protected static function getSocketContext($worker_name)
  887. {
  888. $context = null;
  889. // 根据手册5.3.3之前版本stream_socket_server 不支持 backlog 选项
  890. if(version_compare(PHP_VERSION, '5.3.3') < 0)
  891. {
  892. return $context;
  893. }
  894. // 读取worker的backlog
  895. $backlog = (int)Config::get($worker_name . '.backlog');
  896. // 没有设置或者不合法则尝试使用workerman.conf中的backlog设置
  897. if($backlog <= 0)
  898. {
  899. $backlog = (int)Config::get('workerman.backlog');
  900. }
  901. // 都没设置backlog,使用默认值
  902. if($backlog <= 0)
  903. {
  904. $backlog = self::DEFAULT_BACKLOG;
  905. }
  906. // backlog选项
  907. $opts = array(
  908. 'socket' => array(
  909. 'backlog' => $backlog,
  910. ),
  911. );
  912. // 返回上下文
  913. $context = stream_context_create($opts);
  914. return $context;
  915. }
  916. /**
  917. * 检查控制终端是否已经关闭, 如果控制终端关闭,则停止打印数据到终端
  918. * @return void
  919. */
  920. public static function checkTty()
  921. {
  922. // 已经重置了fd,就不检查终端是否关闭,因为不会打印东西到屏幕了
  923. if(self::$hasResetStd)
  924. {
  925. return;
  926. }
  927. // 没开启debug不打印数据到屏幕,不检测终端是否关闭
  928. if(!Config::get('workerman.debug'))
  929. {
  930. return;
  931. }
  932. // 检测终端是否关闭
  933. if(!self::$terminalClosed && !@posix_ttyname(STDOUT))
  934. {
  935. self::resetStdFd(true);
  936. // 日志
  937. self::notice("terminal closed and reset workers fd");
  938. // 获取所有子进程pid
  939. $all_worker_pid = self::getPidWorkerNameMap();
  940. // 向所有子进程发送重置标准输入输出信号
  941. foreach($all_worker_pid as $pid=>$worker_name)
  942. {
  943. // 发送SIGTTOU信号
  944. posix_kill($pid, SIGTTOU);
  945. }
  946. // 设置标记
  947. self::$terminalClosed = true;
  948. }
  949. }
  950. /**
  951. * notice,记录到日志
  952. * @param string $msg
  953. * @param bool $display
  954. * @return void
  955. */
  956. public static function notice($msg, $display = false)
  957. {
  958. Log::add("Server:".trim($msg));
  959. if($display)
  960. {
  961. if(self::$serviceStatus == self::STATUS_STARTING && @posix_ttyname(STDOUT))
  962. {
  963. echo($msg."\n");
  964. }
  965. }
  966. }
  967. }