Ver código fonte

Merge pull request #1 from walkor/master

Sync with master
Vitaly 8 anos atrás
pai
commit
b61b82f958
5 arquivos alterados com 272 adições e 166 exclusões
  1. 13 2
      Connection/AsyncTcpConnection.php
  2. 3 3
      Connection/ConnectionInterface.php
  3. 15 0
      Connection/TcpConnection.php
  4. 18 73
      README.md
  5. 223 88
      Worker.php

+ 13 - 2
Connection/AsyncTcpConnection.php

@@ -52,6 +52,13 @@ class AsyncTcpConnection extends TcpConnection
     protected $_remoteHost = '';
 
     /**
+     * Remote port.
+     *
+     * @var int
+     */
+    protected $_remotePort = 80;
+
+    /**
      * Connect start time.
      *
      * @var string
@@ -124,11 +131,15 @@ class AsyncTcpConnection extends TcpConnection
             }
             $this->_remoteAddress = "{$address_info['host']}:{$address_info['port']}";
             $this->_remoteHost    = $address_info['host'];
+            $this->_remotePort    = $address_info['port'];
             $this->_remoteURI     = "{$address_info['path']}{$address_info['query']}";
             $scheme               = isset($address_info['scheme']) ? $address_info['scheme'] : 'tcp';
         }
 
         $this->id = $this->_id = self::$_idRecorder++;
+        if(PHP_INT_MAX === self::$_idRecorder){
+            self::$_idRecorder = 0;
+        }
         // Check application layer protocol class.
         if (!isset(self::$_builtinTransports[$scheme])) {
             $scheme         = ucfirst($scheme);
@@ -166,10 +177,10 @@ class AsyncTcpConnection extends TcpConnection
         // Open socket connection asynchronously.
         if ($this->_contextOption) {
             $context = stream_context_create($this->_contextOption);
-            $this->_socket = stream_socket_client("{$this->transport}://{$this->_remoteAddress}", $errno, $errstr, 0,
+            $this->_socket = stream_socket_client("{$this->transport}://{$this->_remoteHost}:{$this->_remotePort}", $errno, $errstr, 0,
                 STREAM_CLIENT_ASYNC_CONNECT, $context);
         } else {
-            $this->_socket = stream_socket_client("{$this->transport}://{$this->_remoteAddress}", $errno, $errstr, 0,
+            $this->_socket = stream_socket_client("{$this->transport}://{$this->_remoteHost}:{$this->_remotePort}", $errno, $errstr, 0,
                 STREAM_CLIENT_ASYNC_CONNECT);
         }
         // If failed attempt to emit onError callback.

+ 3 - 3
Connection/ConnectionInterface.php

@@ -81,21 +81,21 @@ abstract class  ConnectionInterface
     abstract public function getRemoteAddress();
 
     /**
-     * Get remote IP.
+     * Get local IP.
      *
      * @return string
      */
     abstract public function getLocalIp();
 
     /**
-     * Get remote port.
+     * Get local port.
      *
      * @return int
      */
     abstract public function getLocalPort();
 
     /**
-     * Get remote address.
+     * Get local address.
      *
      * @return string
      */

+ 15 - 0
Connection/TcpConnection.php

@@ -874,6 +874,21 @@ class TcpConnection extends ConnectionInterface
      */
     public function __destruct()
     {
+        static $mod;
         self::$statistics['connection_count']--;
+        if (Worker::getGracefulStop()) {
+            if (!isset($mod)) {
+                $mod = ceil((self::$statistics['connection_count'] + 1) / 3);
+            }
+
+            if (0 === self::$statistics['connection_count'] % $mod) {
+                Worker::log('worker[' . posix_getpid() . '] remains ' . self::$statistics['connection_count'] . ' connection(s)');
+            }
+
+            if(0 === self::$statistics['connection_count']) {
+                Worker::$globalEvent->destroy();
+                exit(0);
+            }
+        }
     }
 }

+ 18 - 73
README.md

