walkor 8 жил өмнө
parent
commit
58a16cb06d
2 өөрчлөгдсөн 242 нэмэгдсэн , 25 устгасан
  1. 3 1
      Lib/Timer.php
  2. 239 24
      Worker.php

+ 3 - 1
Lib/Timer.php

@@ -54,7 +54,9 @@ class Timer
         if ($event) {
             self::$_event = $event;
         } else {
-            pcntl_signal(SIGALRM, array('\Workerman\Lib\Timer', 'signalHandle'), false);
+            if (function_exists('pcntl_signal')) {
+                pcntl_signal(SIGALRM, array('\Workerman\Lib\Timer', 'signalHandle'), false);
+            }
         }
     }
 

+ 239 - 24
Worker.php

@@ -12,7 +12,6 @@
  * @license   http://www.opensource.org/licenses/mit-license.php MIT License
  */
 namespace Workerman;
-
 require_once __DIR__ . '/Lib/Constants.php';
 
 use Workerman\Events\EventInterface;
@@ -20,6 +19,7 @@ use Workerman\Connection\ConnectionInterface;
 use Workerman\Connection\TcpConnection;
 use Workerman\Connection\UdpConnection;
 use Workerman\Lib\Timer;
+use Workerman\Events\Select;
 use Exception;
 
 /**
@@ -33,7 +33,7 @@ class Worker
      *
      * @var string
      */
-    const VERSION = '3.5.2';
+    const VERSION = '3.5.3';
 
     /**
      * Status starting.
@@ -213,7 +213,7 @@ class Worker
     /**
      * Application layer protocol.
      *
-     * @var Protocols\ProtocolInterface
+     * @var string
      */
     public $protocol = null;
 
