Worker.php 80 KB


  1. <?php
  2. /**
  3. * This file is part of workerman.
  4. *
  5. * Licensed under The MIT License
  6. * For full copyright and license information, please see the MIT-LICENSE.txt
  7. * Redistributions of files must retain the above copyright notice.
  8. *
  9. * @author walkor<walkor@workerman.net>
  10. * @copyright walkor<walkor@workerman.net>
  11. * @link http://www.workerman.net/
  12. * @license http://www.opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Workerman;
  15. use Exception;
  16. use stdClass;
  17. use Throwable;
  18. use Workerman\Connection\ConnectionInterface;
  19. use Workerman\Connection\TcpConnection;
  20. use Workerman\Connection\UdpConnection;
  21. use Workerman\Events\Event;
  22. use Workerman\Events\EventInterface;
  23. use Workerman\Events\Revolt;
  24. use Workerman\Events\Select;
  25. use Revolt\EventLoop;
  26. /**
  27. * Worker class
  28. * A container for listening ports
  29. */
  30. #[\AllowDynamicProperties]
  31. class Worker
  32. {
  33. /**
  34. * Version.
  35. *
  36. * @var string
  37. */
  38. const VERSION = '5.0.0-beta.2';
  39. /**
  40. * Status starting.
  41. *
  42. * @var int
  43. */
  44. const STATUS_STARTING = 1;
  45. /**
  46. * Status running.
  47. *
  48. * @var int
  49. */
  50. const STATUS_RUNNING = 2;
  51. /**
  52. * Status shutdown.
  53. *
  54. * @var int
  55. */
  56. const STATUS_SHUTDOWN = 4;
  57. /**
  58. * Status reloading.
  59. *
  60. * @var int
  61. */
  62. const STATUS_RELOADING = 8;
  63. /**
  64. * Default backlog. Backlog is the maximum length of the queue of pending connections.
  65. *
  66. * @var int
  67. */
  68. const DEFAULT_BACKLOG = 102400;
  69. /**
  70. * Extra spacing added after each UI column so adjacent columns do not touch.
  71. *
  72. * @var int
  73. */
  74. const UI_SAFE_LENGTH = 4;
  75. /**
  76. * Worker id.
  77. *
  78. * @var int
  79. */
  80. public int $id = 0;
  81. /**
  82. * Name of the worker processes.
  83. *
  84. * @var string
  85. */
  86. public string $name = 'none';
  87. /**
  88. * Number of worker processes.
  89. *
  90. * @var int
  91. */
  92. public int $count = 1;
  93. /**
  94. * Unix user of processes, needs appropriate privileges (usually root).
  95. *
  96. * @var string
  97. */
  98. public string $user = '';
  99. /**
  100. * Unix group of processes, needs appropriate privileges (usually root).
  101. *
  102. * @var string
  103. */
  104. public string $group = '';
  105. /**
  106. * reloadable.
  107. *
  108. * @var bool
  109. */
  110. public bool $reloadable = true;
  111. /**
  112. * reuse port.
  113. *
  114. * @var bool
  115. */
  116. public bool $reusePort = false;
  117. /**
  118. * Emitted when worker processes start.
  119. *
  120. * @var ?callable
  121. */
  122. public $onWorkerStart = null;
  123. /**
  124. * Emitted when a socket connection is successfully established.
  125. *
  126. * @var ?callable
  127. */
  128. public $onConnect = null;
  129. /**
  130. * Emitted when websocket handshake completed (Only work when protocol is ws).
  131. *
  132. * @var ?callable
  133. */
  134. public $onWebSocketConnect = null;
  135. /**
  136. * Emitted when data is received.
  137. *
  138. * @var ?callable
  139. */
  140. public $onMessage = null;
  141. /**
  142. * Emitted when the other end of the socket sends a FIN packet.
  143. *
  144. * @var ?callable
  145. */
  146. public $onClose = null;
  147. /**
  148. * Emitted when an error occurs with connection.
  149. *
  150. * @var ?callable
  151. */
  152. public $onError = null;
  153. /**
  154. * Emitted when the send buffer becomes full.
  155. *
  156. * @var ?callable
  157. */
  158. public $onBufferFull = null;
  159. /**
  160. * Emitted when the send buffer becomes empty.
  161. *
  162. * @var ?callable
  163. */
  164. public $onBufferDrain = null;
  165. /**
  166. * Emitted when worker processes stopped.
  167. *
  168. * @var ?callable
  169. */
  170. public $onWorkerStop = null;
  171. /**
  172. * Emitted when worker processes get reload signal.
  173. *
  174. * @var ?callable
  175. */
  176. public $onWorkerReload = null;
  177. /**
  178. * Transport layer protocol.
  179. *
  180. * @var string
  181. */
  182. public string $transport = 'tcp';
  183. /**
  184. * Store all connections of clients.
  185. *
  186. * @var array
  187. */
  188. public array $connections = [];
  189. /**
  190. * Application layer protocol.
  191. *
  192. * @var ?string
  193. */
  194. public ?string $protocol = null;
  195. /**
  196. * Pause accept new connections or not.
  197. *
  198. * @var bool
  199. */
  200. protected bool $pauseAccept = true;
  201. /**
  202. * Is worker stopping ?
  203. * @var bool
  204. */
  205. public bool $stopping = false;
  206. /**
  207. * Daemonize.
  208. *
  209. * @var bool
  210. */
  211. public static bool $daemonize = false;
  212. /**
  213. * Stdout file.
  214. *
  215. * @var string
  216. */
  217. public static string $stdoutFile = '/dev/null';
  218. /**
  219. * The file to store master process PID.
  220. *
  221. * @var string
  222. */
  223. public static string $pidFile = '';
  224. /**
  225. * The file used to store the master process status file.
  226. *
  227. * @var string
  228. */
  229. public static string $statusFile = '';
  230. /**
  231. * Log file.
  232. *
  233. * @var mixed
  234. */
  235. public static mixed $logFile = '';
  236. /**
  237. * Global event loop.
  238. *
  239. * @var ?EventInterface
  240. */
  241. public static ?EventInterface $globalEvent = null;
  242. /**
  243. * Emitted when the master process get reload signal.
  244. *
  245. * @var ?callable
  246. */
  247. public static $onMasterReload = null;
  248. /**
  249. * Emitted when the master process terminated.
  250. *
  251. * @var ?callable
  252. */
  253. public static $onMasterStop = null;
  254. /**
  255. * Emitted when worker processes exited.
  256. *
  257. * @var ?callable
  258. */
  259. public static $onWorkerExit = null;
  260. /**
  261. * EventLoopClass
  262. *
  263. * @var class-string
  264. */
  265. public static string $eventLoopClass = '';
  266. /**
  267. * Process title
  268. *
  269. * @var string
  270. */
  271. public static string $processTitle = 'WorkerMan';
  272. /**
  273. * After sending the stop command to the child process stopTimeout seconds,
  274. * if the process is still living then forced to kill.
  275. *
  276. * @var int
  277. */
  278. public static int $stopTimeout = 2;
  279. /**
  280. * Command
  281. * @var string
  282. */
  283. public static string $command = '';
  284. /**
  285. * The PID of master process.
  286. *
  287. * @var int
  288. */
  289. protected static int $masterPid = 0;
  290. /**
  291. * Listening socket.
  292. *
  293. * @var resource
  294. */
  295. protected $mainSocket = null;
  296. /**
  297. * Socket name. The format is like this http://0.0.0.0:80 .
  298. *
  299. * @var string
  300. */
  301. protected string $socketName = '';
  302. /**
  303. * parse from socketName avoid parse again in master or worker
  304. * LocalSocket The format is like tcp://0.0.0.0:8080
  305. * @var ?string
  306. */
  307. protected ?string $localSocket = null;
  308. /**
  309. * Context of socket.
  310. *
  311. * @var resource
  312. */
  313. protected $socketContext = null;
  314. /**
  315. * @var stdClass
  316. */
  317. protected stdClass $context;
  318. /**
  319. * All worker instances.
  320. *
  321. * @var Worker[]
  322. */
  323. protected static array $workers = [];
  324. /**
  325. * All worker processes pid.
  326. * The format is like this [worker_id=>[pid=>pid, pid=>pid, ..], ..]
  327. *
  328. * @var array
  329. */
  330. protected static array $pidMap = [];
  331. /**
  332. * All worker processes waiting for restart.
  333. * The format is like this [pid=>pid, pid=>pid].
  334. *
  335. * @var array
  336. */
  337. protected static array $pidsToRestart = [];
  338. /**
  339. * Mapping from PID to worker process ID.
  340. * The format is like this [worker_id=>[0=>$pid, 1=>$pid, ..], ..].
  341. *
  342. * @var array
  343. */
  344. protected static array $idMap = [];
  345. /**
  346. * Current status.
  347. *
  348. * @var int
  349. */
  350. protected static int $status = self::STATUS_STARTING;
  351. /**
  352. * Maximum length of the worker names.
  353. *
  354. * @var int
  355. */
  356. protected static int $maxWorkerNameLength = 12;
  357. /**
  358. * Maximum length of the socket names.
  359. *
  360. * @var int
  361. */
  362. protected static int $maxSocketNameLength = 12;
  363. /**
  364. * Maximum length of the process user names.
  365. *
  366. * @var int
  367. */
  368. protected static int $maxUserNameLength = 12;
  369. /**
  370. * Maximum length of the Proto names.
  371. *
  372. * @var int
  373. */
  374. protected static int $maxProtoNameLength = 4;
  375. /**
  376. * Maximum length of the Processes names.
  377. *
  378. * @var int
  379. */
  380. protected static int $maxProcessesNameLength = 9;
  381. /**
  382. * Maximum length of the state names.
  383. *
  384. * @var int
  385. */
  386. protected static int $maxStateNameLength = 1;
  387. /**
  388. * The file to store status info of current worker process.
  389. *
  390. * @var string
  391. */
  392. protected static string $statisticsFile = '';
  393. /**
  394. * Start file.
  395. *
  396. * @var string
  397. */
  398. protected static string $startFile = '';
  399. /**
  400. * Processes for windows.
  401. *
  402. * @var array
  403. */
  404. protected static array $processForWindows = [];
  405. /**
  406. * Status info of current worker process.
  407. *
  408. * @var array
  409. */
  410. protected static array $globalStatistics = [
  411. 'start_timestamp' => 0,
  412. 'worker_exit_info' => []
  413. ];
  414. /**
  415. * Available event loops.
  416. *
  417. * @var array<string, string>
  418. */
  419. protected static array $availableEventLoops = [
  420. 'event' => Event::class,
  421. ];
  422. /**
  423. * PHP built-in protocols.
  424. *
  425. * @var array<string,string>
  426. */
  427. const BUILD_IN_TRANSPORTS = [
  428. 'tcp' => 'tcp',
  429. 'udp' => 'udp',
  430. 'unix' => 'unix',
  431. 'ssl' => 'tcp'
  432. ];
  /**
   * PHP built-in error levels mapped to their symbolic names.
   *
   * The E_STRICT entry is keyed by its literal value (2048) instead of the
   * \E_STRICT constant: referencing that constant is deprecated as of PHP 8.4,
   * while the numeric level itself is unchanged, so lookups keep working.
   *
   * @var array<int,string>
   */
  const ERROR_TYPE = [
      \E_ERROR => 'E_ERROR', // 1
      \E_WARNING => 'E_WARNING', // 2
      \E_PARSE => 'E_PARSE', // 4
      \E_NOTICE => 'E_NOTICE', // 8
      \E_CORE_ERROR => 'E_CORE_ERROR', // 16
      \E_CORE_WARNING => 'E_CORE_WARNING', // 32
      \E_COMPILE_ERROR => 'E_COMPILE_ERROR', // 64
      \E_COMPILE_WARNING => 'E_COMPILE_WARNING', // 128
      \E_USER_ERROR => 'E_USER_ERROR', // 256
      \E_USER_WARNING => 'E_USER_WARNING', // 512
      \E_USER_NOTICE => 'E_USER_NOTICE', // 1024
      2048 => 'E_STRICT', // 2048 (literal on purpose, see note above)
      \E_RECOVERABLE_ERROR => 'E_RECOVERABLE_ERROR', // 4096
      \E_DEPRECATED => 'E_DEPRECATED', // 8192
      \E_USER_DEPRECATED => 'E_USER_DEPRECATED' // 16384
  ];
  455. /**
  456. * Graceful stop or not.
  457. *
  458. * @var bool
  459. */
  460. protected static bool $gracefulStop = false;
  461. /**
  462. * Standard output stream
  463. * @var resource
  464. */
  465. protected static $outputStream = null;
  466. /**
  467. * If $outputStream support decorated
  468. * @var ?bool
  469. */
  470. protected static ?bool $outputDecorated = null;
  471. /**
  472. * Worker object's hash id(unique identifier).
  473. *
  474. * @var ?string
  475. */
  476. protected ?string $workerId = null;
  /**
   * Entry point: boot the master process and run every registered worker.
   *
   * The steps are executed strictly in the order written below; each one is
   * a static method of this class.
   *
   * @return void
   * @throws Exception
   */
  public static function runAll()
  {
      // Environment check, one-time initialisation and command-line handling.
      static::checkSapiEnv();
      static::init();
      static::parseCommand();

      // Start-up section bracketed by lock() (LOCK_EX by default) and lock(LOCK_UN).
      static::lock();
      static::daemonize();
      static::initWorkers();
      static::installSignal();
      static::saveMasterPid();
      static::lock(\LOCK_UN);

      // Show the start-up table, spawn the workers, then supervise them.
      static::displayUI();
      static::forkWorkers();
      static::resetStd();
      static::monitorWorkers();
  }
  /**
   * Abort unless PHP is running under the command-line SAPI.
   *
   * @return void
   */
  protected static function checkSapiEnv()
  {
      if (\PHP_SAPI === 'cli') {
          return;
      }
      // Any other SAPI: print the message and terminate the script.
      exit("Only run in command line mode \n");
  }
  /**
   * One-time initialisation of the master process.
   *
   * Installs an error handler that echoes errors, records the start file,
   * fills in default pid/log file paths, makes sure the log file exists,
   * resets status and statistics, sets the process title, and initialises
   * the worker id map and the timer.
   *
   * @return void
   */
  protected static function init()
  {
      \set_error_handler(function ($code, $msg, $file, $line) {
          static::safeEcho("$msg in file $file on line $line\n");
      });

      // The last backtrace frame is the outermost caller, i.e. the start script.
      $frames = \debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS);
      $outermostFrame = end($frames);
      static::$startFile = $outermostFrame['file'];

      // Default pid file name is derived from the start file path.
      $pidFilePrefix = \str_replace('/', '_', static::$startFile);
      if (empty(static::$pidFile)) {
          static::$pidFile = __DIR__ . '/../' . $pidFilePrefix . '.pid';
      }

      // Default log file.
      if (empty(static::$logFile)) {
          static::$logFile = __DIR__ . '/../../workerman.log';
      }

      // Create the log file (and its directory) when it does not exist yet.
      if (!\is_file(static::$logFile)) {
          $logDirectory = dirname(static::$logFile);
          if (!is_dir($logDirectory)) {
              @mkdir($logDirectory, 0777, true);
          }
          \touch(static::$logFile);
          \chmod(static::$logFile, 0622);
      }

      static::$status = static::STATUS_STARTING;
      static::$globalStatistics['start_timestamp'] = \time();

      static::setProcessTitle(static::$processTitle . ': master process start_file=' . static::$startFile);

      static::initId();
      Timer::init();
  }
  /**
   * Apply a flock() operation to "<pidFile>.lock".
   *
   * The file handle lives in a function-static variable, so a later call
   * operates on the handle opened by an earlier one. With LOCK_UN the handle
   * is closed and the lock file is deleted. Does nothing when
   * DIRECTORY_SEPARATOR is not '/'.
   *
   * @param int $flag flock() operation, LOCK_EX by default.
   * @return void
   */
  protected static function lock($flag = \LOCK_EX)
  {
      static $handle;
      if (\DIRECTORY_SEPARATOR !== '/') {
          return;
      }

      $lockFile = static::$pidFile . '.lock';
      if (!$handle) {
          $handle = \fopen($lockFile, 'a+');
      }
      if (!$handle) {
          // Could not open the lock file: silently continue without locking.
          return;
      }

      flock($handle, $flag);
      if ($flag !== \LOCK_UN) {
          return;
      }

      // Unlock: release the handle and clean up the lock file.
      fclose($handle);
      $handle = null;
      clearstatcache();
      if (\is_file($lockFile)) {
          unlink($lockFile);
      }
  }
  /**
   * Prepare every registered worker before forking.
   *
   * Chooses the statistics file, fills in default name/user, stores the
   * socket name and state label used by the start-up table, widens the
   * per-column maximum lengths, and starts listening for workers that do
   * not use reusePort. Does nothing when DIRECTORY_SEPARATOR is not '/'.
   *
   * @return void
   * @throws Exception
   */
  protected static function initWorkers()
  {
      if (\DIRECTORY_SEPARATOR !== '/') {
          return;
      }

      $statisticsFile = static::$statusFile;
      if (!$statisticsFile) {
          $statisticsFile = __DIR__ . '/../workerman-' . posix_getpid() . '.status';
      }
      static::$statisticsFile = $statisticsFile;

      foreach (static::$workers as $worker) {
          if (empty($worker->name)) {
              $worker->name = 'none';
          }

          if (empty($worker->user)) {
              // No user configured: run as whoever started the process.
              $worker->user = static::getCurrentUser();
          } elseif (\posix_getuid() !== 0 && $worker->user !== static::getCurrentUser()) {
              static::log('Warning: You must have the root privileges to change uid and gid.');
          }

          // Values shown in the "listen" and "state" columns.
          $worker->context->statusSocket = $worker->getSocketName();
          $worker->context->statusState = '<g> [OK] </g>';

          foreach (static::getUiColumns() as $columnName => $prop) {
              // Placeholder for a column that is set neither on the worker nor on its context.
              if (!isset($worker->$prop) && !isset($worker->context->$prop)) {
                  $worker->context->$prop = 'NNNN';
              }
              $valueLength = \strlen($worker->$prop ?? $worker->context->$prop);
              $widthProperty = 'max' . \ucfirst(\strtolower($columnName)) . 'NameLength';
              static::$$widthProperty = \max(static::$$widthProperty, $valueLength);
          }

          if (!$worker->reusePort) {
              $worker->listen();
          }
      }
  }
  /**
   * Return the registry of worker instances held in static::$workers.
   *
   * @return Worker[]
   */
  public static function getAllWorkers(): array
  {
      return static::$workers;
  }
  /**
   * Return the global event loop stored in static::$globalEvent.
   *
   * The property is nullable and starts out as null, whereas this method is
   * declared to return EventInterface; calling it while the property is
   * still null therefore raises a TypeError.
   *
   * @return EventInterface
   */
  public static function getEventLoop(): EventInterface
  {
      return static::$globalEvent;
  }
  /**
   * Return this worker's listening socket ($mainSocket).
   *
   * The property is null by default.
   *
   * @return resource
   */
  public function getMainSocket()
  {
      return $this->mainSocket;
  }
  /**
   * Rebuild static::$idMap so each worker has exactly `count` slots.
   *
   * A worker's count is raised to at least 1. Existing slot values are kept,
   * missing slots are set to 0 and slots beyond `count` are dropped.
   *
   * @return void
   */
  protected static function initId()
  {
      foreach (static::$workers as $workerId => $worker) {
          $worker->count = max($worker->count, 1);

          $slots = [];
          $slot = 0;
          while ($slot < $worker->count) {
              $slots[$slot] = static::$idMap[$workerId][$slot] ?? 0;
              $slot++;
          }

          static::$idMap[$workerId] = $slots;
      }
  }
  /**
   * Get the unix user name of the current process.
   *
   * posix_getpwuid() returns false when the uid has no passwd entry; in that
   * case 'unknown' is returned so the declared string return type always holds.
   *
   * @return string
   */
  protected static function getCurrentUser(): string
  {
      $userInfo = \posix_getpwuid(\posix_getuid());
      // `??` covers both a false result and a missing 'name' key without a warning.
      return $userInfo['name'] ?? 'unknown';
  }
  /**
   * Display the starting UI.
   *
   * Prints nothing when '-q' is among the arguments. When
   * DIRECTORY_SEPARATOR is not '/', a short fixed banner is printed.
   * Otherwise a version line, a column header, one row per worker and a
   * closing hint are printed, using the column widths kept in the
   * max*NameLength static properties.
   *
   * @return void
   */
  protected static function displayUI()
  {
      $arguments = static::getArgv();
      if (\in_array('-q', $arguments)) {
          return;
      }

      if (\DIRECTORY_SEPARATOR !== '/') {
          static::safeEcho("----------------------- WORKERMAN -----------------------------\r\n");
          static::safeEcho('Workerman version:' . static::VERSION . ' PHP version:' . \PHP_VERSION . "\r\n");
          static::safeEcho("------------------------ WORKERS -------------------------------\r\n");
          static::safeEcho("worker listen processes status\r\n");
          return;
      }

      // Version line; its length is remembered as the minimum table width.
      $lineVersion = 'Workerman version:' . static::VERSION
          . \str_pad('PHP version:', 16, ' ', \STR_PAD_LEFT) . \PHP_VERSION
          . \str_pad('Event-loop:', 16, ' ', \STR_PAD_LEFT) . static::getEventLoopName()
          . \PHP_EOL;
      if (!\defined('LINE_VERSIOIN_LENGTH')) {
          \define('LINE_VERSIOIN_LENGTH', \strlen($lineVersion));
      }

      $totalLength = static::getSingleLineTotalLength();
      // The <w></w> markers are counted in the pad length on top of the table width.
      $paddedWidth = $totalLength + \strlen('<w></w>');
      $lineOne = '<n>' . \str_pad('<w> WORKERMAN </w>', $paddedWidth, '-', \STR_PAD_BOTH) . '</n>' . \PHP_EOL;
      $lineTwo = \str_pad('<w> WORKERS </w>', $paddedWidth, '-', \STR_PAD_BOTH) . \PHP_EOL;
      static::safeEcho($lineOne . $lineVersion . $lineTwo);

      // Column header.
      $header = '';
      foreach (static::getUiColumns() as $columnName => $prop) {
          $widthProperty = 'max' . \ucfirst(\strtolower($columnName)) . 'NameLength';
          // The 'socket' column is shown under the label 'listen'.
          $label = $columnName === 'socket' ? 'listen' : $columnName;
          $header .= "<w>{$label}</w>"
              . \str_pad('', static::$$widthProperty + static::UI_SAFE_LENGTH - \strlen($label));
      }
      if ($header) {
          static::safeEcho($header . \PHP_EOL);
      }

      // One row per worker.
      $row = '';
      foreach (static::$workers as $worker) {
          $row = '';
          foreach (static::getUiColumns() as $columnName => $prop) {
              $cell = $worker->$prop ?? $worker->context->$prop;
              $widthProperty = 'max' . \ucfirst(\strtolower($columnName)) . 'NameLength';
              // Markup tags inside the cell are added to the pad width.
              \preg_match_all("/(<n>|<\/n>|<w>|<\/w>|<g>|<\/g>)/is", $cell, $tags);
              $tagLength = !empty($tags) ? \strlen(\implode('', $tags[0])) : 0;
              $row .= \str_pad($cell, static::$$widthProperty + static::UI_SAFE_LENGTH + $tagLength);
          }
          if ($row) {
              static::safeEcho($row . \PHP_EOL);
          }
      }

      // Closing rule, printed only when the last worker row was non-empty.
      $lineLast = \str_pad('', static::getSingleLineTotalLength(), '-') . \PHP_EOL;
      if (!empty($row)) {
          static::safeEcho($lineLast);
      }

      if (!static::$daemonize) {
          static::safeEcho("Press Ctrl+C to stop. Start success.\n");
          return;
      }

      global $argv;
      $startFile = $argv[0];
      static::safeEcho('Input "php ' . $startFile . ' stop" to stop. Start success.' . "\n\n");
  }
  /**
   * Get the UI columns shown in the terminal.
   *
   * The result maps 'ui_column_name' => 'class_property_name', in display order.
   * May be moved into configuration in the future.
   *
   * @return array
   */
  public static function getUiColumns(): array
  {
      $columns = [];
      $columns['proto'] = 'transport';
      $columns['user'] = 'user';
      $columns['worker'] = 'name';
      $columns['socket'] = 'statusSocket';
      $columns['processes'] = 'count';
      $columns['state'] = 'statusState';

      return $columns;
  }
  /**
   * Get the total width of one UI line.
   *
   * Sums every column's maximum length plus UI_SAFE_LENGTH, and never returns
   * less than LINE_VERSIOIN_LENGTH (defined as 0 here if not defined yet).
   *
   * @return int
   */
  public static function getSingleLineTotalLength(): int
  {
      $lineWidth = 0;
      foreach (\array_keys(static::getUiColumns()) as $columnName) {
          $widthProperty = 'max' . \ucfirst(\strtolower($columnName)) . 'NameLength';
          $lineWidth += static::$$widthProperty + static::UI_SAFE_LENGTH;
      }

      // Keep the table at least as wide as the version line.
      if (!\defined('LINE_VERSIOIN_LENGTH')) {
          \define('LINE_VERSIOIN_LENGTH', 0);
      }

      return $lineWidth <= LINE_VERSIOIN_LENGTH ? LINE_VERSIOIN_LENGTH : $lineWidth;
  }
  /**
   * Parse the command line and execute the requested command.
   *
   * Recognised commands: start, stop, restart, reload, status, connections.
   * Recognised modes: -d and -g. Only `start` and `restart` return to the
   * caller; every other command ends the script with exit. Does nothing when
   * DIRECTORY_SEPARATOR is not '/'.
   *
   * @return void
   */
  protected static function parseCommand()
  {
      if (\DIRECTORY_SEPARATOR !== '/') {
          return;
      }

      global $argv;
      $startFile = $argv[0];

      $usage = "Usage: php yourfile <command> [mode]\n"
          . "Commands: \n"
          . "start\t\tStart worker in DEBUG mode.\n"
          . "\t\tUse mode -d to start in DAEMON mode.\n"
          . "stop\t\tStop worker.\n"
          . "\t\tUse mode -g to stop gracefully.\n"
          . "restart\t\tRestart workers.\n"
          . "\t\tUse mode -d to start in DAEMON mode.\n"
          . "\t\tUse mode -g to stop gracefully.\n"
          . "reload\t\tReload codes.\n"
          . "\t\tUse mode -g to reload gracefully.\n"
          . "status\t\tGet worker status.\n"
          . "\t\tUse mode -d to show live status.\n"
          . "connections\tGet worker connections.\n";

      $availableCommands = ['start', 'stop', 'restart', 'reload', 'status', 'connections'];
      $availableMode = ['-d', '-g'];

      // The last matching argument wins for both command and mode.
      $command = '';
      $mode = '';
      foreach (static::getArgv() as $argument) {
          if (\in_array($argument, $availableCommands)) {
              $command = $argument;
              continue;
          }
          if (\in_array($argument, $availableMode)) {
              $mode = $argument;
          }
      }

      if (!$command) {
          exit($usage);
      }

      // Only `start` gets a mode suffix in the log line.
      $modeStr = '';
      if ($command === 'start') {
          $modeStr = ($mode === '-d' || static::$daemonize) ? 'in DAEMON mode' : 'in DEBUG mode';
      }
      static::log("Workerman[$startFile] $command $modeStr");

      // Master PID as recorded in the pid file (0 when there is no pid file).
      $masterPid = \is_file(static::$pidFile) ? (int)\file_get_contents(static::$pidFile) : 0;

      $masterIsRunning = static::checkMasterIsAlive($masterPid);
      if ($masterIsRunning && $command === 'start') {
          static::log("Workerman[$startFile] already running");
          exit;
      }
      if (!$masterIsRunning && $command !== 'start' && $command !== 'restart') {
          static::log("Workerman[$startFile] not run");
          exit;
      }

      $statisticsFile = static::$statusFile ?: __DIR__ . "/../workerman-$masterPid.status";

      switch ($command) {
          case 'start':
              if ($mode === '-d') {
                  static::$daemonize = true;
              }
              break;

          case 'status':
              // Loops until exit: once without -d, until interrupted with -d.
              while (true) {
                  if (\is_file($statisticsFile)) {
                      @\unlink($statisticsFile);
                  }
                  // Signal the master with SIGIOT, then give it a second to respond.
                  \posix_kill($masterPid, SIGIOT);
                  \sleep(1);
                  // Clear the terminal in live mode.
                  if ($mode === '-d') {
                      static::safeEcho("\33[H\33[2J\33(B\33[m", true);
                  }
                  static::safeEcho(static::formatStatusData($statisticsFile));
                  if ($mode !== '-d') {
                      @\unlink($statisticsFile);
                      exit(0);
                  }
                  static::safeEcho("\nPress Ctrl+C to quit.\n\n");
              }

          case 'connections':
              if (\is_file($statisticsFile) && \is_writable($statisticsFile)) {
                  \unlink($statisticsFile);
              }
              // Signal the master with SIGIO, wait half a second, then print the file.
              \posix_kill($masterPid, SIGIO);
              \usleep(500000);
              if (\is_readable($statisticsFile)) {
                  \readfile($statisticsFile);
              }
              exit(0);

          case 'restart':
          case 'stop':
              if ($mode === '-g') {
                  static::$gracefulStop = true;
                  $sig = \SIGQUIT;
                  static::log("Workerman[$startFile] is gracefully stopping ...");
              } else {
                  static::$gracefulStop = false;
                  $sig = \SIGINT;
                  static::log("Workerman[$startFile] is stopping ...");
              }
              // Send the stop signal to the master process.
              $masterPid && \posix_kill($masterPid, $sig);

              $timeout = static::$stopTimeout + 3;
              $startTime = \time();
              // Poll until the master is gone; the timeout applies to non-graceful stops only.
              while ($masterPid && \posix_kill($masterPid, 0)) {
                  if (!static::$gracefulStop && \time() - $startTime >= $timeout) {
                      static::log("Workerman[$startFile] stop fail");
                      exit;
                  }
                  \usleep(10000);
              }

              static::log("Workerman[$startFile] stop success");
              if ($command === 'stop') {
                  exit(0);
              }
              // restart: fall out of the switch and continue starting up.
              if ($mode === '-d') {
                  static::$daemonize = true;
              }
              break;

          case 'reload':
              $sig = $mode === '-g' ? \SIGUSR2 : \SIGUSR1;
              \posix_kill($masterPid, $sig);
              exit;

          default:
              static::safeEcho('Unknown command: ' . $command . "\n");
              exit($usage);
      }
  }
  /**
   * Get the effective argument list.
   *
   * Returns the global $argv when it has at least one argument after the
   * script name; otherwise static::$command split on spaces when that is
   * non-empty; otherwise the global $argv as it is.
   *
   * @return mixed
   */
  public static function getArgv(): mixed
  {
      global $argv;

      if (isset($argv[1])) {
          return $argv;
      }
      if (static::$command) {
          return \explode(' ', static::$command);
      }
      return $argv;
  }
  /**
   * Format the status data read from the statistics file.
   *
   * The first line of the file is a serialized array keyed by pid, with
   * 'name' and 'listen' entries. The lines up to and including the
   * "pid ... memory ... listening" header are copied verbatim; every later
   * line that starts with a pid is treated as one process row. Per-process
   * QPS is the difference between this call's request total and the one
   * remembered from the previous call (kept in a function-static cache).
   *
   * Returns an empty string when the file is unreadable, empty, or its first
   * line does not unserialize to an array (for example a partial write).
   *
   * @param string $statisticsFile Path of the statistics file.
   * @return string
   */
  protected static function formatStatusData($statisticsFile): string
  {
      static $totalRequestCache = [];
      if (!\is_readable($statisticsFile)) {
          return '';
      }
      $lines = \file($statisticsFile, \FILE_IGNORE_NEW_LINES);
      if (!$lines) {
          return '';
      }

      // The payload is only read as nested arrays below, so no class needs to be
      // instantiated; a result that is not an array is treated as "no data"
      // instead of being handed to ksort().
      $workerInfo = \unserialize($lines[0], ['allowed_classes' => false]);
      if (!\is_array($workerInfo)) {
          return '';
      }
      \ksort($workerInfo, SORT_NUMERIC);
      unset($lines[0]);

      $statusStr = '';
      $currentTotalRequest = [];
      $dataWaitingSort = [];
      $readProcessStatus = false;
      $totalRequests = 0;
      $totalQps = 0;
      $totalConnections = 0;
      $totalFails = 0;
      $totalMemory = 0;
      $totalTimers = 0;
      $maxLen1 = static::$maxSocketNameLength;
      $maxLen2 = static::$maxWorkerNameLength;

      foreach ($lines as $line) {
          if (!$readProcessStatus) {
              // Header section: copy through until the column header line is seen.
              $statusStr .= $line . "\n";
              if (\preg_match('/^pid.*?memory.*?listening/', $line)) {
                  $readProcessStatus = true;
              }
              continue;
          }
          if (!\preg_match('/^[0-9]+/', $line, $pidMatch)) {
              continue;
          }
          $pid = $pidMatch[0];
          $dataWaitingSort[$pid] = $line;
          // Columns after the pid: memory, listen, name, connections, fails, timers, requests.
          if (\preg_match('/^\S+?\s+?(\S+?)\s+?(\S+?)\s+?(\S+?)\s+?(\S+?)\s+?(\S+?)\s+?(\S+?)\s+?(\S+?)\s+?/', $line, $match)) {
              $totalMemory += \intval(\str_ireplace('M', '', $match[1]));
              $maxLen1 = \max($maxLen1, \strlen($match[2]));
              $maxLen2 = \max($maxLen2, \strlen($match[3]));
              $totalConnections += \intval($match[4]);
              $totalFails += \intval($match[5]);
              $totalTimers += \intval($match[6]);
              $currentTotalRequest[$pid] = $match[7];
              $totalRequests += \intval($match[7]);
          }
      }

      foreach ($workerInfo as $pid => $worker) {
          if (!isset($dataWaitingSort[$pid])) {
              // No row was found for this process: show it as busy.
              $statusStr .= "$pid\t" . \str_pad('N/A', 7) . " "
                  . \str_pad($worker['listen'], static::$maxSocketNameLength) . " "
                  . \str_pad($worker['name'], static::$maxWorkerNameLength) . " "
                  . \str_pad('N/A', 11) . " " . \str_pad('N/A', 9) . " "
                  . \str_pad('N/A', 7) . " " . \str_pad('N/A', 13) . " N/A [busy] \n";
              continue;
          }
          if (!isset($totalRequestCache[$pid]) || !isset($currentTotalRequest[$pid])) {
              $qps = 0;
          } else {
              $qps = $currentTotalRequest[$pid] - $totalRequestCache[$pid];
              $totalQps += $qps;
          }
          $statusStr .= $dataWaitingSort[$pid] . " " . \str_pad($qps, 6) . " [idle]\n";
      }

      // Remember this call's totals for the next QPS calculation.
      $totalRequestCache = $currentTotalRequest;

      $statusStr .= "----------------------------------------------PROCESS STATUS---------------------------------------------------\n";
      $statusStr .= "Summary\t" . \str_pad($totalMemory . 'M', 7) . " "
          . \str_pad('-', $maxLen1) . " "
          . \str_pad('-', $maxLen2) . " "
          . \str_pad($totalConnections, 11) . " " . \str_pad($totalFails, 9) . " "
          . \str_pad($totalTimers, 7) . " " . \str_pad($totalRequests, 13) . " "
          . \str_pad($totalQps, 6) . " [Summary] \n";

      return $statusStr;
  }
  /**
   * Install the pcntl signal handlers.
   *
   * Routes the stop, reload and status signals to signalHandler() and
   * ignores SIGPIPE. Does nothing when DIRECTORY_SEPARATOR is not '/'.
   *
   * @return void
   */
  protected static function installSignal()
  {
      if (\DIRECTORY_SEPARATOR !== '/') {
          return;
      }

      $handler = [static::class, 'signalHandler'];
      $handledSignals = [
          \SIGINT, \SIGTERM, \SIGHUP, \SIGTSTP, \SIGQUIT,
          \SIGUSR1, \SIGUSR2, \SIGIOT, \SIGIO,
      ];
      foreach ($handledSignals as $signo) {
          \pcntl_signal($signo, $handler, false);
      }

      // SIGPIPE is ignored.
      \pcntl_signal(\SIGPIPE, \SIG_IGN, false);
  }
  /**
   * Move signal handling from pcntl to the global event loop.
   *
   * For each handled signal the pcntl disposition is set to SIG_IGN and
   * signalHandler() is registered through static::$globalEvent->onSignal().
   * Does nothing when DIRECTORY_SEPARATOR is not '/'.
   *
   * @return void
   */
  protected static function reinstallSignal()
  {
      if (\DIRECTORY_SEPARATOR !== '/') {
          return;
      }

      $handledSignals = [
          \SIGINT, \SIGTERM, \SIGHUP, \SIGTSTP, \SIGQUIT,
          \SIGUSR1, \SIGUSR2, \SIGIOT, \SIGIO,
      ];
      foreach ($handledSignals as $signo) {
          \pcntl_signal($signo, \SIG_IGN, false);
          static::$globalEvent->onSignal($signo, [static::class, 'signalHandler']);
      }
  }
  /**
   * Dispatch a received signal.
   *
   * - SIGINT, SIGTERM, SIGHUP, SIGTSTP: stop.
   * - SIGQUIT: graceful stop.
   * - SIGUSR1 / SIGUSR2: reload (SIGUSR2 gracefully), ignored while already
   *   reloading or shutting down.
   * - SIGIOT: write statistics to the status file.
   * - SIGIO: write connection statistics to the status file.
   * Any other signal is ignored.
   *
   * @param int $signal
   * @throws Exception
   */
  public static function signalHandler(int $signal)
  {
      // Stop.
      if (\in_array($signal, [\SIGINT, \SIGTERM, \SIGHUP, \SIGTSTP], true)) {
          static::$gracefulStop = false;
          static::stopAll();
          return;
      }

      // Graceful stop.
      if ($signal === \SIGQUIT) {
          static::$gracefulStop = true;
          static::stopAll();
          return;
      }

      // Reload.
      if ($signal === \SIGUSR2 || $signal === \SIGUSR1) {
          $busy = static::$status === static::STATUS_RELOADING
              || static::$status === static::STATUS_SHUTDOWN;
          if ($busy) {
              return;
          }
          static::$gracefulStop = $signal === \SIGUSR2;
          static::$pidsToRestart = static::getAllWorkerPids();
          static::reload();
          return;
      }

      // Show status.
      if ($signal === \SIGIOT) {
          static::writeStatisticsToStatusFile();
          return;
      }

      // Show connection status.
      if ($signal === \SIGIO) {
          static::writeConnectionsStatisticsToStatusFile();
      }
  }
  /**
   * Detach the process and keep running in the background (daemon mode).
   *
   * Only acts when static::$daemonize is set and DIRECTORY_SEPARATOR is '/'.
   * Sequence: umask(0), fork (parent exits), posix_setsid(), fork again
   * (parent exits). Only the second child returns from this method.
   *
   * @throws Exception When a fork or posix_setsid() fails.
   */
  protected static function daemonize()
  {
      if (!(static::$daemonize && \DIRECTORY_SEPARATOR === '/')) {
          return;
      }
      \umask(0);

      $firstFork = \pcntl_fork();
      if ($firstFork === -1) {
          throw new Exception('Fork fail');
      }
      if ($firstFork > 0) {
          // Parent of the first fork is done.
          exit(0);
      }

      if (\posix_setsid() === -1) {
          throw new Exception("Setsid fail");
      }

      // Fork again avoid SVR4 system regain the control of terminal.
      $secondFork = \pcntl_fork();
      if ($secondFork === -1) {
          throw new Exception("Fork fail");
      }
      if ($secondFork !== 0) {
          exit(0);
      }
  }
  /**
   * Point standard output and standard error at static::$stdoutFile.
   *
   * Only acts when static::$daemonize is set and DIRECTORY_SEPARATOR is '/'.
   * The file is first opened once as a probe; when that fails an Exception is
   * thrown, or the method returns silently if $throwException is false.
   * Otherwise the previous handles (globals $STDOUT/$STDERR and the STDOUT/STDERR
   * constants) are closed and the file is reopened twice in append mode.
   *
   * @param bool $throwException Whether to throw when the file cannot be opened.
   * @return void
   * @throws Exception
   */
  public static function resetStd(bool $throwException = true)
  {
      if (!static::$daemonize || \DIRECTORY_SEPARATOR !== '/') {
          return;
      }
      global $STDOUT, $STDERR;
      $probe = \fopen(static::$stdoutFile, "a");
      if (!$probe) {
          if ($throwException) {
              throw new Exception('Can not open stdoutFile ' . static::$stdoutFile);
          }
          return;
      }
      // The probe only proves the file is writable; release it again.
      unset($probe);
      // Swallow warnings raised while the std handles are swapped.
      \set_error_handler(function () {
      });
      if ($STDOUT) {
          \fclose($STDOUT);
      }
      if ($STDERR) {
          \fclose($STDERR);
      }
      if (\is_resource(\STDOUT)) {
          \fclose(\STDOUT);
      }
      if (\is_resource(\STDERR)) {
          \fclose(\STDERR);
      }
      // NOTE(review): presumably the two reopened handles take over the descriptors freed above - confirm.
      $STDOUT = \fopen(static::$stdoutFile, "a");
      $STDERR = \fopen(static::$stdoutFile, "a");
      // Fix standard output cannot redirect of PHP 8.1.8's bug
      if (\function_exists('posix_isatty') && \posix_isatty(2)) {
          \ob_start(function ($string) {
              \file_put_contents(static::$stdoutFile, $string, FILE_APPEND);
          }, 1);
      }
      // Change output stream.
      static::$outputStream = null;
      static::outputStream($STDOUT);
      \restore_error_handler();
  }
  /**
   * Record the current process id as master pid and persist it to the pid file.
   *
   * Does nothing when DIRECTORY_SEPARATOR is not '/'.
   *
   * @throws Exception When the pid file cannot be written.
   */
  protected static function saveMasterPid()
  {
      if (\DIRECTORY_SEPARATOR !== '/') {
          return;
      }
      static::$masterPid = \posix_getpid();
      $written = \file_put_contents(static::$pidFile, static::$masterPid);
      if ($written === false) {
          throw new Exception('can not save pid to ' . static::$pidFile);
      }
  }
  /**
   * Resolve the event-loop class name and cache it in static::$eventLoopClass.
   *
   * Resolution order: an already configured class, the Revolt adapter when the
   * Revolt\EventLoop class exists, the first entry of static::$availableEventLoops
   * whose extension is loaded, and finally the Select loop.
   *
   * @return string
   */
  protected static function getEventLoopName(): string
  {
      if (static::$eventLoopClass) {
          return static::$eventLoopClass;
      }
      if (\class_exists(EventLoop::class)) {
          return static::$eventLoopClass = Revolt::class;
      }
      foreach (static::$availableEventLoops as $extension => $loopClass) {
          if ($extension && \extension_loaded($extension)) {
              return static::$eventLoopClass = $loopClass;
          }
      }
      // No supported extension is loaded: fall back to the select-based loop.
      return static::$eventLoopClass = Select::class;
  }
  /**
   * Collect the pids of all worker processes recorded in static::$pidMap.
   *
   * @return array Flat map pid => pid.
   */
  protected static function getAllWorkerPids(): array
  {
      $allPids = [];
      foreach (static::$pidMap as $pidsOfWorker) {
          // Key every pid by itself; the union keeps the first occurrence of a key.
          $allPids += \array_combine($pidsOfWorker, $pidsOfWorker);
      }
      return $allPids;
  }
  /**
   * Start worker processes using the platform specific implementation.
   *
   * DIRECTORY_SEPARATOR '/' selects forkWorkersForLinux(), anything else
   * selects forkWorkersForWindows().
   *
   * @return void
   */
  protected static function forkWorkers()
  {
      if (\DIRECTORY_SEPARATOR !== '/') {
          static::forkWorkersForWindows();
          return;
      }
      static::forkWorkersForLinux();
  }
  /**
   * Fork processes until every worker has $worker->count children.
   *
   * While the status is STATUS_STARTING, an empty worker name is replaced by the
   * worker's socket name and static::$maxWorkerNameLength is raised to the
   * longest name seen.
   *
   * @return void
   * @throws Exception
   */
  protected static function forkWorkersForLinux()
  {
      foreach (static::$workers as $worker) {
          if (static::$status === static::STATUS_STARTING) {
              if (empty($worker->name)) {
                  $worker->name = $worker->getSocketName();
              }
              $nameLength = \strlen($worker->name);
              if ($nameLength > static::$maxWorkerNameLength) {
                  static::$maxWorkerNameLength = $nameLength;
              }
          }

          // Top up the children of this worker to the configured count.
          while (\count(static::$pidMap[$worker->workerId]) < $worker->count) {
              static::forkOneWorkerForLinux($worker);
          }
      }
  }
  /**
   * Start workers on Windows.
   *
   * When '-q' is among the arguments, or exactly one start file was given, the
   * current process becomes the worker: it creates the event loop, listens,
   * runs the worker and exits when the loop returns. Otherwise the current
   * process acts as a launcher: it creates a Select loop and starts one child
   * process per start file.
   *
   * @return void
   * @throws Exception
   */
  protected static function forkWorkersForWindows()
  {
      $files = static::getStartFilesForWindows();
      $runsAsWorker = \in_array('-q', static::getArgv()) || \count($files) === 1;

      if (!$runsAsWorker) {
          // Launcher: one child process per start file.
          static::$globalEvent = new Select();
          static::$globalEvent->setErrorHandler(function ($exception) {
              static::stopAll(250, $exception);
          });
          Timer::init(static::$globalEvent);
          foreach ($files as $startFile) {
              static::forkOneWorkerForWindows($startFile);
          }
          return;
      }

      $workerTotal = \count(static::$workers);
      if ($workerTotal > 1) {
          static::safeEcho("@@@ Error: multi workers init in one php file are not support @@@\r\n");
          static::safeEcho("@@@ See https://www.workerman.net/doc/workerman/faq/multi-woker-for-windows.html @@@\r\n");
      } elseif ($workerTotal <= 0) {
          exit("@@@no worker inited@@@\r\n\r\n");
      }

      // Only the first registered worker is run in this process.
      \reset(static::$workers);
      /** @var Worker $worker */
      $worker = \current(static::$workers);

      Timer::delAll();

      // Update process state.
      static::$status = static::STATUS_RUNNING;

      // Register shutdown function for checking errors.
      \register_shutdown_function(["\\Workerman\\Worker", 'checkErrors']);

      // Create a global event loop.
      if (!static::$globalEvent) {
          $eventLoopClass = static::getEventLoopName();
          static::$globalEvent = new $eventLoopClass;
          static::$globalEvent->setErrorHandler(function ($exception) {
              static::stopAll(250, $exception);
          });
      }

      // Reinstall signal.
      static::reinstallSignal();

      // Init Timer.
      Timer::init(static::$globalEvent);

      \restore_error_handler();

      // Display UI.
      $uiLine = \str_pad($worker->name, 21)
          . \str_pad($worker->getSocketName(), 36)
          . \str_pad($worker->count, 10)
          . "[ok]\n";
      static::safeEcho($uiLine);

      $worker->listen();
      $worker->run();
      static::$globalEvent->run();

      // The loop returned: anything but a regular shutdown is treated as an error.
      if (static::$status !== self::STATUS_SHUTDOWN) {
          $err = new Exception('event-loop exited');
          static::log($err);
          exit(250);
      }
      exit(0);
  }
  /**
   * Pick the command-line arguments that name existing files.
   *
   * @return array Map path => path, in argument order.
   */
  public static function getStartFilesForWindows(): array
  {
      $startFiles = [];
      foreach (static::getArgv() as $argument) {
          if (!\is_file($argument)) {
              continue;
          }
          $startFiles[$argument] = $argument;
      }
      return $startFiles;
  }
  /**
   * Start one child PHP process for a start file (Windows).
   *
   * The child is launched with PHP_BINARY and the '-q' argument and shares this
   * process's STDIN/STDOUT (STDOUT is also used for the child's stderr). The
   * process handle is stored in static::$processForWindows keyed by the resolved
   * path. A Select loop and the Timer are initialised if no global event loop
   * exists yet.
   *
   * @param string $startFile
   */
  public static function forkOneWorkerForWindows(string $startFile)
  {
      $startFile = \realpath($startFile);
      $command = '"' . PHP_BINARY . '" ' . " \"$startFile\" -q";
      $descriptors = [STDIN, STDOUT, STDOUT];
      $pipes = [];
      $process = \proc_open($command, $descriptors, $pipes, null, null, ['bypass_shell' => true]);

      if (empty(static::$globalEvent)) {
          static::$globalEvent = new Select();
          static::$globalEvent->setErrorHandler(function ($exception) {
              static::stopAll(250, $exception);
          });
          Timer::init(static::$globalEvent);
      }

      // Keep the child process handle.
      static::$processForWindows[$startFile] = [$process, $startFile];
  }
  /**
   * Restart Windows child processes that are no longer running.
   *
   * Every entry of static::$processForWindows is queried with proc_get_status();
   * a process reported as not running is closed and started again from the same
   * start file.
   *
   * @return void
   */
  public static function checkWorkerStatusForWindows()
  {
      foreach (static::$processForWindows as [$process, $startFile]) {
          $status = \proc_get_status($process);
          if (!isset($status['running'])) {
              static::safeEcho("proc_get_status fail\n");
              continue;
          }
          if ($status['running']) {
              continue;
          }
          static::safeEcho("process $startFile terminated and try to restart\n");
          \proc_close($process);
          static::forkOneWorkerForWindows($startFile);
      }
  }
  /**
   * Fork one child process for the given worker.
   *
   * In the master the new pid is recorded in static::$pidMap and static::$idMap
   * and the method returns. In the child the other workers are unlistened and
   * dropped, the event loop, signals and timer are set up, the worker is run and
   * the process exits when the event loop returns (250 unless the status is
   * STATUS_SHUTDOWN).
   *
   * @param self $worker
   * @throws Exception When pcntl_fork() fails.
   */
  protected static function forkOneWorkerForLinux(self $worker)
  {
      // Get available worker id (a slot whose recorded pid is 0).
      $id = static::getId($worker->workerId, 0);
      $pid = \pcntl_fork();

      if ($pid < 0) {
          throw new Exception("forkOneWorker fail");
      }

      // For master process.
      if ($pid > 0) {
          static::$pidMap[$worker->workerId][$pid] = $pid;
          static::$idMap[$worker->workerId][$id] = $pid;
          return;
      }

      // For child processes.
      \srand();
      \mt_srand();
      static::$gracefulStop = false;
      if (static::$status === static::STATUS_STARTING) {
          static::resetStd();
      }
      static::$pidsToRestart = static::$pidMap = [];

      // Remove other listener.
      foreach (static::$workers as $key => $oneWorker) {
          if ($oneWorker->workerId === $worker->workerId) {
              continue;
          }
          $oneWorker->unlisten();
          unset(static::$workers[$key]);
      }

      Timer::delAll();

      // Update process state.
      static::$status = static::STATUS_RUNNING;

      // Register shutdown function for checking errors.
      \register_shutdown_function(["\\Workerman\\Worker", 'checkErrors']);

      // Create a global event loop.
      if (!static::$globalEvent) {
          $eventLoopClass = static::getEventLoopName();
          static::$globalEvent = new $eventLoopClass;
          static::$globalEvent->setErrorHandler(function ($exception) {
              static::stopAll(250, $exception);
          });
      }

      // Reinstall signal.
      static::reinstallSignal();

      // Init Timer.
      Timer::init(static::$globalEvent);

      \restore_error_handler();

      $title = self::$processTitle . ': worker process ' . $worker->name . ' ' . $worker->getSocketName();
      static::setProcessTitle($title);
      $worker->setUserAndGroup();
      $worker->id = $id;
      $worker->run();

      // Main loop.
      static::$globalEvent->run();

      // The loop returned: anything but a regular shutdown is treated as an error.
      if (static::$status !== self::STATUS_SHUTDOWN) {
          $err = new Exception('event-loop exited');
          static::log($err);
          exit(250);
      }
      exit(0);
  }
  /**
   * Look up the id slot that holds the given pid for a worker.
   *
   * Searches static::$idMap[$workerId] with array_search() (loose comparison).
   * Callers pass 0 as $pid to find a free slot.
   *
   * @param string $workerId
   * @param int $pid
   *
   * @return false|int|string The slot key, or false when no slot holds $pid.
   */
  protected static function getId(string $workerId, int $pid): bool|int|string
  {
      $slots = static::$idMap[$workerId];
      return \array_search($pid, $slots);
  }
  /**
   * Switch the current process to the worker's configured unix user and group.
   *
   * The group defaults to the user's primary group when $this->group is empty.
   * An unknown user or group is logged as a warning and leaves the process
   * unchanged; a failing setgid/initgroups/setuid call is logged as well.
   *
   * @return void
   */
  public function setUserAndGroup()
  {
      // Get uid.
      $userInfo = \posix_getpwnam($this->user);
      if (!$userInfo) {
          static::log("Warning: User {$this->user} not exists");
          return;
      }
      $uid = $userInfo['uid'];

      // Get gid: an explicit group overrides the user's own gid.
      $gid = $userInfo['gid'];
      if ($this->group) {
          $groupInfo = \posix_getgrnam($this->group);
          if (!$groupInfo) {
              static::log("Warning: Group {$this->group} not exists");
              return;
          }
          $gid = $groupInfo['gid'];
      }

      // Nothing to do when the process already runs with this uid and gid.
      if ($uid === \posix_getuid() && $gid === \posix_getgid()) {
          return;
      }

      // Set uid and gid.
      $switched = \posix_setgid($gid)
          && \posix_initgroups($userInfo['name'], $gid)
          && \posix_setuid($uid);
      if (!$switched) {
          static::log("Warning: change gid or uid fail.");
      }
  }
  /**
   * Set the title of the current process.
   *
   * Errors raised by cli_set_process_title() are swallowed by a temporary
   * no-op error handler.
   *
   * @param string $title
   * @return void
   */
  protected static function setProcessTitle(string $title)
  {
      $ignoreErrors = function () {
      };
      \set_error_handler($ignoreErrors);
      \cli_set_process_title($title);
      \restore_error_handler();
  }
  /**
   * Supervise child processes using the platform specific implementation.
   *
   * DIRECTORY_SEPARATOR '/' selects monitorWorkersForLinux(), anything else
   * selects monitorWorkersForWindows().
   *
   * @return void
   */
  protected static function monitorWorkers()
  {
      if (\DIRECTORY_SEPARATOR !== '/') {
          static::monitorWorkersForWindows();
          return;
      }
      static::monitorWorkersForLinux();
  }
  /**
   * Master supervision loop: wait for children and react to their exit.
   *
   * Sets the status to STATUS_RUNNING and then loops forever. Each iteration
   * dispatches pending signals, blocks in pcntl_wait() and dispatches signals
   * again. When a child exited, its exit is logged/counted, the onWorkerExit
   * callback is invoked, its pid and id slot are released and - unless the
   * status is STATUS_SHUTDOWN - missing workers are forked again and a pending
   * reload is continued. During shutdown the master exits via exitAndClearAll()
   * once no worker pid is left.
   *
   * @return void
   * @throws Exception
   */
  protected static function monitorWorkersForLinux()
  {
      static::$status = static::STATUS_RUNNING;
      while (1) {
          // Calls signal handlers for pending signals.
          \pcntl_signal_dispatch();
          // Suspends execution of the current process until a child has exited, or until a signal is delivered
          $status = 0;
          $pid = \pcntl_wait($status, \WUNTRACED);
          // Calls signal handlers for pending signals again.
          \pcntl_signal_dispatch();
          // If a child has already exited.
          if ($pid > 0) {
              // Find out which worker process exited.
              foreach (static::$pidMap as $workerId => $workerPidArray) {
                  if (!isset($workerPidArray[$pid])) {
                      continue;
                  }
                  $worker = static::$workers[$workerId];
                  // Fix exit with status 2 for php8.2
                  if ($status === \SIGINT && static::$status === static::STATUS_SHUTDOWN) {
                      $status = 0;
                  }
                  // Exit status.
                  if ($status !== 0) {
                      static::log("worker[{$worker->name}:$pid] exit with status $status");
                  }

                  // onWorkerExit: a failing callback is logged and must not stop the monitor loop.
                  if (static::$onWorkerExit) {
                      try {
                          (static::$onWorkerExit)($worker, $status, $pid);
                      } catch (Throwable $exception) {
                          static::log("worker[{$worker->name}] onWorkerExit $exception");
                      }
                  }

                  // For Statistics.
                  if (!isset(static::$globalStatistics['worker_exit_info'][$workerId][$status])) {
                      static::$globalStatistics['worker_exit_info'][$workerId][$status] = 0;
                  }
                  ++static::$globalStatistics['worker_exit_info'][$workerId][$status];

                  // Clear process data.
                  unset(static::$pidMap[$workerId][$pid]);

                  // Mark id is available. getId() returns false when no slot holds this pid;
                  // using false as a key would be coerced to 0 and wrongly free slot 0.
                  $id = static::getId($workerId, $pid);
                  if ($id !== false) {
                      static::$idMap[$workerId][$id] = 0;
                  }

                  break;
              }
              // Is still running state then fork a new worker process.
              if (static::$status !== static::STATUS_SHUTDOWN) {
                  static::forkWorkers();
                  // If reloading continue.
                  if (isset(static::$pidsToRestart[$pid])) {
                      unset(static::$pidsToRestart[$pid]);
                      static::reload();
                  }
              }
          }

          // If shutdown state and all child processes exited then master process exit.
          if (static::$status === static::STATUS_SHUTDOWN && !static::getAllWorkerPids()) {
              static::exitAndClearAll();
          }
      }
  }
  /**
   * Supervise child processes on Windows.
   *
   * Registers checkWorkerStatusForWindows() on a timer with interval 1 and then
   * runs the global event loop.
   *
   * @return void
   */
  protected static function monitorWorkersForWindows()
  {
      $statusCheck = "\\Workerman\\Worker::checkWorkerStatusForWindows";
      Timer::add(1, $statusCheck);

      static::$globalEvent->run();
  }
  /**
   * Clean up and terminate the current process with exit code 0.
   *
   * Removes the socket files of unix-transport workers and the pid file, logs
   * the stop message and invokes the onMasterStop callback when one is set.
   */
  protected static function exitAndClearAll()
  {
      foreach (static::$workers as $worker) {
          $socketName = $worker->getSocketName();
          if ($worker->transport !== 'unix' || !$socketName) {
              continue;
          }
          // Drop the scheme, then cut two characters past the first '/' to get the path.
          [, $address] = \explode(':', $socketName, 2);
          $address = \substr($address, \strpos($address, '/') + 2);
          @\unlink($address);
      }
      @\unlink(static::$pidFile);
      static::log("Workerman[" . \basename(static::$startFile) . "] has been stopped");
      if (static::$onMasterStop) {
          \call_user_func(static::$onMasterStop);
      }
      exit(0);
  }
  /**
   * Execute reload.
   *
   * Child process: invokes the worker's onWorkerReload callback, then stops the
   * process when the worker is reloadable, otherwise only resets the std streams.
   *
   * Master process: on the first call of a reload cycle it switches to
   * STATUS_RELOADING, invokes onMasterReload, signals all children of
   * non-reloadable workers and reduces static::$pidsToRestart to the reloadable
   * pids. On every call it then signals one pid of static::$pidsToRestart
   * (SIGUSR2 when gracefulStop is set, SIGUSR1 otherwise); when none is left the
   * status goes back to STATUS_RUNNING.
   *
   * @return void
   * @throws Exception
   */
  protected static function reload()
  {
      // For child processes.
      if (static::$masterPid !== \posix_getpid()) {
          \reset(static::$workers);
          $worker = \current(static::$workers);
          // Try to emit onWorkerReload callback.
          if ($worker->onWorkerReload) {
              try {
                  \call_user_func($worker->onWorkerReload, $worker);
              } catch (Throwable $e) {
                  static::stopAll(250, $e);
              }
          }

          if ($worker->reloadable) {
              static::stopAll();
          } else {
              static::resetStd(false);
          }
          return;
      }

      // For master process.
      $sig = static::$gracefulStop ? \SIGUSR2 : \SIGUSR1;

      // Set reloading state (only once per reload cycle).
      $cycleStarts = static::$status !== static::STATUS_RELOADING
          && static::$status !== static::STATUS_SHUTDOWN;
      if ($cycleStarts) {
          static::log("Workerman[" . \basename(static::$startFile) . "] reloading");
          static::$status = static::STATUS_RELOADING;

          static::resetStd(false);
          // Try to emit onMasterReload callback.
          if (static::$onMasterReload) {
              try {
                  \call_user_func(static::$onMasterReload);
              } catch (Throwable $e) {
                  static::stopAll(250, $e);
              }
              static::initId();
          }

          // Send reload signal to all child processes.
          $reloadablePidArray = [];
          foreach (static::$pidMap as $workerId => $workerPidArray) {
              $worker = static::$workers[$workerId];
              foreach ($workerPidArray as $pid) {
                  if ($worker->reloadable) {
                      $reloadablePidArray[$pid] = $pid;
                  } else {
                      // Send reload signal to a worker process which reloadable is false.
                      \posix_kill($pid, $sig);
                  }
              }
          }
          // Get all pids that are waiting reload.
          static::$pidsToRestart = \array_intersect(static::$pidsToRestart, $reloadablePidArray);
      }

      // Reload complete.
      if (empty(static::$pidsToRestart)) {
          if (static::$status !== static::STATUS_SHUTDOWN) {
              static::$status = static::STATUS_RUNNING;
          }
          return;
      }

      // Continue reload.
      $oneWorkerPid = \current(static::$pidsToRestart);
      // Send reload signal to a worker process.
      \posix_kill($oneWorkerPid, $sig);
      // If the process does not exit after stopTimeout seconds try to kill it.
      if (!static::$gracefulStop) {
          Timer::add(static::$stopTimeout, '\posix_kill', [$oneWorkerPid, \SIGKILL], false);
      }
  }
  /**
   * Stop all.
   *
   * Logs $log when it is truthy and sets the status to STATUS_SHUTDOWN.
   *
   * Master process: sends SIGQUIT (graceful) or SIGINT to every worker pid,
   * schedules a SIGKILL after stopTimeout for non-graceful stops, schedules
   * checkIfChildRunning() and removes the statistics file.
   *
   * Child process: stops all workers in reverse order and - unless a graceful
   * stop still has open connections - stops the event loop and exits with $code.
   *
   * @param int $code Exit code used by a child process.
   * @param mixed $log Message (or throwable) to log before stopping.
   */
  public static function stopAll(int $code = 0, mixed $log = '')
  {
      if ($log) {
          static::log($log);
      }

      static::$status = static::STATUS_SHUTDOWN;

      $isMaster = \DIRECTORY_SEPARATOR === '/' && static::$masterPid === \posix_getpid();

      // For child processes.
      if (!$isMaster) {
          // Execute exit.
          foreach (\array_reverse(static::$workers) as $worker) {
              if ($worker->stopping) {
                  continue;
              }
              $worker->stop();
              $worker->stopping = true;
          }
          if (!static::$gracefulStop || ConnectionInterface::$statistics['connection_count'] <= 0) {
              static::$workers = [];
              if (static::$globalEvent) {
                  static::$globalEvent->stop();
              }

              try {
                  exit($code);
              } catch (Exception $e) {
              }
          }
          return;
      }

      // For master process.
      static::log("Workerman[" . \basename(static::$startFile) . "] stopping ...");
      // Send stop signal to all child processes.
      $sig = static::$gracefulStop ? \SIGQUIT : \SIGINT;
      foreach (static::getAllWorkerPids() as $workerPid) {
          // Fix exit with status 2 for php8.2
          if ($sig === \SIGINT && !static::$daemonize) {
              Timer::add(1, '\posix_kill', [$workerPid, \SIGINT], false);
          } else {
              \posix_kill($workerPid, $sig);
          }
          // Non-graceful stop: force kill the child once stopTimeout has passed.
          if (!static::$gracefulStop) {
              Timer::add(\ceil(static::$stopTimeout), '\posix_kill', [$workerPid, \SIGKILL], false);
          }
      }
      Timer::add(1, "\\Workerman\\Worker::checkIfChildRunning");
      // Remove statistics file.
      if (\is_file(static::$statisticsFile)) {
          @\unlink(static::$statisticsFile);
      }
  }
  /**
   * Drop pids from static::$pidMap whose process can no longer be signalled.
   *
   * Each recorded pid is probed with posix_kill($pid, 0); entries for which the
   * probe fails are removed.
   */
  public static function checkIfChildRunning()
  {
      foreach (static::$pidMap as $workerId => $workerPidArray) {
          foreach ($workerPidArray as $pid => $unused) {
              if (\posix_kill($pid, 0)) {
                  continue;
              }
              unset(static::$pidMap[$workerId][$pid]);
          }
      }
  }
  /**
   * Current process status.
   *
   * @return int The value of static::$status (one of the STATUS_* constants).
   */
  public static function getStatus(): int
  {
      return static::$status;
  }
  /**
   * Whether the pending stop/reload is a graceful one.
   *
   * @return bool The value of static::$gracefulStop.
   */
  public static function getGracefulStop(): bool
  {
      return static::$gracefulStop;
  }
  /**
   * Write statistics data to disk.
   *
   * Master process: appends the serialized worker list, the global status
   * section, the per-worker exit counters and the process table header to
   * static::$statisticsFile in a single write, sets the file mode to 0722 and
   * sends SIGIOT to every worker so each appends its own line.
   *
   * Child process: collects garbage and appends one line with pid, memory
   * usage, socket name, worker name, connection count, send failures, timer
   * count and total requests.
   *
   * @return void
   */
  protected static function writeStatisticsToStatusFile()
  {
      // For master process.
      if (static::$masterPid === \posix_getpid()) {
          $allWorkerInfo = [];
          foreach (static::$pidMap as $workerId => $pidArray) {
              /** @var \Workerman\Worker $worker */
              $worker = static::$workers[$workerId];
              foreach ($pidArray as $pid) {
                  $allWorkerInfo[$pid] = ['name' => $worker->name, 'listen' => $worker->getSocketName()];
              }
          }

          // sys_getloadavg() may be missing or return false; show placeholders instead of failing.
          $rawLoad = \function_exists('sys_getloadavg') ? \sys_getloadavg() : false;
          $loadavg = \is_array($rawLoad) ? \array_map('round', $rawLoad, [2, 2, 2]) : ['-', '-', '-'];
          $loadStr = 'load average: ' . \implode(", ", $loadavg);

          // Read the clock once so days and hours are derived from the same instant.
          $startTimestamp = static::$globalStatistics['start_timestamp'];
          $uptime = \time() - $startTimestamp;

          // Build the whole master report first and append it with one write.
          $report = \serialize($allWorkerInfo) . "\n";
          $report .= "----------------------------------------------GLOBAL STATUS----------------------------------------------------\n";
          $report .= 'Workerman version:' . static::VERSION . " PHP version:" . \PHP_VERSION . "\n";
          $report .= 'start time:' . \date('Y-m-d H:i:s', $startTimestamp)
              . ' run ' . \floor($uptime / (24 * 60 * 60)) . ' days '
              . \floor(($uptime % (24 * 60 * 60)) / (60 * 60)) . " hours \n";
          $report .= \str_pad($loadStr, 33) . 'event-loop:' . static::getEventLoopName() . "\n";
          $report .= \count(static::$pidMap) . ' workers ' . \count(static::getAllWorkerPids()) . " processes\n";
          $report .= \str_pad('worker_name', static::$maxWorkerNameLength) . " exit_status exit_count\n";

          foreach (static::$pidMap as $workerId => $workerPidArray) {
              $worker = static::$workers[$workerId];
              if (isset(static::$globalStatistics['worker_exit_info'][$workerId])) {
                  foreach (static::$globalStatistics['worker_exit_info'][$workerId] as $workerExitStatus => $workerExitCount) {
                      $report .= \str_pad($worker->name, static::$maxWorkerNameLength) . " "
                          . \str_pad($workerExitStatus, 16) . " $workerExitCount\n";
                  }
              } else {
                  // No exit recorded for this worker yet.
                  $report .= \str_pad($worker->name, static::$maxWorkerNameLength) . " " . \str_pad(0, 16) . " 0\n";
              }
          }

          $report .= "----------------------------------------------PROCESS STATUS---------------------------------------------------\n";
          $report .= "pid\tmemory " . \str_pad('listening', static::$maxSocketNameLength) . " "
              . \str_pad('worker_name', static::$maxWorkerNameLength) . " connections "
              . \str_pad('send_fail', 9) . " "
              . \str_pad('timers', 8) . \str_pad('total_request', 13) . " qps status\n";

          \file_put_contents(static::$statisticsFile, $report, \FILE_APPEND);
          \chmod(static::$statisticsFile, 0722);

          // Ask every worker to append its own status line.
          foreach (static::getAllWorkerPids() as $workerPid) {
              \posix_kill($workerPid, \SIGIOT);
          }
          return;
      }

      // For child processes.
      \gc_collect_cycles();
      if (\function_exists('gc_mem_caches')) {
          \gc_mem_caches();
      }
      \reset(static::$workers);
      /** @var static $worker */
      $worker = \current(static::$workers);
      // A worker whose name equals its socket name is reported as 'none'.
      $workerStatusStr = \posix_getpid() . "\t" . \str_pad(\round(\memory_get_usage() / (1024 * 1024), 2) . "M", 7)
          . " " . \str_pad($worker->getSocketName(), static::$maxSocketNameLength) . " "
          . \str_pad(($worker->name === $worker->getSocketName() ? 'none' : $worker->name), static::$maxWorkerNameLength)
          . " ";
      $workerStatusStr .= \str_pad(ConnectionInterface::$statistics['connection_count'], 11)
          . " " . \str_pad(ConnectionInterface::$statistics['send_fail'], 9)
          . " " . \str_pad(static::$globalEvent->getTimerCount(), 7)
          . " " . \str_pad(ConnectionInterface::$statistics['total_request'], 13) . "\n";
      \file_put_contents(static::$statisticsFile, $workerStatusStr, \FILE_APPEND);
  }
  /**
   * Write connection statistics to disk.
   *
   * Master process: appends the section title and the column header to
   * static::$statisticsFile, sets the file mode to 0722 and sends SIGIO to every
   * worker.
   *
   * Child process: appends one line per entry of TcpConnection::$connections
   * with pid, worker name, connection id, transport, protocol, ip family flags,
   * queue sizes, transferred bytes, status and both addresses.
   *
   * @return void
   */
  protected static function writeConnectionsStatisticsToStatusFile()
  {
      // For master process.
      if (static::$masterPid === \posix_getpid()) {
          \file_put_contents(static::$statisticsFile, "--------------------------------------------------------------------- WORKERMAN CONNECTION STATUS --------------------------------------------------------------------------------\n", \FILE_APPEND);
          \file_put_contents(static::$statisticsFile, "PID Worker CID Trans Protocol ipv4 ipv6 Recv-Q Send-Q Bytes-R Bytes-W Status Local Address Foreign Address\n", \FILE_APPEND);
          \chmod(static::$statisticsFile, 0722);
          foreach (static::getAllWorkerPids() as $workerPid) {
              \posix_kill($workerPid, \SIGIO);
          }
          return;
      }

      // For child processes.
      // Formats a byte count with the largest unit it strictly exceeds (TB, GB, MB, KB), else plain bytes.
      $bytesFormat = function ($bytes) {
          if ($bytes > 1024 * 1024 * 1024 * 1024) {
              return \round($bytes / (1024 * 1024 * 1024 * 1024), 1) . "TB";
          }
          if ($bytes > 1024 * 1024 * 1024) {
              return \round($bytes / (1024 * 1024 * 1024), 1) . "GB";
          }
          if ($bytes > 1024 * 1024) {
              return \round($bytes / (1024 * 1024), 1) . "MB";
          }
          if ($bytes > 1024) {
              return \round($bytes / (1024), 1) . "KB";
          }
          return $bytes . "B";
      };

      $pid = \posix_getpid();
      $str = '';
      \reset(static::$workers);
      $currentWorker = \current(static::$workers);
      // Used for connections that carry no worker reference of their own.
      $defaultWorkerName = $currentWorker->name;

      foreach (TcpConnection::$connections as $connection) {
          /** @var \Workerman\Connection\TcpConnection $connection */
          $transport = $connection->transport;
          $ipv4 = $connection->isIpV4() ? ' 1' : ' 0';
          $ipv6 = $connection->isIpV6() ? ' 1' : ' 0';
          $recvQ = $bytesFormat($connection->getRecvBufferQueueSize());
          $sendQ = $bytesFormat($connection->getSendBufferQueueSize());
          $localAddress = \trim($connection->getLocalAddress());
          $remoteAddress = \trim($connection->getRemoteAddress());
          $state = $connection->getStatus(false);
          $bytesRead = $bytesFormat($connection->bytesRead);
          $bytesWritten = $bytesFormat($connection->bytesWritten);
          $id = $connection->id;
          $protocol = $connection->protocol ? $connection->protocol : $connection->transport;
          // Keep only the short class name. strrpos() returns 0 for a leading backslash,
          // so the result must be compared against false rather than tested for truthiness.
          $pos = \strrpos($protocol, '\\');
          if ($pos !== false) {
              $protocol = \substr($protocol, $pos + 1);
          }
          // Truncate over-long values so the columns stay aligned.
          if (\strlen($protocol) > 15) {
              $protocol = \substr($protocol, 0, 13) . '..';
          }
          $workerName = isset($connection->worker) ? $connection->worker->name : $defaultWorkerName;
          if (\strlen($workerName) > 14) {
              $workerName = \substr($workerName, 0, 12) . '..';
          }
          $str .= \str_pad($pid, 9) . \str_pad($workerName, 16) . \str_pad($id, 10) . \str_pad($transport, 8)
              . \str_pad($protocol, 16) . \str_pad($ipv4, 7) . \str_pad($ipv6, 7) . \str_pad($recvQ, 13)
              . \str_pad($sendQ, 13) . \str_pad($bytesRead, 13) . \str_pad($bytesWritten, 13) . ' '
              . \str_pad($state, 14) . ' ' . \str_pad($localAddress, 22) . ' ' . \str_pad($remoteAddress, 22) . "\n";
      }
      if ($str) {
          \file_put_contents(static::$statisticsFile, $str, \FILE_APPEND);
      }
  }
  /**
   * Shutdown hook: log an unexpected process termination.
   *
   * Does nothing when the status is STATUS_SHUTDOWN. Otherwise a termination
   * message is logged; when error_get_last() reports an E_ERROR, E_PARSE,
   * E_CORE_ERROR, E_COMPILE_ERROR or E_RECOVERABLE_ERROR its details are
   * appended to the message.
   *
   * @return void
   */
  public static function checkErrors()
  {
      if (static::$status === static::STATUS_SHUTDOWN) {
          return;
      }

      $errorMsg = \DIRECTORY_SEPARATOR === '/'
          ? 'Worker[' . \posix_getpid() . '] process terminated'
          : 'Worker process terminated';

      $errors = \error_get_last();
      $reportedTypes = [\E_ERROR, \E_PARSE, \E_CORE_ERROR, \E_COMPILE_ERROR, \E_RECOVERABLE_ERROR];
      if ($errors && \in_array($errors['type'], $reportedTypes, true)) {
          $errorMsg .= ' with ERROR: ' . static::getErrorType($errors['type']) . " \"{$errors['message']} in {$errors['file']} on line {$errors['line']}\"";
      }

      static::log($errorMsg);
  }
  /**
   * Translate a PHP error code into its name.
   *
   * @param int $type Error code, looked up in self::ERROR_TYPE.
   * @return string The mapped name, or an empty string for an unknown code.
   */
  protected static function getErrorType(int $type): string
  {
      $names = self::ERROR_TYPE;
      return $names[$type] ?? '';
  }
  /**
   * Log.
   *
   * Appends a newline to the message, echoes it via safeEcho() unless running
   * daemonized, and appends a line with timestamp and pid to static::$logFile
   * under an exclusive lock. The pid is reported as 1 when DIRECTORY_SEPARATOR
   * is not '/'.
   *
   * @param mixed $msg
   * @return void
   */
  public static function log(mixed $msg)
  {
      $msg .= "\n";
      if (!static::$daemonize) {
          static::safeEcho($msg);
      }
      $timestamp = \date('Y-m-d H:i:s');
      $pid = \DIRECTORY_SEPARATOR === '/' ? \posix_getpid() : 1;
      $entry = $timestamp . ' ' . 'pid:' . $pid . ' ' . $msg;
      \file_put_contents(static::$logFile, $entry, \FILE_APPEND | \LOCK_EX);
  }
  /**
   * Safe Echo.
   *
   * Writes $msg to the current output stream. For non-decorated messages the
   * tags <n>, <w>, <g> and their closing forms are replaced by ANSI sequences
   * when the stream supports decoration, or removed otherwise. A decorated
   * message is only written when the stream supports decoration.
   *
   * @param string $msg
   * @param bool $decorated
   * @return bool False when no usable stream exists or a decorated message was skipped.
   */
  public static function safeEcho(string $msg, bool $decorated = false): bool
  {
      $stream = static::outputStream();
      if (!$stream) {
          return false;
      }

      if ($decorated) {
          if (!static::$outputDecorated) {
              return false;
          }
      } else {
          $line = $white = $green = $end = '';
          if (static::$outputDecorated) {
              $line = "\033[1A\n\033[K";
              $white = "\033[47;30m";
              $green = "\033[32;40m";
              $end = "\033[0m";
          }
          // str_replace applies the pairs in order: opening tags first, then closing tags.
          $msg = \str_replace(
              ['<n>', '<w>', '<g>', '</n>', '</w>', '</g>'],
              [$line, $white, $green, $end, $end, $end],
              $msg
          );
      }

      \fwrite($stream, $msg);
      \fflush($stream);
      return true;
  }
  /**
   * Select, validate and remember the output stream.
   *
   * Falls back to the previously stored stream, then to STDOUT, when no stream
   * is passed. As a side effect static::$outputDecorated is recomputed: false
   * for regular files, otherwise true only on "/"-separator systems where
   * posix_isatty() exists and reports a terminal.
   *
   * @param resource|null $stream Stream to use, or null to keep the current one.
   * @return false|resource The stored stream, or false when it is not a usable stream resource.
   */
  private static function outputStream($stream = null)
  {
      $candidate = $stream ?: (static::$outputStream ?: \STDOUT);

      $isStream = $candidate
          && \is_resource($candidate)
          && \get_resource_type($candidate) === 'stream';
      if (!$isStream) {
          return false;
      }

      $stat = \fstat($candidate);
      if (!$stat) {
          return false;
      }

      // 0170000 masks the file-type bits of the mode; 0100000 marks a regular file.
      $isRegularFile = ($stat['mode'] & 0170000) === 0100000;

      static::$outputDecorated = !$isRegularFile
          && \DIRECTORY_SEPARATOR === '/'
          && \function_exists('posix_isatty')
          && \posix_isatty($candidate);

      static::$outputStream = $candidate;

      return $candidate;
  }
  /**
   * Constructor.
   *
   * Registers the instance in static::$workers and static::$pidMap under its
   * spl_object_hash() and, when a socket name is given, prepares the stream
   * context used later by listen().
   *
   * @param string|null $socketName    Listening address such as "scheme://host:port", or null for a worker without a socket.
   * @param array       $socketContext Options for stream_context_create(); 'socket' => 'backlog' defaults to DEFAULT_BACKLOG.
   */
  public function __construct(?string $socketName = null, array $socketContext = [])
  {
      // Save all worker instances.
      $this->workerId = \spl_object_hash($this);
      $this->context = new stdClass();
      static::$workers[$this->workerId] = $this;
      static::$pidMap[$this->workerId] = [];

      // Context for socket.
      if ($socketName) {
          $this->socketName = $socketName;
          // Only fill in the backlog when the caller did not choose one.
          $socketContext['socket']['backlog'] ??= static::DEFAULT_BACKLOG;
          $this->socketContext = \stream_context_create($socketContext);
      }
  }
  /**
   * Create the server socket (once) and start accepting on it.
   *
   * Does nothing when the worker has no socket name. When the main socket does
   * not exist yet it is created from the parsed address, configured for the
   * transport in use and switched to non-blocking mode. In every case
   * resumeAccept() is called afterwards.
   *
   * @return void
   * @throws Exception When stream_socket_server() fails; the message is the reported error string.
   */
  public function listen()
  {
      if (!$this->socketName) {
          return;
      }

      if (!$this->mainSocket) {
          $localSocket = $this->parseSocketAddress();

          // UDP sockets are only bound; all other transports are bound and listened on.
          $flags = $this->transport === 'udp'
              ? \STREAM_SERVER_BIND
              : (\STREAM_SERVER_BIND | \STREAM_SERVER_LISTEN);
          $errno = 0;
          $errmsg = '';

          // SO_REUSEPORT.
          if ($this->reusePort) {
              \stream_context_set_option($this->socketContext, 'socket', 'so_reuseport', 1);
          }

          // Create an Internet or Unix domain server socket.
          $this->mainSocket = \stream_socket_server($localSocket, $errno, $errmsg, $flags, $this->socketContext);
          if (!$this->mainSocket) {
              throw new Exception($errmsg);
          }

          if ($this->transport === 'ssl') {
              // Crypto is switched off on the listening socket itself.
              \stream_socket_enable_crypto($this->mainSocket, false);
          } elseif ($this->transport === 'unix') {
              // Strip the 7-character "unix://" prefix to get the socket file path.
              $socketFile = \substr($localSocket, 7);
              if ($this->user) {
                  \chown($socketFile, $this->user);
              }
              if ($this->group) {
                  \chgrp($socketFile, $this->group);
              }
          }

          // Try to open keepalive for tcp and disable Nagle algorithm.
          if (\function_exists('socket_import_stream') && self::BUILD_IN_TRANSPORTS[$this->transport] === 'tcp') {
              // Warnings from this best-effort tuning are silenced.
              \set_error_handler(static function () {
              });
              try {
                  $socket = \socket_import_stream($this->mainSocket);
                  // socket_import_stream() returns false on failure; passing that on
                  // would raise a TypeError that the error handler cannot swallow.
                  if ($socket) {
                      \socket_set_option($socket, \SOL_SOCKET, \SO_KEEPALIVE, 1);
                      \socket_set_option($socket, \SOL_TCP, \TCP_NODELAY, 1);
                  }
              } finally {
                  // Always put the previous error handler back, even if something throws.
                  \restore_error_handler();
              }
          }

          // Non blocking.
          \stream_set_blocking($this->mainSocket, false);
      }

      $this->resumeAccept();
  }
  /**
   * Stop accepting and close the server socket.
   *
   * Accepting is paused first; the main socket, when present, is then closed
   * with errors silenced and the property is reset to null.
   *
   * @return void
   */
  public function unlisten()
  {
      $this->pauseAccept();

      if (!$this->mainSocket) {
          return;
      }

      // Errors raised while closing are ignored.
      $ignoreErrors = function () {
      };
      \set_error_handler($ignoreErrors);
      \fclose($this->mainSocket);
      \restore_error_handler();

      $this->mainSocket = null;
  }
  /**
   * Parse the worker's socket name into the address passed to stream_socket_server().
   *
   * The part before the first ":" is the scheme. A scheme that is not a key of
   * BUILD_IN_TRANSPORTS is treated as an application protocol: its class is
   * looked up as "Protocols\<Scheme>" (or used as given when it starts with a
   * backslash), then as "Workerman\Protocols\<Scheme>", and stored in
   * $this->protocol. A built-in scheme replaces $this->transport when that is
   * currently 'tcp'.
   *
   * @return string|null The local socket address, or null when no socket name is set.
   * @throws Exception When the socket name has no ":" separator, the protocol class
   *                   cannot be found, or $this->transport is not a built-in transport.
   */
  protected function parseSocketAddress(): ?string
  {
      if (!$this->socketName) {
          return null;
      }

      // Get the application layer communication protocol and listening address.
      $parts = \explode(':', $this->socketName, 2);
      if (\count($parts) !== 2) {
          // Without a separator there is no address part to listen on.
          throw new Exception('Bad socket name ' . \var_export($this->socketName, true));
      }
      [$scheme, $address] = $parts;

      // Check application layer protocol class.
      if (!isset(self::BUILD_IN_TRANSPORTS[$scheme])) {
          $scheme = \ucfirst($scheme);
          $this->protocol = \substr($scheme, 0, 1) === '\\' ? $scheme : 'Protocols\\' . $scheme;
          if (!\class_exists($this->protocol)) {
              // Fall back to the protocol classes in the Workerman namespace.
              $this->protocol = "Workerman\\Protocols\\$scheme";
              if (!\class_exists($this->protocol)) {
                  throw new Exception("class \\Protocols\\$scheme not exist");
              }
          }
          if (!isset(self::BUILD_IN_TRANSPORTS[$this->transport])) {
              throw new Exception('Bad worker->transport ' . \var_export($this->transport, true));
          }
      } elseif ($this->transport === 'tcp') {
          // A built-in scheme only overrides the transport while it is still 'tcp'.
          $this->transport = $scheme;
      }

      // Local socket.
      return self::BUILD_IN_TRANSPORTS[$this->transport] . ":" . $address;
  }
  /**
   * Stop watching the server socket for new connections.
   *
   * Only acts when an event loop exists, a main socket exists and accepting is
   * not already paused.
   *
   * @return void
   */
  public function pauseAccept()
  {
      $canPause = static::$globalEvent && false === $this->pauseAccept && $this->mainSocket;
      if (!$canPause) {
          return;
      }

      static::$globalEvent->offReadable($this->mainSocket);
      $this->pauseAccept = true;
  }
  /**
   * Start (or restart) watching the server socket for new connections.
   *
   * Only acts when an event loop exists, a main socket exists and accepting is
   * currently paused. UDP sockets are handled by acceptUdpConnection(), every
   * other transport by acceptTcpConnection().
   *
   * @return void
   */
  public function resumeAccept()
  {
      if (!static::$globalEvent || true !== $this->pauseAccept || !$this->mainSocket) {
          return;
      }

      // Register a listener to be notified when server socket is ready to read.
      $handler = $this->transport !== 'udp' ? 'acceptTcpConnection' : 'acceptUdpConnection';
      static::$globalEvent->onReadable($this->mainSocket, [$this, $handler]);

      $this->pauseAccept = false;
  }
  /**
   * Get the socket name for display.
   *
   * @return string The socket name with its first character lower-cased, or 'none' when no socket name is set.
   */
  public function getSocketName(): string
  {
      if (!$this->socketName) {
          return 'none';
      }

      return \lcfirst($this->socketName);
  }
  /**
   * Run the worker instance: start listening, then emit onWorkerStart.
   *
   * @return void
   * @throws Throwable
   */
  public function run()
  {
      $this->listen();

      // Nothing more to do without an onWorkerStart callback.
      if (!$this->onWorkerStart) {
          return;
      }

      try {
          ($this->onWorkerStart)($this);
      } catch (Throwable $e) {
          // Pause for a second before stopping, to avoid a rapid infinite exit loop.
          sleep(1);
          static::stopAll(250, $e);
      }
  }
  /**
   * Stop the current worker instance.
   *
   * Emits onWorkerStop (logging anything it throws), closes the server socket,
   * closes all connections unless a graceful stop is in progress, removes the
   * worker from static::$workers and clears the connection callbacks.
   *
   * @return void
   */
  public function stop()
  {
      // Try to emit onWorkerStop callback.
      if ($this->onWorkerStop) {
          try {
              ($this->onWorkerStop)($this);
          } catch (Throwable $e) {
              static::log($e);
          }
      }

      // Remove listener for server socket.
      $this->unlisten();

      // Close all connections for the worker, except during a graceful stop.
      if (!static::$gracefulStop) {
          foreach ($this->connections as $connection) {
              $connection->close();
          }
      }

      // Remove every registry entry that refers to this worker.
      foreach (static::$workers as $key => $registered) {
          if ($registered->workerId !== $this->workerId) {
              continue;
          }
          unset(static::$workers[$key]);
      }

      // Clear callback.
      $this->onBufferFull = null;
      $this->onBufferDrain = null;
      $this->onError = null;
      $this->onClose = null;
      $this->onMessage = null;
  }
  /**
   * Accept one pending connection on the server socket.
   *
   * The accepted socket is wrapped in a TcpConnection, registered in
   * $this->connections, given the worker's protocol, transport and callbacks,
   * and finally passed to onConnect when that callback is set.
   *
   * @param resource $socket The listening socket that became readable.
   * @return void
   */
  public function acceptTcpConnection($socket)
  {
      // Accept with a zero timeout and with warnings silenced.
      \set_error_handler(function () {
      });
      $newSocket = \stream_socket_accept($socket, 0, $remoteAddress);
      \restore_error_handler();

      // Nothing was accepted (thundering herd).
      if (!$newSocket) {
          return;
      }

      $connection = new TcpConnection(static::$globalEvent, $newSocket, $remoteAddress);
      $this->connections[$connection->id] = $connection;

      $connection->worker = $this;
      $connection->protocol = $this->protocol;
      $connection->transport = $this->transport;

      // Hand the worker-level callbacks down to the connection.
      foreach (['onMessage', 'onClose', 'onError', 'onBufferDrain', 'onBufferFull'] as $callbackName) {
          $connection->$callbackName = $this->$callbackName;
      }

      // Try to emit onConnect callback.
      if (!$this->onConnect) {
          return;
      }
      try {
          ($this->onConnect)($connection);
      } catch (Throwable $e) {
          static::stopAll(250, $e);
      }
  }
  /**
   * Receive one UDP datagram and dispatch it to onMessage.
   *
   * Without a protocol the raw datagram is passed to the callback. With a
   * protocol that has an input() method the datagram is split into packages of
   * the length input() reports, each decoded and dispatched; otherwise the
   * whole datagram is decoded once. Packages whose decode() result is false
   * are dropped. Anything thrown while dispatching is passed to stopAll().
   *
   * @param resource $socket The UDP server socket that became readable.
   * @return bool False when nothing could be received or the sender address is empty, true otherwise.
   */
  public function acceptUdpConnection($socket): bool
  {
      // Receive with warnings silenced.
      \set_error_handler(function () {
      });
      $recvBuffer = \stream_socket_recvfrom($socket, UdpConnection::MAX_UDP_PACKAGE_SIZE, 0, $remoteAddress);
      \restore_error_handler();
      if (false === $recvBuffer || empty($remoteAddress)) {
          return false;
      }

      // UdpConnection.
      $connection = new UdpConnection($socket, $remoteAddress);
      $connection->protocol = $this->protocol;

      $messageCallback = $this->onMessage;
      if (!$messageCallback) {
          return true;
      }

      try {
          if ($this->protocol !== null) {
              /** @var \Workerman\Protocols\ProtocolInterface $parser */
              $parser = $this->protocol;
              if ($parser && \method_exists($parser, 'input')) {
                  while ($recvBuffer !== '') {
                      $len = (int)$parser::input($recvBuffer, $connection);
                      // A non-positive length (0, false, negative) cannot consume any
                      // data; stop here instead of looping forever on the same buffer.
                      if ($len <= 0) {
                          return true;
                      }
                      $package = \substr($recvBuffer, 0, $len);
                      $recvBuffer = \substr($recvBuffer, $len);
                      $data = $parser::decode($package, $connection);
                      // Discard bad packets but keep processing the rest.
                      if ($data === false) {
                          continue;
                      }
                      $messageCallback($connection, $data);
                  }
              } else {
                  $data = $parser::decode($recvBuffer, $connection);
                  // Discard bad packets.
                  if ($data === false) {
                      return true;
                  }
                  $messageCallback($connection, $data);
              }
          } else {
              $messageCallback($connection, $recvBuffer);
          }
          // Counted once per dispatched datagram, not per package.
          ++ConnectionInterface::$statistics['total_request'];
      } catch (Throwable $e) {
          static::stopAll(250, $e);
      }

      return true;
  }
  /**
   * Check whether the master process is alive.
   *
   * The pid must be non-empty, must accept signal 0 and must differ from the
   * current process. When /proc/<pid>/cmdline is readable, non-empty and a
   * process title is configured, the command line must additionally contain
   * the process title or "php" (case-insensitive).
   *
   * @param int $masterPid Pid recorded for the master process.
   * @return bool True when the master process is considered alive.
   */
  protected static function checkMasterIsAlive(int $masterPid): bool
  {
      if (empty($masterPid)) {
          return false;
      }

      // Signal 0 only tests whether the process can be signalled.
      if (!\posix_kill($masterPid, 0) || \posix_getpid() === $masterPid) {
          return false;
      }

      // Without a readable cmdline or a configured title the pid check is all there is.
      $cmdlineFile = "/proc/{$masterPid}/cmdline";
      if (!is_readable($cmdlineFile) || empty(static::$processTitle)) {
          return true;
      }

      $cmdline = file_get_contents($cmdlineFile);
      if (empty($cmdline)) {
          return true;
      }

      foreach ([static::$processTitle, 'php'] as $needle) {
          if (stripos($cmdline, $needle) !== false) {
              return true;
          }
      }

      return false;
  }
  2374. }