Procházet zdrojové kódy

Add checkPortAvailable and move __construct to front #1069

walkor před 1 rokem
rodič
revize
fd270bf8ff
1 změnil soubory, kde provedl 63 přidání a 51 odebrání
  1. 63 51
      src/Worker.php

+ 63 - 51
src/Worker.php

@@ -535,6 +535,29 @@ class Worker
     protected ?string $workerId = null;
 
     /**
+     * Constructor.
+     *
+     * @param string|null $socketName
+     * @param array $socketContext
+     */
+    public function __construct(?string $socketName = null, array $socketContext = [])
+    {
+        // Save all worker instances.
+        $this->workerId = spl_object_hash($this);
+        $this->context = new stdClass();
+        static::$workers[$this->workerId] = $this;
+        static::$pidMap[$this->workerId] = [];
+
+        // Context for socket.
+        if ($socketName) {
+            $this->socketName = $socketName;
+            $socketContext['socket']['backlog'] ??= static::DEFAULT_BACKLOG;
+            $this->socketContext = stream_context_create($socketContext);
+        }
+
+    }
+
+    /**
      * Run all worker instances.
      *
      * @return void
@@ -543,9 +566,10 @@ class Worker
     {
         try {
             static::checkSapiEnv();
-            self::initStdOut();
+            static::initStdOut();
             static::init();
             static::parseCommand();
+            static::checkPortAvailable();
             static::lock();
             static::daemonize();
             static::initWorkers();
@@ -625,7 +649,7 @@ class Worker
      *
      * @return void
      */
-    private static function initStdOut(): void
+    protected static function initStdOut(): void
     {
         $defaultStream = fn () => defined('STDOUT') ? STDOUT : (@fopen('php://stdout', 'w') ?: fopen('php://output', 'w'));
         static::$outputStream ??= $defaultStream(); //@phpstan-ignore-line
@@ -2021,6 +2045,7 @@ class Worker
     }
 
     /**
+     *
      * Write statistics data to disk.
      *
      * @return void
@@ -2276,51 +2301,6 @@ class Worker
     }
 
     /**
-     * Constructor.
-     *
-     * @param string|null $socketName
-     * @param array $socketContext
-     */
-    public function __construct(?string $socketName = null, array $socketContext = [])
-    {
-        // Save all worker instances.
-        $this->workerId = spl_object_hash($this);
-        $this->context = new stdClass();
-        static::$workers[$this->workerId] = $this;
-        static::$pidMap[$this->workerId] = [];
-
-        // Context for socket.
-        if ($socketName) {
-            $this->socketName = $socketName;
-            $socketContext['socket']['backlog'] ??= static::DEFAULT_BACKLOG;
-            $this->socketContext = stream_context_create($socketContext);
-        }
-
-        // Try to turn reusePort on.
-        /*if (\DIRECTORY_SEPARATOR === '/'  // if linux
-            && $socketName
-            && \version_compare(php_uname('r'), '3.9', 'ge') // if kernel >=3.9
-            && \strtolower(\php_uname('s')) !== 'darwin' // if not Mac OS
-            && strpos($socketName,'unix') !== 0 // if not unix socket
-            && strpos($socketName,'udp') !== 0) { // if not udp socket
-
-            $address = \parse_url($socketName);
-            if (isset($address['host']) && isset($address['port'])) {
-                try {
-                    \set_error_handler(static fn (): bool => true);
-                    // If address not in use, turn reusePort on automatically.
-                    $server = stream_socket_server("tcp://{$address['host']}:{$address['port']}");
-                    if ($server) {
-                        $this->reusePort = true;
-                        fclose($server);
-                    }
-                    \restore_error_handler();
-                } catch (\Throwable $e) {}
-            }
-        }*/
-    }
-
-    /**
      * Listen.
      */
     public function listen(): void
@@ -2335,17 +2315,17 @@ class Worker
 
             // Flag.
             $flags = $this->transport === 'udp' ? STREAM_SERVER_BIND : STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
-            $errno = 0;
-            $errmsg = '';
+            $errNo = 0;
+            $errMsg = '';
             // SO_REUSEPORT.
             if ($this->reusePort) {
                 stream_context_set_option($this->socketContext, 'socket', 'so_reuseport', 1);
             }
 
             // Create an Internet or Unix domain server socket.
-            $this->mainSocket = stream_socket_server($localSocket, $errno, $errmsg, $flags, $this->socketContext);
+            $this->mainSocket = stream_socket_server($localSocket, $errNo, $errMsg, $flags, $this->socketContext);
             if (!$this->mainSocket) {
-                throw new RuntimeException($errmsg);
+                throw new RuntimeException($errMsg);
             }
 
             if ($this->transport === 'ssl') {
@@ -2393,6 +2373,38 @@ class Worker
     }
 
     /**
+     * Check port available.
+     *
+     * @return void
+     */
+    protected static function checkPortAvailable(): void
+    {
+        foreach (static::$workers as $worker) {
+            $socketName = $worker->getSocketName();
+            if (DIRECTORY_SEPARATOR === '/'  // if linux
+                && static::$status === static::STATUS_STARTING // only for starting status
+                && $worker->transport === 'tcp' // if tcp socket
+                && !str_starts_with($socketName, 'unix') // if not unix socket
+                && !str_starts_with($socketName, 'udp')) { // if not udp socket
+
+                $address = parse_url($socketName);
+                if (isset($address['host']) && isset($address['port'])) {
+                    $address = "tcp://{$address['host']}:{$address['port']}";
+                    $server = null;
+                    set_error_handler(function ($code, $msg) {
+                        throw new RuntimeException($msg);
+                    });
+                    $server = stream_socket_server($address, $code, $msg);
+                    if ($server) {
+                        fclose($server);
+                    }
+                    restore_error_handler();
+                }
+            }
+        }
+    }
+
+    /**
      * Parse local socket address.
      */
     protected function parseSocketAddress(): ?string