Parcourir la source

Merge pull request #1 from walkor/master

Merge walkor/Workerman
codekissyoung il y a 8 ans
Parent
commit
970532a41a
2 fichiers modifiés avec 83 ajouts et 45 suppressions
  1. 8 6
      Connection/TcpConnection.php
  2. 75 39
      Worker.php

+ 8 - 6
Connection/TcpConnection.php

@@ -876,14 +876,16 @@ class TcpConnection extends ConnectionInterface
     {
         static $mod;
         self::$statistics['connection_count']--;
-        if(Worker::getGracefulStop() && Worker::getStatus() === Worker::STATUS_SHUTDOWN){
-            if(!isset($mod)){
-                $mod=round((self::$statistics['connection_count']+1)/3);
+        if (Worker::getGracefulStop()) {
+            if (!isset($mod)) {
+                $mod = ceil((self::$statistics['connection_count'] + 1) / 3);
             }
-            if(0 === self::$statistics['connection_count']%$mod){
-                Worker::log('worker('.posix_getpid().') remains '.self::$statistics['connection_count'].' connection(s)');
+
+            if (0 === self::$statistics['connection_count'] % $mod) {
+                Worker::log('worker[' . posix_getpid() . '] remains ' . self::$statistics['connection_count'] . ' connection(s)');
             }
-            if(0 === self::$statistics['connection_count']){
+
+            if(0 === self::$statistics['connection_count']) {
                 Worker::$globalEvent->destroy();
                 exit(0);
             }

+ 75 - 39
Worker.php

@@ -223,13 +223,13 @@ class Worker
      * @var string
      */
     protected $_autoloadRootPath = '';
-    
+
     /**
-     * Pause listening or not.
+     * Pause accept new connections or not.
      *
      * @var string
      */
-    protected $_pauseListen = false;
+    protected $_pauseAccept = false;
 
     /**
      * Daemonize.
@@ -419,7 +419,7 @@ class Worker
         'unix'  => 'unix',
         'ssl'   => 'tcp'
     );
-    
+
     /**
      * Graceful stop or not.
      *
@@ -610,14 +610,14 @@ class Worker
         self::safeEcho('Workerman version:'. Worker::VERSION. "          PHP version:". PHP_VERSION. "\n");
         self::safeEcho("------------------------\033[47;30m WORKERS \033[0m-------------------------------\n");
         self::safeEcho("\033[47;30muser\033[0m". str_pad('',
-            self::$_maxUserNameLength + 2 - strlen('user')). "\033[47;30mworker\033[0m". str_pad('',
-            self::$_maxWorkerNameLength + 2 - strlen('worker')). "\033[47;30mlisten\033[0m". str_pad('',
-            self::$_maxSocketNameLength + 2 - strlen('listen')). "\033[47;30mprocesses\033[0m \033[47;30m". "status\033[0m\n");
+                self::$_maxUserNameLength + 2 - strlen('user')). "\033[47;30mworker\033[0m". str_pad('',
+                self::$_maxWorkerNameLength + 2 - strlen('worker')). "\033[47;30mlisten\033[0m". str_pad('',
+                self::$_maxSocketNameLength + 2 - strlen('listen')). "\033[47;30mprocesses\033[0m \033[47;30m". "status\033[0m\n");
 
         foreach (self::$_workers as $worker) {
             self::safeEcho(str_pad($worker->user, self::$_maxUserNameLength + 2). str_pad($worker->name,
-                self::$_maxWorkerNameLength + 2). str_pad($worker->getSocketName(),
-                self::$_maxSocketNameLength + 2). str_pad(' ' . $worker->count, 9). " \033[32;40m [OK] \033[0m\n");
+                    self::$_maxWorkerNameLength + 2). str_pad($worker->getSocketName(),
+                    self::$_maxSocketNameLength + 2). str_pad(' ' . $worker->count, 9). " \033[32;40m [OK] \033[0m\n");
         }
         self::safeEcho("----------------------------------------------------------------\n");
         if (self::$daemonize) {
@@ -723,14 +723,15 @@ class Worker
                 exit(0);
             case 'restart':
             case 'stop':
-                if($command2 === '-g'){
+                if ($command2 === '-g') {
                     self::$_gracefulStop = true;
                     $sig = SIGTERM;
-                }else{
+                    self::log("Workerman[$start_file] is gracefully stoping ...");
+                } else {
                     self::$_gracefulStop = false;
                     $sig = SIGINT;
+                    self::log("Workerman[$start_file] is stoping ...");
                 }
-                self::log("Workerman[$start_file] is stoping ...");
                 // Send stop signal to master process.
                 $master_pid && posix_kill($master_pid, $sig);
                 // Timeout.
@@ -1291,10 +1292,10 @@ class Worker
                     self::initId();
                 }
             }
-            
-            if(self::$_gracefulStop){
+
+            if (self::$_gracefulStop) {
                 $sig = SIGQUIT;
-            }else{
+            } else {
                 $sig = SIGUSR1;
             }
 
@@ -1365,12 +1366,12 @@ class Worker
         self::$_status = self::STATUS_SHUTDOWN;
         // For master process.
         if (self::$_masterPid === posix_getpid()) {
-            self::log("Workerman[" . basename(self::$_startFile) . "] Stopping ...");
+            self::log("Workerman[" . basename(self::$_startFile) . "] stopping ...");
             $worker_pid_array = self::getAllWorkerPids();
             // Send stop signal to all child processes.
-            if(self::$_gracefulStop){
+            if (self::$_gracefulStop) {
                 $sig = SIGTERM;
-            }else{
+            } else {
                 $sig = SIGINT;
             }
             foreach ($worker_pid_array as $worker_pid) {
@@ -1389,13 +1390,13 @@ class Worker
             foreach (self::$_workers as $worker) {
                 $worker->stop();
             }
-            if(!self::$_gracefulStop) {
+            if (!self::$_gracefulStop || ConnectionInterface::$statistics['connection_count'] <= 0) {
                 self::$globalEvent->destroy();
                 exit(0);
             }
         }
     }
-    
+
     /**
      * Get process status.
      *
@@ -1405,11 +1406,11 @@ class Worker
     {
         return self::$_status;
     }
-    
+
     /**
      * If stop gracefully.
      *
-     * @return number
+     * @return boolean
      */
     public static function getGracefulStop()
     {
@@ -1690,12 +1691,9 @@ class Worker
             }
             $this->_context = stream_context_create($context_option);
         }
-
-        // Set an empty onMessage callback.
-        $this->onMessage = function () {
-        };
     }
 
+
     /**
      * Listen.
      *
@@ -1771,16 +1769,7 @@ class Worker
             stream_set_blocking($this->_mainSocket, 0);
         }
 
-        // Register a listener to be notified when server socket is ready to read.
-        if (self::$globalEvent || $this->_pauseListen) {
-            if ($this->transport !== 'udp') {
-                self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
-            } else {
-                self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
-                    array($this, 'acceptUdpConnection'));
-            }
-            $this->_pauseListen = false;
-        }
+        $this->resumeAccept();
     }
 
     /**
@@ -1788,11 +1777,43 @@ class Worker
      *
      * @return void
      */
-    public function unlisten()
+    public function unlisten() {
+        $this->pauseAccept();
+        if ($this->_mainSocket) {
+            @fclose($this->_mainSocket);
+            $this->_mainSocket = null;
+        }
+    }
+
+    /**
+     * Pause accept new connections.
+     *
+     * @return void
+     */
+    public function pauseAccept()
     {
-        if (self::$globalEvent && $this->_mainSocket) {
+        if (self::$globalEvent && $this->_mainSocket && false === $this->_pauseAccept) {
             self::$globalEvent->del($this->_mainSocket, EventInterface::EV_READ);
-            $this->_pauseListen = true;
+            $this->_pauseAccept = true;
+        }
+    }
+
+    /**
+     * Resume accept new connections.
+     *
+     * @return void
+     */
+    public function resumeAccept()
+    {
+        // Register a listener to be notified when server socket is ready to read.
+        if (self::$globalEvent && $this->_pauseAccept && $this->_mainSocket) {
+            if ($this->transport !== 'udp') {
+                self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
+            } else {
+                self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
+                    array($this, 'acceptUdpConnection'));
+            }
+            $this->_pauseAccept = false;
         }
     }
 
@@ -1844,6 +1865,11 @@ class Worker
         // Init Timer.
         Timer::init(self::$globalEvent);
 
+        // Set an empty onMessage callback.
+        if (empty($this->onMessage)) {
+            $this->onMessage = function () {};
+        }
+
         // Try to emit onWorkerStart callback.
         if ($this->onWorkerStart) {
             try {
@@ -1886,6 +1912,16 @@ class Worker
         }
         // Remove listener for server socket.
         $this->unlisten();
+        // Close all connections for the worker.
+        if (!self::$_gracefulStop) {
+            foreach ($this->connections as $connection) {
+                $connection->close();
+            }
+        }
+        // Clear callback.
+        $this->onMessage = $this->onClose = $this->onError = $this->onBufferDrain = $this->onBufferFull = null;
+        // Remove worker instance from self::$_workers.
+        unset(self::$_workers[$this->workerId]);
     }
 
     /**