Ver código fonte

add unlisten method

walkor 8 anos atrás
pai
commit
d9d6b9e3fe
1 arquivos alterados com 60 adições e 47 exclusões
  1. 60 47
      Worker.php

+ 60 - 47
Worker.php

@@ -1602,7 +1602,7 @@ class Worker
     }
 
     /**
-     * Listen port.
+     * Listen.
      *
      * @throws Exception
      */
@@ -1615,65 +1615,67 @@ class Worker
         // Autoload.
         Autoloader::setRootPath($this->_autoloadRootPath);
 
-        // Get the application layer communication protocol and listening address.
-        list($scheme, $address) = explode(':', $this->_socketName, 2);
-        // Check application layer protocol class.
-        if (!isset(self::$_builtinTransports[$scheme])) {
-            $scheme         = ucfirst($scheme);
-            $this->protocol = '\\Protocols\\' . $scheme;
-            if (!class_exists($this->protocol)) {
-                $this->protocol = "\\Workerman\\Protocols\\$scheme";
+        if (!$this->_mainSocket) {
+            // Get the application layer communication protocol and listening address.
+            list($scheme, $address) = explode(':', $this->_socketName, 2);
+            // Check application layer protocol class.
+            if (!isset(self::$_builtinTransports[$scheme])) {
+                $scheme         = ucfirst($scheme);
+                $this->protocol = '\\Protocols\\' . $scheme;
                 if (!class_exists($this->protocol)) {
-                    throw new Exception("class \\Protocols\\$scheme not exist");
+                    $this->protocol = "\\Workerman\\Protocols\\$scheme";
+                    if (!class_exists($this->protocol)) {
+                        throw new Exception("class \\Protocols\\$scheme not exist");
+                    }
                 }
-            }
 
-            if (!isset(self::$_builtinTransports[$this->transport])) {
-                throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
+                if (!isset(self::$_builtinTransports[$this->transport])) {
+                    throw new \Exception('Bad worker->transport ' . var_export($this->transport, true));
+                }
+            } else {
+                $this->transport = $scheme;
             }
-        } else {
-            $this->transport = $scheme;
-        }
 
-        $local_socket = self::$_builtinTransports[$this->transport] . ":" . $address;
+            $local_socket = self::$_builtinTransports[$this->transport] . ":" . $address;
 
-        // Flag.
-        $flags  = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
-        $errno  = 0;
-        $errmsg = '';
-        // SO_REUSEPORT.
-        if ($this->reusePort) {
-            stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
-        }
+            // Flag.
+            $flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
+            $errno = 0;
+            $errmsg = '';
+            // SO_REUSEPORT.
+            if ($this->reusePort) {
+                stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
+            }
 
-        // Create an Internet or Unix domain server socket.
-        $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
-        if (!$this->_mainSocket) {
-            throw new Exception($errmsg);
-        }
+            // Create an Internet or Unix domain server socket.
+            $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
+            if (!$this->_mainSocket) {
+                throw new Exception($errmsg);
+            }
 
-        if ($this->transport === 'ssl') {
-            stream_socket_enable_crypto($this->_mainSocket, false);
-        } elseif ($this->transport === 'unix') {
-            $socketFile = substr($address, 2);
-            if ($this->user) {
-                chown($socketFile, $this->user);
+            if ($this->transport === 'ssl') {
+                stream_socket_enable_crypto($this->_mainSocket, false);
+            } elseif ($this->transport === 'unix') {
+                $socketFile = substr($address, 2);
+                if ($this->user) {
+                    chown($socketFile, $this->user);
+                }
+                if ($this->group) {
+                    chgrp($socketFile, $this->group);
+                }
             }
-            if ($this->group) {
-                chgrp($socketFile, $this->group);
+
+            // Try to open keepalive for tcp and disable Nagle algorithm.
+            if (function_exists('socket_import_stream') && self::$_builtinTransports[$this->transport] === 'tcp') {
+                $socket = socket_import_stream($this->_mainSocket);
+                @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
+                @socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
             }
-        }
 
-        // Try to open keepalive for tcp and disable Nagle algorithm.
-        if (function_exists('socket_import_stream') && self::$_builtinTransports[$this->transport] === 'tcp') {
-            $socket = socket_import_stream($this->_mainSocket);
-            @socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
-            @socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
+            // Non blocking.
+            stream_set_blocking($this->_mainSocket, 0);
         }
 
-        // Non blocking.
-        stream_set_blocking($this->_mainSocket, 0);
-
         // Register a listener to be notified when server socket is ready to read.
         if (self::$globalEvent) {
             if ($this->transport !== 'udp') {
@@ -1686,6 +1688,17 @@ class Worker
     }
 
     /**
+     * Unlisten.
+     *
+     * @return void
+     */
+    public function unlisten() {
+        if (self::$globalEvent && $this->_mainSocket) {
+            self::$globalEvent->del($this->_mainSocket, EventInterface::EV_READ);
+        }
+    }
+
+    /**
      * Get socket name.
      *
      * @return string