@@ -389,6 +389,20 @@ class Worker
     protected static $_startFile = '';
 
     /**
+     * OS.
+     *
+     * @var string
+     */
+    protected static $_OS = 'linux';
+
+    /**
+     * Processes for windows.
+     *
+     * @var array
+     */
+    protected static $_processForWindows = array();
+
+    /**
      * Status info of current worker process.
      *
      * @var array
@@ -458,6 +472,9 @@ class Worker
         if (php_sapi_name() != "cli") {
             exit("only run in command line mode \n");
         }
+        if (DIRECTORY_SEPARATOR === '\\') {
+            self::$_OS = 'windows';
+        }
     }
 
     /**
@@ -513,6 +530,9 @@ class Worker
      */
     protected static function initWorkers()
     {
+        if (static::$_OS !== 'linux') {
+            return;
+        }
         foreach (static::$_workers as $worker) {
             // Worker name.
             if (empty($worker->name)) {
@@ -532,11 +552,13 @@ class Worker
             }
 
             // Get unix user of the worker process.
-            if (empty($worker->user)) {
-                $worker->user = static::getCurrentUser();
-            } else {
-                if (posix_getuid() !== 0 && $worker->user != static::getCurrentUser()) {
-                    static::log('Warning: You must have the root privileges to change uid and gid.');
+            if (static::$_OS === 'linux') {
+                if (empty($worker->user)) {
+                    $worker->user = static::getCurrentUser();
+                } else {
+                    if (posix_getuid() !== 0 && $worker->user != static::getCurrentUser()) {
+                        static::log('Warning: You must have the root privileges to change uid and gid.');
+                    }
                 }
             }
 
@@ -606,9 +628,17 @@ class Worker
      */
     protected static function displayUI()
     {
-        static::safeEcho("\033[1A\n\033[K-----------------------\033[47;30m WORKERMAN \033[0m-----------------------------\n\033[0m");
-        static::safeEcho('Workerman version:'. Worker::VERSION. "          PHP version:". PHP_VERSION. "\n");
-        static::safeEcho("------------------------\033[47;30m WORKERS \033[0m-------------------------------\n");
+        global $argv;
+        if (isset($argv[1]) && $argv[1] === '-q') {
+            return;
+        }
+        static::safeEcho("\033[1A\n\033[K-----------------------\033[47;30m WORKERMAN \033[0m-----------------------------\r\n\033[0m");
+        static::safeEcho('Workerman version:'. static::VERSION. "          PHP version:". PHP_VERSION. "\r\n");
+        static::safeEcho("------------------------\033[47;30m WORKERS \033[0m-------------------------------\r\n");
+        if (static::$_OS !== 'linux') {
+            static::safeEcho("worker               listen                              processes status\r\n");
+            return;
+        }
         static::safeEcho("\033[47;30muser\033[0m". str_pad('',
                 static::$_maxUserNameLength + 2 - strlen('user')). "\033[47;30mworker\033[0m". str_pad('',
                 static::$_maxWorkerNameLength + 2 - strlen('worker')). "\033[47;30mlisten\033[0m". str_pad('',
@@ -637,6 +667,9 @@ class Worker
      */
     protected static function parseCommand()
     {
+        if (static::$_OS !== 'linux') {
+            return;
+        }
         global $argv;
         // Check argv;
         $start_file = $argv[0];
@@ -660,7 +693,7 @@ class Worker
         // Start command.
         $mode = '';
         if ($command === 'start') {
-            if ($command2 === '-d' || Worker::$daemonize) {
+            if ($command2 === '-d' || static::$daemonize) {
                 $mode = 'in DAEMON mode';
             } else {
                 $mode = 'in DEBUG mode';
@@ -686,7 +719,7 @@ class Worker
         switch ($command) {
             case 'start':
                 if ($command2 === '-d') {
-                    Worker::$daemonize = true;
+                    static::$daemonize = true;
                 }
                 break;
             case 'status':
@@ -756,7 +789,7 @@ class Worker
                         exit(0);
                     }
                     if ($command2 === '-d') {
-                        Worker::$daemonize = true;
+                        static::$daemonize = true;
                     }
                     break;
                 }
@@ -838,6 +871,9 @@ class Worker
      */
     protected static function installSignal()
     {
+        if (static::$_OS !== 'linux') {
+            return;
+        }
         // stop
         pcntl_signal(SIGINT, array('\Workerman\Worker', 'signalHandler'), false);
         // graceful stop
@@ -861,6 +897,9 @@ class Worker
      */
     protected static function reinstallSignal()
     {
+        if (static::$_OS !== 'linux') {
+            return;
+        }
         // uninstall stop signal handler
         pcntl_signal(SIGINT, SIG_IGN, false);
         // uninstall graceful stop signal handler
@@ -984,6 +1023,9 @@ class Worker
      */
     protected static function saveMasterPid()
     {
+        if (static::$_OS !== 'linux') {
+            return;
+        }
         static::$_masterPid = posix_getpid();
         if (false === @file_put_contents(static::$pidFile, static::$_masterPid)) {
             throw new Exception('can not save pid to ' . static::$pidFile);
@@ -1054,6 +1096,21 @@ class Worker
      */
     protected static function forkWorkers()
     {
+        if (static::$_OS === 'linux') {
+            static::forkWorkersForLinux();
+        } else {
+            static::forkWorkersForWindows();
+        }
+    }
+
+    /**
+     * Fork some worker processes.
+     *
+     * @return void
+     */
+    protected static function forkWorkersForLinux()
+    {
+
         foreach (static::$_workers as $worker) {
             if (static::$_status === static::STATUS_STARTING) {
                 if (empty($worker->name)) {
@@ -1067,18 +1124,148 @@ class Worker
 
             $worker->count = $worker->count <= 0 ? 1 : $worker->count;
             while (count(static::$_pidMap[$worker->workerId]) < $worker->count) {
-                static::forkOneWorker($worker);
+                static::forkOneWorkerForLinux($worker);
+            }
+        }
+    }
+
+    /**
+     * Fork some worker processes.
+     *
+     * @return void
+     */
+    public static function forkWorkersForWindows()
+    {
+        $files = static::getStartFilesForWindows();
+        global $argv;
+        if(isset($argv[1]) && $argv[1] === '-q')
+        {
+            if(count(static::$_workers) > 1)
+            {
+                echo "@@@ Error: multi workers init in one php file are not support @@@\r\n";
+                echo "@@@ Please visit http://wiki.workerman.net/Multi_woker_for_win @@@\r\n";
+            }
+            elseif(count(static::$_workers) <= 0)
+            {
+                exit("@@@no worker inited@@@\r\n\r\n");
+            }
+
+            reset(static::$_workers);
+            /** @var Worker $worker */
+            $worker = current(static::$_workers);
+
+            // Display UI.
+            echo str_pad($worker->name, 21) . str_pad($worker->getSocketName(), 36) . str_pad($worker->count, 10) . "[ok]\n";
+            $worker->listen();
+            $worker->run();
+            exit("@@@child exit@@@\r\n");
+        }
+        else
+        {
+            static::$globalEvent = new \Workerman\Events\Select();
+            Timer::init(static::$globalEvent);
+            foreach($files as $start_file)
+            {
+                static::forkOneWorkerForWindows($start_file);
+            }
+        }
+    }
+
+    /**
+     * Get start files for windows.
+     *
+     * @return array
+     */
+    public static function getStartFilesForWindows() {
+        global $argv;
+        $files = array();
+        foreach($argv as $file)
+        {
+            $ext = pathinfo($file, PATHINFO_EXTENSION );
+            if($ext !== 'php')
+            {
+                continue;
+            }
+            if(is_file($file))
+            {
+                $files[$file] = $file;
             }
         }
+        return $files;
     }
 
     /**
      * Fork one worker process.
      *
-     * @param Worker $worker
+     * @param string $start_file
+     */
+    public static function forkOneWorkerForWindows($start_file)
+    {
+        $start_file = realpath($start_file);
+        $std_file = sys_get_temp_dir() . '/'.str_replace(array('/', "\\", ':'), '_', $start_file).'.out.txt';
+
+        $descriptorspec = array(
+            0 => array('pipe', 'a'), // stdin
+            1 => array('file', $std_file, 'w'), // stdout
+            2 => array('file', $std_file, 'w') // stderr
+        );
+
+
+        $pipes       = array();
+        $process     = proc_open("php \"$start_file\" -q", $descriptorspec, $pipes);
+        $std_handler = fopen($std_file, 'a+');
+        stream_set_blocking($std_handler, 0);
+
+        if (empty(static::$globalEvent)) {
+            static::$globalEvent = new Select();
+            Timer::init(static::$globalEvent);
+        }
+        $timer_id = Timer::add(1, function()use($std_handler)
+        {
+            echo fread($std_handler, 65535);
+        });
+
+        // 保存子进程句柄
+        static::$_processForWindows[$start_file] = array($process, $start_file, $timer_id);
+    }
+
+    /**
+     * check worker status for windows.
+     * @return void
+     */
+    public static function checkWorkerStatusForWindows()
+    {
+        foreach(static::$_processForWindows as $process_data)
+        {
+            $process = $process_data[0];
+            $start_file = $process_data[1];
+            $timer_id = $process_data[2];
+            $status = proc_get_status($process);
+            if(isset($status['running']))
+            {
+                if(!$status['running'])
+                {
+                    echo "process $start_file terminated and try to restart\n";
+                    Timer::del($timer_id);
+                    @proc_close($process);
+                    static::forkOneWorkerForWindows($start_file);
+                }
+            }
+            else
+            {
+                echo "proc_get_status fail\n";
+            }
+        }
+    }
+
+
+    /**
+     * Fork one worker process.
+     *
+     * @param \Workerman\Worker $worker
      * @throws Exception
      */
-    protected static function forkOneWorker($worker)
+    protected static function forkOneWorkerForLinux($worker)
     {
         // Get available worker id.
         $id = static::getId($worker->workerId, 0);
@@ -1184,6 +1371,20 @@ class Worker
      */
     protected static function monitorWorkers()
     {
+        if (static::$_OS === 'linux') {
+            static::monitorWorkersForLinux();
+        } else {
+            static::monitorWorkersForWindows();
+        }
+    }
+
+    /**
+     * Monitor all child processes.
+     *
+     * @return void
+     */
+    protected static function monitorWorkersForLinux()
+    {
         static::$_status = static::STATUS_RUNNING;
         while (1) {
             // Calls signal handlers for pending signals.
@@ -1244,6 +1445,18 @@ class Worker
     }
 
     /**
+     * Monitor all child processes.
+     *
+     * @return void
+     */
+    protected static function monitorWorkersForWindows()
+    {
+        Timer::add(0.5, "\\Workerman\\Worker::checkWorkerStatusForWindows");
+
+        static::$globalEvent->loop();
+    }
+
+    /**
      * Exit current process.
      *
      * @return void
@@ -1428,7 +1641,7 @@ class Worker
         if (static::$_masterPid === posix_getpid()) {
             $all_worker_info = array();
             foreach(static::$_pidMap as $worker_id => $pid_array) {
-                /** @var Worker $worker */
+                /** @var /Workerman/Worker $worker */
                 $worker = static::$_workers[$worker_id];
                 foreach($pid_array as $pid) {
                     $all_worker_info[$pid] = array('name' => $worker->name, 'listen' => $worker->getSocketName());
@@ -1440,7 +1653,7 @@ class Worker
             file_put_contents(static::$_statisticsFile,
                 "----------------------------------------------GLOBAL STATUS----------------------------------------------------\n", FILE_APPEND);
             file_put_contents(static::$_statisticsFile,
-                'Workerman version:' . Worker::VERSION . "          PHP version:" . PHP_VERSION . "\n", FILE_APPEND);
+                'Workerman version:' . static::VERSION . "          PHP version:" . PHP_VERSION . "\n", FILE_APPEND);
             file_put_contents(static::$_statisticsFile, 'start time:' . date('Y-m-d H:i:s',
                     static::$_globalStatistics['start_timestamp']) . '   run ' . floor((time() - static::$_globalStatistics['start_timestamp']) / (24 * 60 * 60)) . ' days ' . floor(((time() - static::$_globalStatistics['start_timestamp']) % (24 * 60 * 60)) / (60 * 60)) . " hours   \n",
                 FILE_APPEND);
@@ -1483,7 +1696,7 @@ class Worker
         }
 
         // For child processes.
-        /** @var Worker $worker */
+        /** @var \Workerman\Worker $worker */
         $worker            = current(static::$_workers);
         $worker_status_str = posix_getpid() . "\t" . str_pad(round(memory_get_usage(true) / (1024 * 1024), 2) . "M", 7)
             . " " . str_pad($worker->getSocketName(), static::$_maxSocketNameLength) . " "
@@ -1538,9 +1751,9 @@ class Worker
         $current_worker = current(static::$_workers);
         $default_worker_name = $current_worker->name;
 
-        /** @var Worker $worker */
+        /** @var \Workerman\Worker $worker */
         foreach(TcpConnection::$connections as $connection) {
-            /** @var Connection\TcpConnection $connection */
+            /** @var \Workerman\Connection\TcpConnection $connection */
             $transport      = $connection->transport;
             $ipv4           = $connection->isIpV4() ? ' 1' : ' 0';
             $ipv6           = $connection->isIpV6() ? ' 1' : ' 0';
@@ -1582,7 +1795,7 @@ class Worker
     public static function checkErrors()
     {
         if (static::STATUS_SHUTDOWN != static::$_status) {
-            $error_msg = 'Worker['. posix_getpid() .'] process terminated';
+            $error_msg = static::$_OS === 'linx' ? 'Worker['. posix_getpid() .'] process terminated' : 'Worker process terminated';
             $errors    = error_get_last();
             if ($errors && ($errors['type'] === E_ERROR ||
                     $errors['type'] === E_PARSE ||
@@ -1651,7 +1864,8 @@ class Worker
         if (!static::$daemonize) {
             static::safeEcho($msg);
         }
-        file_put_contents((string)static::$logFile, date('Y-m-d H:i:s') . ' ' . 'pid:'. posix_getpid() . ' ' . $msg, FILE_APPEND | LOCK_EX);
+        file_put_contents((string)static::$logFile, date('Y-m-d H:i:s') . ' ' . 'pid:'
+            . (static::$_OS === 'linux' ? posix_getpid() : 1) . ' ' . $msg, FILE_APPEND | LOCK_EX);
     }
 
     /**
@@ -1973,6 +2187,7 @@ class Worker
         $connection->protocol = $this->protocol;
         if ($this->onMessage) {
             if ($this->protocol !== null) {
+                /** @var \Workerman\Protocols\ProtocolInterface $parser */
                 $parser      = $this->protocol;
                 $recv_buffer = $parser::decode($recv_buffer, $connection);
                 // Discard bad packets.