Master.php 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897
  1. <?php
  2. namespace WORKERMAN\Core;
  3. error_reporting(E_ALL);
  4. ini_set('display_errors', 'on');
  5. ini_set('limit_memory','512M');
  6. date_default_timezone_set('Asia/Shanghai');
  7. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Checker.php';
  8. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Config.php';
  9. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Task.php';
  10. require_once WORKERMAN_ROOT_DIR . 'Core/Lib/Log.php';
  11. /**
  12. *
  13. * 主进程
  14. *
  15. * @package Core
  16. *
  17. * @author walkor <worker-man@qq.com>
  18. * <b>使用示例:</b>
  19. * <pre>
  20. * <code>
  21. * Master::run();
  22. * <code>
  23. * </pre>
  24. *
  25. */
  26. class Master
  27. {
  28. /**
  29. * 版本
  30. * @var string
  31. */
  32. const VERSION = '2.0.1';
  33. /**
  34. * 服务名
  35. * @var string
  36. */
  37. const NAME = 'WorkerMan';
  38. /**
  39. * 服务状态 启动中
  40. * @var integer
  41. */
  42. const STATUS_STARTING = 1;
  43. /**
  44. * 服务状态 运行中
  45. * @var integer
  46. */
  47. const STATUS_RUNNING = 2;
  48. /**
  49. * 服务状态 关闭中
  50. * @var integer
  51. */
  52. const STATUS_SHUTDOWN = 4;
  53. /**
  54. * 服务状态 平滑重启中
  55. * @var integer
  56. */
  57. const STATUS_RESTARTING_WORKERS = 8;
  58. /**
  59. * 整个服务能够启动的最大进程数
  60. * @var integer
  61. */
  62. const SERVER_MAX_WORKER_COUNT = 5000;
  63. /**
  64. * 单个进程打开文件数限制
  65. * @var integer
  66. */
  67. const MIN_SOFT_OPEN_FILES = 10000;
  68. /**
  69. * 单个进程打开文件数限制 硬性限制
  70. * @var integer
  71. */
  72. const MIN_HARD_OPEN_FILES = 10000;
  73. /**
  74. * 共享内存中用于存储主进程统计信息的变量id
  75. * @var integer
  76. */
  77. const STATUS_VAR_ID = 1;
  78. /**
  79. * 发送停止命令多久后worker没退出则发送sigkill信号
  80. * @var integer
  81. */
  82. const KILL_WORKER_TIME_LONG = 4;
  83. /**
  84. * 用于保存所有子进程pid ['worker_name1'=>[pid1=>pid1,pid2=>pid2,..], 'worker_name2'=>[pid3,..], ...]
  85. * @var array
  86. */
  87. protected static $workerPids = array();
  88. /**
  89. * 服务的状态,默认是启动中
  90. * @var integer
  91. */
  92. protected static $serverStatus = self::STATUS_STARTING;
  93. /**
  94. * 用来监听端口的Socket数组,用来fork worker使用
  95. * @var array
  96. */
  97. protected static $listenedSockets = array();
  98. /**
  99. * 要重启的worker的pid数组 [pid1=>time_stamp, pid2=>time_stamp, ..]
  100. * @var array
  101. */
  102. protected static $workerToRestart = array();
  103. /**
  104. * 共享内存resource id
  105. * @var resource
  106. */
  107. protected static $shmId = 0;
  108. /**
  109. * 消息队列 resource id
  110. * @var resource
  111. */
  112. protected static $queueId = 0;
  113. /**
  114. * master进程pid
  115. * @var integer
  116. */
  117. protected static $masterPid = 0;
  118. /**
  119. * server统计信息 ['start_time'=>time_stamp, 'worker_exit_code'=>['worker_name1'=>[code1=>count1, code2=>count2,..], 'worker_name2'=>[code3=>count3,...], ..] ]
  120. * @var array
  121. */
  122. protected static $serverStatusInfo = array(
  123. 'start_time' => 0,
  124. 'worker_exit_code' => array(),
  125. );
  126. /**
  127. * 服务运行
  128. * @return void
  129. */
  130. public static function run()
  131. {
  132. self::notice("Server is starting ...", true);
  133. // 初始化
  134. self::init();
  135. // 检查环境
  136. self::checkEnv();
  137. // 变成守护进程
  138. self::daemonize();
  139. // 保存进程pid
  140. self::savePid();
  141. // 安装信号
  142. self::installSignal();
  143. // 创建监听套接字
  144. self::createSocketsAndListen();
  145. // 创建worker进程
  146. self::createWorkers();
  147. self::notice("Server start success ...", true);
  148. // 标记sever状态为运行中...
  149. self::$serverStatus = self::STATUS_RUNNING;
  150. // 关闭标准输出
  151. self::resetStdFd();
  152. // 主循环
  153. self::loop();
  154. }
  155. /**
  156. * 初始化 配置、进程名、共享内存、消息队列等
  157. * @return void
  158. */
  159. public static function init()
  160. {
  161. // 获取配置文件
  162. $config_path = Lib\Config::instance()->filename;
  163. // 设置进程名称,如果支持的话
  164. self::setProcessTitle(self::NAME.':master with-config:' . $config_path);
  165. // 初始化共享内存消息队列
  166. if(extension_loaded('sysvmsg') && extension_loaded('sysvshm'))
  167. {
  168. self::$shmId = shm_attach(IPC_KEY, DEFAULT_SHM_SIZE, 0666);
  169. self::$queueId = msg_get_queue(IPC_KEY, 0666);
  170. msg_set_queue(self::$queueId,array('msg_qbytes'=>65535));
  171. }
  172. }
  173. /**
  174. * 检查环境配置
  175. * @return void
  176. */
  177. public static function checkEnv()
  178. {
  179. Lib\Checker::checkPidFile();
  180. // 检查扩展支持情况
  181. Lib\Checker::checkExtension();
  182. // 检查函数禁用情况
  183. Lib\Checker::checkDisableFunction();
  184. // 检查log目录是否可读
  185. Lib\Log::init();
  186. // 检查配置和语法错误等
  187. Lib\Checker::checkWorkersConfig();
  188. // 检查文件限制
  189. Lib\Checker::checkLimit();
  190. }
  191. /**
  192. * 使之脱离终端,变为守护进程
  193. * @return void
  194. */
  195. protected static function daemonize()
  196. {
  197. // 设置umask
  198. umask(0);
  199. // fork一次
  200. $pid = pcntl_fork();
  201. if(-1 == $pid)
  202. {
  203. // 出错退出
  204. exit("Daemonize fail ,can not fork");
  205. }
  206. elseif($pid > 0)
  207. {
  208. // 父进程,退出
  209. exit(0);
  210. }
  211. // 子进程使之成为session leader
  212. if(-1 == posix_setsid())
  213. {
  214. // 出错退出
  215. exit("Daemonize fail ,setsid fail");
  216. }
  217. // 再fork一次
  218. $pid2 = pcntl_fork();
  219. if(-1 == $pid2)
  220. {
  221. // 出错退出
  222. exit("Daemonize fail ,can not fork");
  223. }
  224. elseif(0 !== $pid2)
  225. {
  226. // 结束第一子进程,用来禁止进程重新打开控制终端
  227. exit(0);
  228. }
  229. // 记录server启动时间
  230. self::$serverStatusInfo['start_time'] = time();
  231. }
  232. /**
  233. * 保存主进程pid
  234. */
  235. public static function savePid()
  236. {
  237. // 保存在变量中
  238. self::$masterPid = posix_getpid();
  239. // 保存到文件中,用于实现停止、重启
  240. if(false === @file_put_contents(WORKERMAN_PID_FILE, self::$masterPid))
  241. {
  242. exit("\033[31;40mCan not save pid to pid-file(" . WORKERMAN_PID_FILE . ")\033[0m\n\n\033[31;40mServer start fail\033[0m\n\n");
  243. }
  244. // 更改权限
  245. chmod(WORKERMAN_PID_FILE, 0644);
  246. }
  247. /**
  248. * 获取主进程pid
  249. * @return int
  250. */
  251. public static function getMasterPid()
  252. {
  253. return self::$masterPid;
  254. }
  255. /**
  256. * 根据配置文件,创建监听套接字
  257. * @return void
  258. */
  259. protected static function createSocketsAndListen()
  260. {
  261. // 循环读取配置创建socket
  262. foreach (Lib\Config::get('workers') as $worker_name=>$config)
  263. {
  264. if(isset($config['socket']))
  265. {
  266. $flags = $config['socket']['protocol'] == 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
  267. $ip = isset($config['socket']['ip']) ? $config['socket']['ip'] : "0.0.0.0";
  268. $error_no = 0;
  269. $error_msg = '';
  270. // 创建监听socket
  271. self::$listenedSockets[$worker_name] = stream_socket_server("{$config['socket']['protocol']}://{$ip}:{$config['socket']['port']}", $error_no, $error_msg, $flags);
  272. if(!self::$listenedSockets[$worker_name])
  273. {
  274. Lib\Log::add("can not create socket {$config['socket']['protocol']}://{$ip}:{$config['socket']['port']} info:{$error_no} {$error_msg}\tServer start fail");
  275. exit("\n\033[31;40mcan not create socket {$config['socket']['protocol']}://{$ip}:{$config['socket']['port']} info:{$error_no} {$error_msg}\033[0m\n\n\033[31;40mServer start fail\033[0m\n\n");
  276. }
  277. }
  278. }
  279. }
  280. /**
  281. * 根据配置文件创建Workers
  282. * @return void
  283. */
  284. protected static function createWorkers()
  285. {
  286. // 循环读取配置创建一定量的worker进程
  287. foreach (Lib\Config::get('workers') as $worker_name=>$config)
  288. {
  289. // 初始化
  290. if(empty(self::$workerPids[$worker_name]))
  291. {
  292. self::$workerPids[$worker_name] = array();
  293. }
  294. while(count(self::$workerPids[$worker_name]) < $config['children_count'])
  295. {
  296. $pid = self::forkOneWorker($worker_name);
  297. // 子进程退出
  298. if($pid == 0)
  299. {
  300. self::notice("CHILD EXIT ERR");
  301. }
  302. }
  303. }
  304. }
  305. /**
  306. * 创建一个worker进程
  307. * @param string $worker_name worker的名称
  308. * @return int 父进程:>0得到新worker的pid ;<0 出错; 子进程:始终为0
  309. */
  310. protected static function forkOneWorker($worker_name)
  311. {
  312. // 创建子进程
  313. $pid = pcntl_fork();
  314. // 先处理收到的信号
  315. pcntl_signal_dispatch();
  316. // 父进程
  317. if($pid > 0)
  318. {
  319. // 初始化master的一些东东
  320. self::$workerPids[$worker_name][$pid] = $pid;
  321. // 更新进程信息到共享内存
  322. self::updateStatusToShm();
  323. return $pid;
  324. }
  325. // 子进程
  326. elseif($pid === 0)
  327. {
  328. // 忽略信号
  329. self::ignoreSignal();
  330. // 清空任务
  331. Lib\Task::delAll();
  332. // 关闭不用的监听socket
  333. foreach(self::$listenedSockets as $tmp_worker_name => $tmp_socket)
  334. {
  335. if($tmp_worker_name != $worker_name)
  336. {
  337. fclose($tmp_socket);
  338. }
  339. }
  340. // 尝试以指定用户运行worker
  341. if($worker_user = Lib\Config::get('workers.' . $worker_name . '.user'))
  342. {
  343. self::setWorkerUser($worker_user);
  344. }
  345. // 关闭输出
  346. self::resetStdFd();
  347. // 尝试设置子进程进程名称
  348. self::setWorkerProcessTitle($worker_name);
  349. // 创建worker实例
  350. include_once WORKERMAN_ROOT_DIR . "Workers/$worker_name.php";
  351. $worker = new $worker_name($worker_name);
  352. // 如果该worker有配置监听端口,则将监听端口的socket传递给子进程
  353. if(isset(self::$listenedSockets[$worker_name]))
  354. {
  355. $worker->setListendSocket(self::$listenedSockets[$worker_name]);
  356. }
  357. // 使worker开始服务
  358. $worker->start();
  359. return 0;
  360. }
  361. // 出错
  362. else
  363. {
  364. self::notice("create worker fail worker_name:$worker_name detail:pcntl_fork fail");
  365. return $pid;
  366. }
  367. }
  368. /**
  369. * 安装相关信号控制器
  370. * @return void
  371. */
  372. protected static function installSignal()
  373. {
  374. // 设置终止信号处理函数
  375. pcntl_signal(SIGINT, array('\WORKERMAN\Core\Master', 'signalHandler'), false);
  376. // 设置SIGUSR1信号处理函数,测试用
  377. pcntl_signal(SIGUSR1, array('\WORKERMAN\Core\Master', 'signalHandler'), false);
  378. // 设置SIGUSR2信号处理函数,平滑重启Server
  379. pcntl_signal(SIGHUP, array('\WORKERMAN\Core\Master', 'signalHandler'), false);
  380. // 设置子进程退出信号处理函数
  381. pcntl_signal(SIGCHLD, array('\WORKERMAN\Core\Master', 'signalHandler'), false);
  382. // 设置忽略信号
  383. pcntl_signal(SIGPIPE, SIG_IGN);
  384. pcntl_signal(SIGTTIN, SIG_IGN);
  385. pcntl_signal(SIGTTOU, SIG_IGN);
  386. pcntl_signal(SIGQUIT, SIG_IGN);
  387. pcntl_signal(SIGALRM, SIG_IGN);
  388. }
  389. /**
  390. * 忽略信号
  391. * @return void
  392. */
  393. protected static function ignoreSignal()
  394. {
  395. // 设置忽略信号
  396. pcntl_signal(SIGPIPE, SIG_IGN);
  397. pcntl_signal(SIGTTIN, SIG_IGN);
  398. pcntl_signal(SIGTTOU, SIG_IGN);
  399. pcntl_signal(SIGQUIT, SIG_IGN);
  400. pcntl_signal(SIGALRM, SIG_IGN);
  401. pcntl_signal(SIGINT, SIG_IGN);
  402. pcntl_signal(SIGUSR1, SIG_IGN);
  403. pcntl_signal(SIGHUP, SIG_IGN);
  404. pcntl_signal(SIGCHLD, SIG_IGN);
  405. }
  406. /**
  407. * 设置server信号处理函数
  408. * @param null $null
  409. * @param int $signal
  410. * @return void
  411. */
  412. public static function signalHandler($signal)
  413. {
  414. switch($signal)
  415. {
  416. // 停止server信号
  417. case SIGINT:
  418. self::notice("Server is shutting down");
  419. self::stop();
  420. break;
  421. // 测试用
  422. case SIGUSR1:
  423. break;
  424. // worker退出信号
  425. case SIGCHLD:
  426. // 不要在这里fork,fork出来的子进程无法收到信号
  427. // self::checkWorkerExit();
  428. break;
  429. // 平滑重启server信号
  430. case SIGHUP:
  431. Lib\Config::reload();
  432. self::notice("Server reloading");
  433. self::addToRestartWorkers(array_keys(self::getPidWorkerNameMap()));
  434. self::restartWorkers();
  435. break;
  436. }
  437. }
  438. /**
  439. * 设置子进程进程名称
  440. * @param string $worker_name
  441. * @return void
  442. */
  443. public static function setWorkerProcessTitle($worker_name)
  444. {
  445. if(isset(self::$listenedSockets[$worker_name]))
  446. {
  447. // 获得socket的信息
  448. $sock_name = stream_socket_get_name(self::$listenedSockets[$worker_name], false);
  449. // 更改进程名,如果支持的话
  450. $mata_data = stream_get_meta_data(self::$listenedSockets[$worker_name]);
  451. $protocol = substr($mata_data['stream_type'], 0, 3);
  452. self::setProcessTitle(self::NAME.":worker $worker_name {$protocol}://$sock_name");
  453. }
  454. else
  455. {
  456. self::setProcessTitle(self::NAME.":worker $worker_name");
  457. }
  458. }
  459. /**
  460. * 主进程主循环 主要是监听子进程退出、服务终止、平滑重启信号
  461. * @return void
  462. */
  463. public static function loop()
  464. {
  465. $siginfo = array();
  466. while(1)
  467. {
  468. @pcntl_sigtimedwait(array(SIGCHLD), $siginfo, 1);
  469. // 初始化任务系统
  470. Lib\Task::tick();
  471. // 检查是否有进程退出
  472. self::checkWorkerExit();
  473. // 触发信号处理
  474. pcntl_signal_dispatch();
  475. }
  476. }
  477. /**
  478. * 监控worker进程状态,退出重启
  479. * @param resource $channel
  480. * @param int $flag
  481. * @param int $pid 退出的进程id
  482. * @return mixed
  483. */
  484. public static function checkWorkerExit()
  485. {
  486. // 由于SIGCHLD信号可能重叠导致信号丢失,所以这里要循环获取所有退出的进程id
  487. while(($pid = pcntl_waitpid(-1, $status, WUNTRACED | WNOHANG)) != 0)
  488. {
  489. // 如果是重启的进程,则继续重启进程
  490. if(isset(self::$workerToRestart[$pid]) && self::$serverStatus != self::STATUS_SHUTDOWN)
  491. {
  492. unset(self::$workerToRestart[$pid]);
  493. self::restartWorkers();
  494. }
  495. // 出错
  496. if($pid == -1)
  497. {
  498. // 没有子进程了,可能是出现Fatal Err 了
  499. if(pcntl_get_last_error() == 10)
  500. {
  501. self::notice('Server has no workers now');
  502. }
  503. return -1;
  504. }
  505. // 查找子进程对应的woker_name
  506. $pid_workname_map = self::getPidWorkerNameMap();
  507. $worker_name = isset($pid_workname_map[$pid]) ? $pid_workname_map[$pid] : '';
  508. // 没找到worker_name说明出错了 哪里来的野孩子?
  509. if(empty($worker_name))
  510. {
  511. self::notice("child exist but not found worker_name pid:$pid");
  512. break;
  513. }
  514. // 进程退出状态不是0,说明有问题了
  515. if($status !== 0)
  516. {
  517. self::notice("worker exit status $status pid:$pid worker:$worker_name");
  518. }
  519. // 记录进程退出状态
  520. self::$serverStatusInfo['worker_exit_code'][$worker_name][$status] = isset(self::$serverStatusInfo['worker_exit_code'][$worker_name][$status]) ? self::$serverStatusInfo['worker_exit_code'][$worker_name][$status] + 1 : 1;
  521. // 更新状态到共享内存
  522. self::updateStatusToShm();
  523. // 清理这个进程的数据
  524. self::clearWorker($worker_name, $pid);
  525. // 如果服务是不是关闭中
  526. if(self::$serverStatus != self::STATUS_SHUTDOWN)
  527. {
  528. // 重新创建worker
  529. self::createWorkers();
  530. }
  531. // 判断是否都重启完毕
  532. else
  533. {
  534. $all_worker_pid = self::getPidWorkerNameMap();
  535. if(empty($all_worker_pid))
  536. {
  537. // 删除共享内存
  538. self::removeShmAndQueue();
  539. // 发送提示
  540. self::notice("Server stoped");
  541. // 删除pid文件
  542. @unlink(WORKERMAN_PID_FILE);
  543. exit(0);
  544. }
  545. }//end if
  546. }//end while
  547. }
  548. /**
  549. * 获取pid 到 worker_name 的映射
  550. * @return array ['pid1'=>'worker_name1','pid2'=>'worker_name2', ...]
  551. */
  552. public static function getPidWorkerNameMap()
  553. {
  554. $all_pid = array();
  555. foreach(self::$workerPids as $worker_name=>$pid_array)
  556. {
  557. foreach($pid_array as $pid)
  558. {
  559. $all_pid[$pid] = $worker_name;
  560. }
  561. }
  562. return $all_pid;
  563. }
  564. /**
  565. * 放入重启队列中
  566. * @param array $restart_pids
  567. * @return void
  568. */
  569. public static function addToRestartWorkers($restart_pids)
  570. {
  571. if(!is_array($restart_pids))
  572. {
  573. self::notice("addToRestartWorkers(".var_export($restart_pids, true).") \$restart_pids not array");
  574. return false;
  575. }
  576. // 将pid放入重启队列
  577. foreach($restart_pids as $pid)
  578. {
  579. if(!isset(self::$workerToRestart[$pid]))
  580. {
  581. // 重启时间=0
  582. self::$workerToRestart[$pid] = 0;
  583. }
  584. }
  585. }
  586. /**
  587. * 重启workers
  588. * @return void
  589. */
  590. public static function restartWorkers()
  591. {
  592. // 标记server状态
  593. if(self::$serverStatus != self::STATUS_RESTARTING_WORKERS && self::$serverStatus != self::STATUS_SHUTDOWN)
  594. {
  595. self::$serverStatus = self::STATUS_RESTARTING_WORKERS;
  596. }
  597. // 没有要重启的进程了
  598. if(empty(self::$workerToRestart))
  599. {
  600. self::$serverStatus = self::STATUS_RUNNING;
  601. self::notice("\nWorker Restart Success");
  602. return true;
  603. }
  604. // 遍历要重启的进程 标记它们重启时间
  605. foreach(self::$workerToRestart as $pid => $stop_time)
  606. {
  607. if($stop_time == 0)
  608. {
  609. self::$workerToRestart[$pid] = time();
  610. posix_kill($pid, SIGHUP);
  611. Lib\Task::add(self::KILL_WORKER_TIME_LONG, array('\WORKERMAN\Core\Master', 'forceKillWorker'), array($pid), false);
  612. break;
  613. }
  614. }
  615. }
  616. /**
  617. * worker进程退出时,master进程的一些清理工作
  618. * @param string $worker_name
  619. * @param int $pid
  620. * @return void
  621. */
  622. protected static function clearWorker($worker_name, $pid)
  623. {
  624. // 释放一些不用了的数据
  625. unset(self::$workerToRestart[$pid], self::$workerPids[$worker_name][$pid]);
  626. }
  627. /**
  628. * 停止服务
  629. * @return void
  630. */
  631. public static function stop()
  632. {
  633. // 如果没有子进程则直接退出
  634. $all_worker_pid = self::getPidWorkerNameMap();
  635. if(empty($all_worker_pid))
  636. {
  637. exit(0);
  638. }
  639. // 标记server开始关闭
  640. self::$serverStatus = self::STATUS_SHUTDOWN;
  641. // killWorkerTimeLong 秒后如果还没停止则强制杀死所有进程
  642. Lib\Task::add(self::KILL_WORKER_TIME_LONG, array('\WORKERMAN\Core\Master', 'stopAllWorker'), array(true), false);
  643. // 停止所有worker
  644. self::stopAllWorker();
  645. }
  646. /**
  647. * 停止所有worker
  648. * @param bool $force 是否强制退出
  649. * @return void
  650. */
  651. public static function stopAllWorker($force = false)
  652. {
  653. // 获得所有pid
  654. $all_worker_pid = self::getPidWorkerNameMap();
  655. // 强行杀死?
  656. if($force)
  657. {
  658. // 杀死所有子进程
  659. foreach($all_worker_pid as $pid=>$worker_name)
  660. {
  661. // 发送kill信号
  662. self::forceKillWorker($pid);
  663. self::notice("Kill workers($worker_name) force!");
  664. }
  665. }
  666. else
  667. {
  668. // 向所有子进程发送终止信号
  669. foreach($all_worker_pid as $pid=>$worker_name)
  670. {
  671. // 发送SIGINT信号
  672. posix_kill($pid, SIGINT);
  673. }
  674. }
  675. }
  676. /**
  677. * 强制杀死进程
  678. * @param int $pid
  679. * @return void
  680. */
  681. public static function forceKillWorker($pid)
  682. {
  683. if(posix_kill($pid, 0))
  684. {
  685. self::notice("Kill workers $pid force!");
  686. posix_kill($pid, SIGKILL);
  687. }
  688. }
  689. /**
  690. * 设置运行用户
  691. * @param string $worker_user
  692. * @return void
  693. */
  694. protected static function setWorkerUser($worker_user)
  695. {
  696. $user_info = posix_getpwnam($worker_user);
  697. // 尝试设置gid uid
  698. if(!posix_setgid($user_info['gid']) || !posix_setuid($user_info['uid']))
  699. {
  700. $notice = 'Notice : Can not run woker as '.$worker_user." , You shuld be root\n";
  701. self::notice($notice, true);
  702. }
  703. }
  704. /**
  705. * 获取共享内存资源id
  706. * @return resource
  707. */
  708. public static function getShmId()
  709. {
  710. return self::$shmId;
  711. }
  712. /**
  713. * 获取消息队列资源id
  714. * @return resource
  715. */
  716. public static function getQueueId()
  717. {
  718. return self::$queueId;
  719. }
  720. /**
  721. * 关闭标准输入输出
  722. * @return void
  723. */
  724. protected static function resetStdFd()
  725. {
  726. // 开发环境不关闭标准输出,用于调试
  727. if(Lib\Config::get('debug') == 1 && posix_ttyname(STDOUT))
  728. {
  729. return;
  730. }
  731. global $STDOUT, $STDERR;
  732. @fclose(STDOUT);
  733. @fclose(STDERR);
  734. // 将标准输出重定向到/dev/null
  735. $STDOUT = fopen('/dev/null',"rw+");
  736. $STDERR = fopen('/dev/null',"rw+");
  737. }
  738. /**
  739. * 更新主进程收集的状态信息到共享内存
  740. * @return bool
  741. */
  742. protected static function updateStatusToShm()
  743. {
  744. if(!self::$shmId)
  745. {
  746. return true;
  747. }
  748. return shm_put_var(self::$shmId, self::STATUS_VAR_ID, array_merge(self::$serverStatusInfo, array('pid_map'=>self::$workerPids)));
  749. }
  750. /**
  751. * 销毁共享内存以及消息队列
  752. * @return void
  753. */
  754. protected static function removeShmAndQueue()
  755. {
  756. if(self::$shmId)
  757. {
  758. shm_remove(self::$shmId);
  759. }
  760. if(self::$queueId)
  761. {
  762. msg_remove_queue(self::$queueId);
  763. }
  764. }
  765. /**
  766. * 设置进程名称,需要proctitle支持 或者php>=5.5
  767. * @param string $title
  768. * @return void
  769. */
  770. protected static function setProcessTitle($title)
  771. {
  772. // >=php 5.5
  773. if (version_compare(phpversion(), "5.5", "ge") && function_exists('cli_set_process_title'))
  774. {
  775. cli_set_process_title($title);
  776. }
  777. // 需要扩展
  778. elseif(extension_loaded('proctitle') && function_exists('setproctitle'))
  779. {
  780. setproctitle($title);
  781. }
  782. }
  783. /**
  784. * notice,记录到日志
  785. * @param string $msg
  786. * @param bool $display
  787. * @return void
  788. */
  789. public static function notice($msg, $display = false)
  790. {
  791. Lib\Log::add("Server:".$msg);
  792. if($display)
  793. {
  794. if(self::$serverStatus == self::STATUS_STARTING)
  795. {
  796. echo($msg."\n");
  797. }
  798. }
  799. }
  800. }