Forráskód Böngészése

Merge pull request #1 from walkor/master

update
Ares 8 éve
szülő
commit
041ea7b3da
2 módosított fájl, 88 hozzáadás és 57 törlés
  1. 3 0
      Connection/AsyncTcpConnection.php
  2. 85 57
      Worker.php

+ 3 - 0
Connection/AsyncTcpConnection.php

@@ -137,6 +137,9 @@ class AsyncTcpConnection extends TcpConnection
         }
 
         $this->id = $this->_id = self::$_idRecorder++;
+        if(PHP_INT_MAX === self::$_idRecorder){
+            self::$_idRecorder = 0;
+        }
         // Check application layer protocol class.
         if (!isset(self::$_builtinTransports[$scheme])) {
             $scheme         = ucfirst($scheme);

+ 85 - 57
Worker.php

@@ -223,6 +223,13 @@ class Worker
      * @var string
      */
     protected $_autoloadRootPath = '';
+    
+    /**
+     * Pause listening or not.
+     *
+     * @var string
+     */
+    protected $_pauseListen = false;
 
     /**
      * Daemonize.
@@ -457,9 +464,12 @@ class Worker
         $backtrace        = debug_backtrace();
         self::$_startFile = $backtrace[count($backtrace) - 1]['file'];
 
+
+        $unique_prefix = str_replace('/', '_', self::$_startFile);
+
         // Pid file.
         if (empty(self::$pidFile)) {
-            self::$pidFile = __DIR__ . "/../" . str_replace('/', '_', self::$_startFile) . ".pid";
+            self::$pidFile = __DIR__ . "/../$unique_prefix.pid";
         }
 
         // Log file.
@@ -477,7 +487,7 @@ class Worker
 
         // For statistics.
         self::$_globalStatistics['start_timestamp'] = time();
-        self::$_statisticsFile                      = sys_get_temp_dir() . '/workerman.status';
+        self::$_statisticsFile                      = sys_get_temp_dir() . "/$unique_prefix.status";
 
         // Process title.
         self::setProcessTitle('WorkerMan: master process  start_file=' . self::$_startFile);
@@ -608,7 +618,7 @@ class Worker
             $start_file = $argv[0];
             self::safeEcho("Input \"php $start_file stop\" to quit. Start success.\n\n");
         } else {
-            self::safeEcho("Press Ctrl-C to quit. Start success.\n");
+            self::safeEcho("Press Ctrl+C to quit. Start success.\n");
         }
     }
 
@@ -682,9 +692,15 @@ class Worker
                     // Sleep 1 second.
                     sleep(1);
                     // Clear terminal.
-                    echo chr(27).chr(91).chr(72).chr(27).chr(91).chr(50).chr(74);
+                    if ($command2 === '-d') {
+                        echo "\33[H\33[2J\33(B\33[m";
+                    }
                     // Echo status data.
                     echo self::formatStatusData();
+                    if ($command2 !== '-d') {
+                        exit(0);
+                    }
+                    echo "\nPress Ctrl+C to quit.\n\n";
                 }
                 exit(0);
             case 'connections':
@@ -1602,86 +1618,101 @@ class Worker
     }
 
     /**
-     * Listen port.
+     * Listen.
      *
      * @throws Exception
      */
     public function listen()
     {
-        if (!$this->_socketName || $this->_mainSocket) {
+        if (!$this->_socketName) {
             return;
         }
 
         // Autoload.
         Autoloader::setRootPath($this->_autoloadRootPath);
 
-        // Get the application layer communication protocol and listening address.
-        list($scheme, $address) = explode(':', $this->_socketName, 2);
-        // Check application layer protocol class.
-        if (!isset(self::$_builtinTransports[$scheme])) {
-            $scheme         = ucfirst($scheme);
-            $this->protocol = '\\Protocols\\' . $scheme;
-            if (!class_exists($this->protocol)) {
-                $this->protocol = "\\Workerman\\Protocols\\$scheme";
+        if (!$this->_mainSocket) {
+            // Get the application layer communication protocol and listening address.
+            list($scheme, $address) = explode(':', $this->_socketName, 2);
+            // Check application layer protocol class.
+            if (!isset(self::$_builtinTransports[$scheme])) {
+                $scheme         = ucfirst($scheme);
+                $this->protocol = '\\Protocols\\' . $scheme;
                 if (!class_exists($this->protocol)) {
-                    throw new Exception("class \\Protocols\\$scheme not exist");
+                    $this->protocol = "\\Workerman\\Protocols\\$scheme";
+                    if (!class_exists($this->protocol)) {
+                        throw new Exception("class \\Protocols\\$scheme not exist");
+                    }
                 }
-            }
 
-            if (!isset(self::$_builtinTransports[$this->transport])) {
-                throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
+                if (!isset(self::$_builtinTransports[$this->transport])) {
+                    throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
+                }
+            } else {
+                $this->transport = $scheme;
             }
-        } else {
-            $this->transport = $scheme;
-        }
 
-        $local_socket = self::$_builtinTransports[$this->transport] . ":" . $address;
+            $local_socket = self::$_builtinTransports[$this->transport] . ":" . $address;
 
-        // Flag.
-        $flags  = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
-        $errno  = 0;
-        $errmsg = '';
-        // SO_REUSEPORT.
-        if ($this->reusePort) {
-            stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
-        }
+            // Flag.
+            $flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
+            $errno = 0;
+            $errmsg = '';
+            // SO_REUSEPORT.
+            if ($this->reusePort) {
+                stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
+            }
 
-        // Create an Internet or Unix domain server socket.
-        $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
-        if (!$this->_mainSocket) {
-            throw new Exception($errmsg);
-        }
+            // Create an Internet or Unix domain server socket.
+            $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
+            if (!$this->_mainSocket) {
+                throw new Exception($errmsg);
+            }
 
-        if ($this->transport === 'ssl') {
-            stream_socket_enable_crypto($this->_mainSocket, false);
-        } elseif ($this->transport === 'unix') {
-            $socketFile = substr($address, 2);
-            if ($this->user) {
-                chown($socketFile, $this->user);
+            if ($this->transport === 'ssl') {
+                stream_socket_enable_crypto($this->_mainSocket, false);
+            } elseif ($this->transport === 'unix') {
+                $socketFile = substr($address, 2);
+                if ($this->user) {
+                    chown($socketFile, $this->user);
+                }
+                if ($this->group) {
+                    chgrp($socketFile, $this->group);
+                }
             }
-            if ($this->group) {
-                chgrp($socketFile, $this->group);
+
+            // Try to open keepalive for tcp and disable Nagle algorithm.
+            if (function_exists('socket_import_stream') && self::$_builtinTransports[$this->transport] === 'tcp') {
+                $socket = socket_import_stream($this->_mainSocket);
+                @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
+                @socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
             }
-        }
 
-        // Try to open keepalive for tcp and disable Nagle algorithm.
-        if (function_exists('socket_import_stream') && self::$_builtinTransports[$this->transport] === 'tcp') {
-            $socket = socket_import_stream($this->_mainSocket);
-            @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
-            @socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
+            // Non blocking.
+            stream_set_blocking($this->_mainSocket, 0);
         }
 
-        // Non blocking.
-        stream_set_blocking($this->_mainSocket, 0);
-
         // Register a listener to be notified when server socket is ready to read.
-        if (self::$globalEvent) {
+        if (self::$globalEvent || $this->_pauseListen) {
             if ($this->transport !== 'udp') {
                 self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ, array($this, 'acceptConnection'));
             } else {
                 self::$globalEvent->add($this->_mainSocket, EventInterface::EV_READ,
                     array($this, 'acceptUdpConnection'));
             }
+            $this->_pauseListen = false;
+        }
+    }
+
+    /**
+     * Unlisten.
+     *
+     * @return void
+     */
+    public function unlisten() {
+        if (self::$globalEvent && $this->_mainSocket) {
+            self::$globalEvent->del($this->_mainSocket, EventInterface::EV_READ);
+            $this->_pauseListen = true;
         }
     }
 
@@ -1774,10 +1805,7 @@ class Worker
             }
         }
         // Remove listener for server socket.
-        if ($this->_mainSocket) {
-            self::$globalEvent->del($this->_mainSocket, EventInterface::EV_READ);
-            @fclose($this->_mainSocket);
-        }
+        $this->unlisten();
     }
 
     /**