@@ -1,5 +1,10 @@
 # Workerman
 [![Gitter](https://badges.gitter.im/walkor/Workerman.svg)](https://gitter.im/walkor/Workerman?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=body_badge)
+[![Latest Stable Version](https://poser.pugx.org/workerman/workerman/v/stable)](https://packagist.org/packages/workerman/workerman)
+[![Total Downloads](https://poser.pugx.org/workerman/workerman/downloads)](https://packagist.org/packages/workerman/workerman)
+[![Monthly Downloads](https://poser.pugx.org/workerman/workerman/d/monthly)](https://packagist.org/packages/workerman/workerman)
+[![Daily Downloads](https://poser.pugx.org/workerman/workerman/d/daily)](https://packagist.org/packages/workerman/workerman)
+[![License](https://poser.pugx.org/workerman/workerman/license)](https://packagist.org/packages/workerman/workerman)
 
 ## What is it
 Workerman is an asynchronous event driven PHP framework with high performance for easily building fast, scalable network applications. Supports HTTP, Websocket, SSL and other custom protocols. Supports libevent, [HHVM](https://github.com/facebook/hhvm) , [ReactPHP](https://github.com/reactphp/react).
@@ -128,7 +133,7 @@ $tcp_worker->onClose = function($connection)
 Worker::runAll();
 ```
 
-### Enable SSL.
+### Enable SSL
 ```php
 <?php
 require_once __DIR__ . '/vendor/autoload.php';
@@ -137,8 +142,9 @@ use Workerman\Worker;
 // SSL context.
 $context = array(
     'ssl' => array(
-        'local_cert' => '/your/path/of/server.pem',
-        'local_pk'   => '/your/path/of/server.key',
+        'local_cert'  => '/your/path/of/server.pem',
+        'local_pk'    => '/your/path/of/server.key',
+        'verify_peer' => false,
     )
 );
 
@@ -494,8 +500,8 @@ Worker::runAll();
 ```php start.php start -d  ```  
 ![workerman start](http://www.workerman.net/img/workerman-start.png)  
 ```php start.php status  ```  
-![workerman satus](http://www.workerman.net/img/workerman-status.png?a=123)
-```php start.php connections
+![workerman satus](http://www.workerman.net/img/workerman-status.png?a=123)  
+```php start.php connections```  
 ```php start.php stop  ```  
 ```php start.php restart  ```  
 ```php start.php reload  ```  
@@ -504,7 +510,7 @@ Worker::runAll();
 
 中文主页:[http://www.workerman.net](http://www.workerman.net)
 
-中文文档: [http://doc3.workerman.net](http://doc3.workerman.net)
+中文文档: [http://doc.workerman.net](http://doc.workerman.net)
 
 Documentation:[https://github.com/walkor/workerman-manual](https://github.com/walkor/workerman-manual/blob/master/english/src/SUMMARY.md)
 
@@ -593,73 +599,12 @@ Percentage of the requests served within a certain time (ms)
 
 ## Other links with workerman
 
-## [PHPSocket.IO](https://github.com/walkor/phpsocket.io)  
-[Live demo](http://www.workerman.net/demos/phpsocketio-chat/)  
-[Source code](https://github.com/walkor/phpsocket.io)  
-![phpsocket.io](http://www.workerman.net/img/socket.io.png)  
-
-## [tadpole](http://kedou.workerman.net/)  
-[Live demo](http://kedou.workerman.net/)  
-[Source code](https://github.com/walkor/workerman)  
-![workerman todpole](http://www.workerman.net/img/workerman-todpole.png)  
-
-## [BrowserQuest](http://www.workerman.net/demos/browserquest/)   
-[Live demo](http://www.workerman.net/demos/browserquest/)  
-[Source code](https://github.com/walkor/BrowserQuest-PHP)  
-![BrowserQuest width workerman](http://www.workerman.net/img/browserquest.jpg) 
-
-## [web vmstat](http://www.workerman.net/demos/vmstat/)   
-[Live demo](http://www.workerman.net/demos/vmstat/)  
-[Source code](https://github.com/walkor/workerman-vmstat)  
-![web vmstat](http://www.workerman.net/img/workerman-vmstat.png)   
-
-## [live-ascii-camera](https://github.com/walkor/live-ascii-camera)   
-[Live demo camera page](http://www.workerman.net/demos/live-ascii-camera/camera.html)  
-[Live demo receive page](http://www.workerman.net/demos/live-ascii-camera/)  
-[Source code](https://github.com/walkor/live-ascii-camera)  
-![live-ascii-camera](http://www.workerman.net/img/live-ascii-camera.png)   
-
-## [live-camera](https://github.com/walkor/live-camera)   
-[Live demo camera page](http://www.workerman.net/demos/live-camera/camera.html)  
-[Live demo receive page](http://www.workerman.net/demos/live-camera/)  
-[Source code](https://github.com/walkor/live-camera)  
-![live-camera](http://www.workerman.net/img/live-camera.jpg)  
-
-## [chat room](http://chat.workerman.net/)  
-[Live demo](http://chat.workerman.net/)  
-[Source code](https://github.com/walkor/workerman-chat)  
-![workerman-chat](http://www.workerman.net/img/workerman-chat.png)  
-
-## [statistics](http://www.workerman.net:55757/)  
-[Live demo](http://www.workerman.net:55757/)  
-[Source code](https://github.com/walkor/workerman-statistics)  
-![workerman-statistics](http://www.workerman.net/img/workerman-statistics.png)  
-
-## [flappybird](http://workerman.net/demos/flappy-bird/)  
-[Live demo](http://workerman.net/demos/flappy-bird/)  
-[Source code](https://github.com/walkor/workerman-flappy-bird)  
-![workerman-statistics](http://www.workerman.net/img/workerman-flappy-bird.png)  
-
-## [jsonRpc](https://github.com/walkor/workerman-JsonRpc)  
-[Source code](https://github.com/walkor/workerman-JsonRpc)  
-![workerman-jsonRpc](http://www.workerman.net/img/workerman-json-rpc.png)  
-
-## [thriftRpc](https://github.com/walkor/workerman-thrift)  
-[Source code](https://github.com/walkor/workerman-thrift)  
-![workerman-thriftRpc](http://www.workerman.net/img/workerman-thrift.png)  
-
-## [web-msg-sender](https://github.com/walkor/web-msg-sender)  
-[Live demo send page](http://workerman.net:3333/)  
-[Live demo receive page](http://workerman.net/web-msg-sender.html)  
-[Source code](https://github.com/walkor/web-msg-sender)  
-![web-msg-sender](http://www.workerman.net/img/web-msg-sender.png)  
-
-## [shadowsocks-php](https://github.com/walkor/shadowsocks-php)
-[Source code](https://github.com/walkor/shadowsocks-php)  
-![shadowsocks-php](http://www.workerman.net/img/shadowsocks-php.png)  
-
-## [queue](https://github.com/walkor/workerman-queue)
-[Source code](https://github.com/walkor/workerman-queue)  
+[PHPSocket.IO](https://github.com/walkor/phpsocket.io)   
+[php-socks5](https://github.com/walkor/php-socks5)  
+[php-http-proxy](https://github.com/walkor/php-http-proxy)  
+
+## Donate
+<a href="https://www.paypal.com/cgi-bin/webscr?cmd=_s-xclick&hosted_button_id=UQGGS9UB35WWG"><img src="http://donate.workerman.net/img/donate.png"></a>
 
 ## LICENSE
 

+ 223 - 88
Worker.php

@@ -225,6 +225,13 @@ class Worker
     protected $_autoloadRootPath = '';
 
     /**
+     * Pause accept new connections or not.
+     *
+     * @var string
+     */
+    protected $_pauseAccept = true;
+
+    /**
      * Daemonize.
      *
      * @var bool
@@ -414,6 +421,13 @@ class Worker
     );
 
     /**
+     * Graceful stop or not.
+     *
+     * @var string
+     */
+    protected static $_gracefulStop = false;
+
+    /**
      * Run all worker instances.
      *
      * @return void
@@ -457,9 +471,12 @@ class Worker
         $backtrace        = debug_backtrace();
         self::$_startFile = $backtrace[count($backtrace) - 1]['file'];
 
+
+        $unique_prefix = str_replace('/', '_', self::$_startFile);
+
         // Pid file.
         if (empty(self::$pidFile)) {
-            self::$pidFile = __DIR__ . "/../" . str_replace('/', '_', self::$_startFile) . ".pid";
+            self::$pidFile = __DIR__ . "/../$unique_prefix.pid";
         }
 
         // Log file.
@@ -477,7 +494,7 @@ class Worker
 
         // For statistics.
         self::$_globalStatistics['start_timestamp'] = time();
-        self::$_statisticsFile                      = sys_get_temp_dir() . '/workerman.status';
+        self::$_statisticsFile                      = sys_get_temp_dir() . "/$unique_prefix.status";
 
         // Process title.
         self::setProcessTitle('WorkerMan: master process  start_file=' . self::$_startFile);
@@ -593,14 +610,14 @@ class Worker
         self::safeEcho('Workerman version:'. Worker::VERSION. "          PHP version:". PHP_VERSION. "\n");
         self::safeEcho("------------------------\033[47;30m WORKERS \033[0m-------------------------------\n");
         self::safeEcho("\033[47;30muser\033[0m". str_pad('',
-            self::$_maxUserNameLength + 2 - strlen('user')). "\033[47;30mworker\033[0m". str_pad('',
-            self::$_maxWorkerNameLength + 2 - strlen('worker')). "\033[47;30mlisten\033[0m". str_pad('',
-            self::$_maxSocketNameLength + 2 - strlen('listen')). "\033[47;30mprocesses\033[0m \033[47;30m". "status\033[0m\n");
+                self::$_maxUserNameLength + 2 - strlen('user')). "\033[47;30mworker\033[0m". str_pad('',
+                self::$_maxWorkerNameLength + 2 - strlen('worker')). "\033[47;30mlisten\033[0m". str_pad('',
+                self::$_maxSocketNameLength + 2 - strlen('listen')). "\033[47;30mprocesses\033[0m \033[47;30m". "status\033[0m\n");
 
         foreach (self::$_workers as $worker) {
             self::safeEcho(str_pad($worker->user, self::$_maxUserNameLength + 2). str_pad($worker->name,
-                self::$_maxWorkerNameLength + 2). str_pad($worker->getSocketName(),
-                self::$_maxSocketNameLength + 2). str_pad(' ' . $worker->count, 9). " \033[32;40m [OK] \033[0m\n");
+                    self::$_maxWorkerNameLength + 2). str_pad($worker->getSocketName(),
+                    self::$_maxSocketNameLength + 2). str_pad(' ' . $worker->count, 9). " \033[32;40m [OK] \033[0m\n");
         }
         self::safeEcho("----------------------------------------------------------------\n");
         if (self::$daemonize) {
@@ -608,7 +625,7 @@ class Worker
             $start_file = $argv[0];
             self::safeEcho("Input \"php $start_file stop\" to quit. Start success.\n\n");
         } else {
-            self::safeEcho("Press Ctrl-C to quit. Start success.\n");
+            self::safeEcho("Press Ctrl+C to quit. Start success.\n");
         }
     }
 
@@ -682,9 +699,15 @@ class Worker
                     // Sleep 1 second.
                     sleep(1);
                     // Clear terminal.
-                    echo chr(27).chr(91).chr(72).chr(27).chr(91).chr(50).chr(74);
+                    if ($command2 === '-d') {
+                        echo "\33[H\33[2J\33(B\33[m";
+                    }
                     // Echo status data.
                     echo self::formatStatusData();
+                    if ($command2 !== '-d') {
+                        exit(0);
+                    }
+                    echo "\nPress Ctrl+C to quit.\n\n";
                 }
                 exit(0);
             case 'connections':
@@ -700,9 +723,17 @@ class Worker
                 exit(0);
             case 'restart':
             case 'stop':
-                self::log("Workerman[$start_file] is stoping ...");
+                if ($command2 === '-g') {
+                    self::$_gracefulStop = true;
+                    $sig = SIGTERM;
+                    self::log("Workerman[$start_file] is gracefully stoping ...");
+                } else {
+                    self::$_gracefulStop = false;
+                    $sig = SIGINT;
+                    self::log("Workerman[$start_file] is stoping ...");
+                }
                 // Send stop signal to master process.
-                $master_pid && posix_kill($master_pid, SIGINT);
+                $master_pid && posix_kill($master_pid, $sig);
                 // Timeout.
                 $timeout    = 5;
                 $start_time = time();
@@ -711,7 +742,7 @@ class Worker
                     $master_is_alive = $master_pid && posix_kill($master_pid, 0);
                     if ($master_is_alive) {
                         // Timeout?
-                        if (time() - $start_time >= $timeout) {
+                        if (!self::$_gracefulStop && time() - $start_time >= $timeout) {
                             self::log("Workerman[$start_file] stop fail");
                             exit;
                         }
@@ -731,8 +762,12 @@ class Worker
                 }
                 break;
             case 'reload':
-                posix_kill($master_pid, SIGUSR1);
-                self::log("Workerman[$start_file] reload");
+                if($command2 === '-g'){
+                    $sig = SIGQUIT;
+                }else{
+                    $sig = SIGUSR1;
+                }
+                posix_kill($master_pid, $sig);
                 exit;
             default :
                 exit($usage);
@@ -805,8 +840,12 @@ class Worker
     {
         // stop
         pcntl_signal(SIGINT, array('\Workerman\Worker', 'signalHandler'), false);
+        // graceful stop
+        pcntl_signal(SIGTERM, array('\Workerman\Worker', 'signalHandler'), false);
         // reload
         pcntl_signal(SIGUSR1, array('\Workerman\Worker', 'signalHandler'), false);
+        // graceful reload
+        pcntl_signal(SIGQUIT, array('\Workerman\Worker', 'signalHandler'), false);
         // status
         pcntl_signal(SIGUSR2, array('\Workerman\Worker', 'signalHandler'), false);
         // connection status
@@ -824,14 +863,22 @@ class Worker
     {
         // uninstall stop signal handler
         pcntl_signal(SIGINT, SIG_IGN, false);
+        // uninstall graceful stop signal handler
+        pcntl_signal(SIGTERM, SIG_IGN, false);
         // uninstall reload signal handler
         pcntl_signal(SIGUSR1, SIG_IGN, false);
-        // uninstall  status signal handler
+        // uninstall graceful reload signal handler
+        pcntl_signal(SIGQUIT, SIG_IGN, false);
+        // uninstall status signal handler
         pcntl_signal(SIGUSR2, SIG_IGN, false);
         // reinstall stop signal handler
         self::$globalEvent->add(SIGINT, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler'));
-        // reinstall  reload signal handler
+        // reinstall graceful stop signal handler
+        self::$globalEvent->add(SIGTERM, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler'));
+        // reinstall reload signal handler
         self::$globalEvent->add(SIGUSR1, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler'));
+        // reinstall graceful reload signal handler
+        self::$globalEvent->add(SIGQUIT, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler'));
         // reinstall  status signal handler
         self::$globalEvent->add(SIGUSR2, EventInterface::EV_SIGNAL, array('\Workerman\Worker', 'signalHandler'));
         // reinstall connection status signal handler
@@ -848,10 +895,22 @@ class Worker
         switch ($signal) {
             // Stop.
             case SIGINT:
+                self::$_gracefulStop = false;
+                self::stopAll();
+                break;
+            // Graceful stop.
+            case SIGTERM:
+                self::$_gracefulStop = true;
                 self::stopAll();
                 break;
             // Reload.
+            case SIGQUIT:
             case SIGUSR1:
+                if($signal === SIGQUIT){
+                    self::$_gracefulStop = true;
+                }else{
+                    self::$_gracefulStop = false;
+                }
                 self::$_pidsToRestart = self::getAllWorkerPids();
                 self::reload();
                 break;
@@ -1234,6 +1293,12 @@ class Worker
                 }
             }
 
+            if (self::$_gracefulStop) {
+                $sig = SIGQUIT;
+            } else {
+                $sig = SIGUSR1;
+            }
+
             // Send reload signal to all child processes.
             $reloadable_pid_array = array();
             foreach (self::$_pidMap as $worker_id => $worker_pid_array) {
@@ -1245,7 +1310,7 @@ class Worker
                 } else {
                     foreach ($worker_pid_array as $pid) {
                         // Send reload signal to a worker process which reloadable is false.
-                        posix_kill($pid, SIGUSR1);
+                        posix_kill($pid, $sig);
                     }
                 }
             }
@@ -1263,9 +1328,11 @@ class Worker
             // Continue reload.
             $one_worker_pid = current(self::$_pidsToRestart);
             // Send reload signal to a worker process.
-            posix_kill($one_worker_pid, SIGUSR1);
+            posix_kill($one_worker_pid, $sig);
             // If the process does not exit after self::KILL_WORKER_TIMER_TIME seconds try to kill it.
-            Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($one_worker_pid, SIGKILL), false);
+            if(!self::$_gracefulStop){
+                Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($one_worker_pid, SIGKILL), false);
+            }
         } // For child processes.
         else {
             reset(self::$_workers);
@@ -1299,12 +1366,19 @@ class Worker
         self::$_status = self::STATUS_SHUTDOWN;
         // For master process.
         if (self::$_masterPid === posix_getpid()) {
-            self::log("Workerman[" . basename(self::$_startFile) . "] Stopping ...");
+            self::log("Workerman[" . basename(self::$_startFile) . "] stopping ...");
             $worker_pid_array = self::getAllWorkerPids();
             // Send stop signal to all child processes.
+            if (self::$_gracefulStop) {
+                $sig = SIGTERM;
+            } else {
+                $sig = SIGINT;
+            }
             foreach ($worker_pid_array as $worker_pid) {
-                posix_kill($worker_pid, SIGINT);
-                Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($worker_pid, SIGKILL), false);
+                posix_kill($worker_pid, $sig);
+                if(!self::$_gracefulStop){
+                    Timer::add(self::KILL_WORKER_TIMER_TIME, 'posix_kill', array($worker_pid, SIGKILL), false);
+                }
             }
             // Remove statistics file.
             if (is_file(self::$_statisticsFile)) {
@@ -1316,12 +1390,34 @@ class Worker
             foreach (self::$_workers as $worker) {
                 $worker->stop();
             }
-            self::$globalEvent->destroy();
-            exit(0);
+            if (!self::$_gracefulStop || ConnectionInterface::$statistics['connection_count'] <= 0) {
+                self::$globalEvent->destroy();
+                exit(0);
+            }
         }
     }
 
     /**
+     * Get process status.
+     *
+     * @return number
+     */
+    public static function getStatus()
+    {
+        return self::$_status;
+    }
+
+    /**
+     * If stop gracefully.
+     *
+     * @return boolean
+     */
+    public static function getGracefulStop()
+    {
+        return self::$_gracefulStop;
+    }
+
+    /**
      * Write statistics data to disk.
      *
      * @return void
@@ -1595,93 +1691,129 @@ class Worker
             }
             $this->_context = stream_context_create($context_option);
         }
-
-        // Set an empty onMessage callback.
-        $this->onMessage = function () {
-        };
     }
 
+
     /**
-     * Listen port.
+     * Listen.
      *
      * @throws Exception
      */
     public function listen()
     {
-        if (!$this->_socketName || $this->_mainSocket) {
+        if (!$this->_socketName) {
             return;
         }
 
         // Autoload.
         Autoloader::setRootPath($this->_autoloadRootPath);
 
-        // Get the application layer communication protocol and listening address.
-        list($scheme, $address) = explode(':', $this->_socketName, 2);
-        // Check application layer protocol class.
-        if (!isset(self::$_builtinTransports[$scheme])) {
-            $scheme         = ucfirst($scheme);
-            $this->protocol = '\\Protocols\\' . $scheme;
-            if (!class_exists($this->protocol)) {
-                $this->protocol = "\\Workerman\\Protocols\\$scheme";
+        if (!$this->_mainSocket) {
+            // Get the application layer communication protocol and listening address.
+            list($scheme, $address) = explode(':', $this->_socketName, 2);
+            // Check application layer protocol class.
+            if (!isset(self::$_builtinTransports[$scheme])) {
+                $scheme         = ucfirst($scheme);
+                $this->protocol = '\\Protocols\\' . $scheme;
                 if (!class_exists($this->protocol)) {
-                    throw new Exception("class \\Protocols\\$scheme not exist");
+                    $this->protocol = "\\Workerman\\Protocols\\$scheme";
+                    if (!class_exists($this->protocol)) {
+                        throw new Exception("class \\Protocols\\$scheme not exist");
+                    }
                 }
-            }
 
-            if (!isset(self::$_builtinTransports[$this->transport])) {
-                throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
+                if (!isset(self::$_builtinTransports[$this->transport])) {
+                    throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
+                }
+            } else {
+                $this->transport = $scheme;
             }
-        } else {
-            $this->transport = $scheme;
-        }
 
-        $local_socket = self::$_builtinTransports[$this->transport] . ":" . $address;
+            $local_socket = self::$_builtinTransports[$this->transport] . ":" . $address;
 
-        // Flag.
-        $flags  = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
-        $errno  = 0;
-        $errmsg = '';
-        // SO_REUSEPORT.
-        if ($this->reusePort) {
-            stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
-        }
+            // Flag.
+            $flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
+            $errno = 0;
+            $errmsg = '';
+            // SO_REUSEPORT.
+            if ($this->reusePort) {
+                stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
+            }
 
-        // Create an Internet or Unix domain server socket.
-        $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
-        if (!$this->_mainSocket) {
-            throw new Exception($errmsg);
-        }
+            // Create an Internet or Unix domain server socket.
+            $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
+            if (!$this->_mainSocket) {
+                throw new Exception($errmsg);
+            }
 
-        if ($this->transport === 'ssl') {
-            stream_socket_enable_crypto($this->_mainSocket, false);
-        } elseif ($this->transport === 'unix') {
-            $socketFile = substr($address, 2);
-            if ($this->user) {
-                chown($socketFile, $this->user);
+            if ($this->transport === 'ssl') {
+                stream_socket_enable_crypto($this->_mainSocket, false);
+            } elseif ($this->transport === 'unix') {
+                $socketFile = substr($address, 2);
+                if ($this->user) {
+                    chown($socketFile, $this->user);
+                }
+                if ($this->group) {
+                    chgrp($socketFile, $this->group);
+                }
             }
-            if ($this->group) {
-                chgrp($socketFile, $this->group);
+
+            // Try to open keepalive for tcp and disable Nagle algorithm.
+            if (function_exists('socket_import_stream') && self::$_builtinTransports[$this->transport] === 'tcp') {
+                $socket = socket_import_stream($this->_mainSocket);
+                @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
+                @socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
             }
+
+            // Non blocking.
+            stream_set_blocking($this->_mainSocket, 0);
         }
 
-        // Try to open keepalive for tcp and disable Nagle algorithm.
-        if (function_exists('socket_import_stream') && self::$_builtinTransports[$this->transport] === 'tcp') {
-            $socket = socket_import_stream($this->_mainSocket);
-            @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
-            @socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
+        $this->resumeAccept();
+    }
+
+    /**
+     * Unlisten.
+     *
+     * @return void
+     */
+    public function unlisten() {
+        $this->pauseAccept();
+        if ($this->_mainSocket) {
+            @fclose($this->_mainSocket);
+            $this->_mainSocket = null;
         }
+    }
 
-        // Non blocking.
-        stream_set_blocking($this->_mainSocket, 0);
+    /**
+     * Pause accept new connections.
+     *
+     * @return void
+     */
+    public function pauseAccept()
+    {
+        if (self::$globalEvent && false === $this->_pauseAccept && $this->_mainSocket) {
+            self::$globalEvent->del($this->_mainSocket, EventInterface::EV_READ);
+            $this->_pauseAccept = true;
+        }
+    }
 
+    /**
+     * Resume accept new connections.
+     *
+     * @return void
+     */
+    public function resumeAccept()
+    {
         // Register a listener to be notified when server socket is ready to read.
-        if (self::$globalEvent) {
+        if (self::$globalEvent && true === $this->_pauseAccept && $this->_mainSocket) {
             if ($this->transport !== 'udp') {
                 self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
             } else {
                 self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
                     array($this, 'acceptUdpConnection'));
             }
+            $this->_pauseAccept = false;
         }
     }
 
@@ -1715,16 +1847,7 @@ class Worker
         if (!self::$globalEvent) {
             $event_loop_class = self::getEventLoopName();
             self::$globalEvent = new $event_loop_class;
-            // Register a listener to be notified when server socket is ready to read.
-            if ($this->_socketName) {
-                if ($this->transport !== 'udp') {
-                    self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
-                        array($this, 'acceptConnection'));
-                } else {
-                    self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
-                        array($this, 'acceptUdpConnection'));
-                }
-            }
+            $this->resumeAccept();
         }
 
         // Reinstall signal.
@@ -1733,6 +1856,11 @@ class Worker
         // Init Timer.
         Timer::init(self::$globalEvent);
 
+        // Set an empty onMessage callback.
+        if (empty($this->onMessage)) {
+            $this->onMessage = function () {};
+        }
+
         // Try to emit onWorkerStart callback.
         if ($this->onWorkerStart) {
             try {
@@ -1774,10 +1902,17 @@ class Worker
             }
         }
         // Remove listener for server socket.
-        if ($this->_mainSocket) {
-            self::$globalEvent->del($this->_mainSocket, EventInterface::EV_READ);
-            @fclose($this->_mainSocket);
+        $this->unlisten();
+        // Close all connections for the worker.
+        if (!self::$_gracefulStop) {
+            foreach ($this->connections as $connection) {
+                $connection->close();
+            }
         }
+        // Clear callback.
+        $this->onMessage = $this->onClose = $this->onError = $this->onBufferDrain = $this->onBufferFull = null;
+        // Remove worker instance from self::$_workers.
+        unset(self::$_workers[$this->workerId]);
     }
 
     /**