Kaynağa Gözat

Update Worker.php

onMasterReload
fix unix domain socket file missing
walkor 9 yıl önce
ebeveyn
işleme
68fe39c5fc
1 değiştirilmiş dosya ile 43 ekleme ve 17 silme
  1. 43 17
      Worker.php

+ 43 - 17
Worker.php

@@ -190,7 +190,14 @@ class Worker
     public $onWorkerStop = null;
 
     /**
-     * Emitted when worker processes get reload command.
+     * Emitted when the master process get reload signal.
+     *
+     * @var callback
+     */
+    public static $onMasterReload = null;
+
+    /**
+     * Emitted when worker processes get reload signal.
      *
      * @var callback
      */
@@ -525,13 +532,27 @@ class Worker
     }
 
     /**
+     * Get all worker instances.
+     *
+     * @return array
+     */
+    public static function getAllWorkers()
+    {
+        return self::$_workers;
+    }
+
+    /**
      * Init idMap.
      * return void
      */
     protected static function initId()
     {
         foreach (self::$_workers as $worker_id => $worker) {
-            self::$_idMap[$worker_id] = array_fill(0, $worker->count, 0);
+            $new_id_map = array();
+            for($key = 0; $key < $worker->count; $key++) {
+                $new_id_map[$key] = isset(self::$_idMap[$worker_id][$key]) ? self::$_idMap[$worker_id][$key] : 0;
+            }
+            self::$_idMap[$worker_id] = $new_id_map;
         }
     }
 
@@ -856,6 +877,7 @@ class Worker
                 }
             }
 
+            $worker->count = $worker->count <= 0 ? 1 : $worker->count;
             while (count(self::$_pidMap[$worker->workerId]) < $worker->count) {
                 static::forkOneWorker($worker);
             }
@@ -870,9 +892,12 @@ class Worker
      */
     protected static function forkOneWorker($worker)
     {
-        $pid = pcntl_fork();
         // Get available worker id.
         $id = self::getId($worker->workerId, 0);
+        if ($id === false) {
+            return;
+        }
+        $pid = pcntl_fork();
         // For master process.
         if ($pid > 0) {
             self::$_pidMap[$worker->workerId][$pid] = $pid;
@@ -906,11 +931,7 @@ class Worker
      */
     protected static function getId($worker_id, $pid)
     {
-        $id = array_search($pid, self::$_idMap[$worker_id]);
-        if ($id === false) {
-            self::safeEcho("getId fail\n");
-        }
-        return $id;
+        return array_search($pid, self::$_idMap[$worker_id]);
     }
 
     /**
@@ -1062,6 +1083,19 @@ class Worker
             if (self::$_status !== self::STATUS_RELOADING && self::$_status !== self::STATUS_SHUTDOWN) {
                 self::log("Workerman[" . basename(self::$_startFile) . "] reloading");
                 self::$_status = self::STATUS_RELOADING;
+                // Try to emit onMasterReload callback.
+                if (self::$onMasterReload) {
+                    try {
+                        call_user_func(self::$onMasterReload);
+                    } catch (\Exception $e) {
+                        self::log($e);
+                        exit(250);
+                    } catch (\Error $e) {
+                        self::log($e);
+                        exit(250);
+                    }
+                    self::initId();
+                }
             }
 
             // Send reload signal to all child processes.
@@ -1381,15 +1415,7 @@ class Worker
         if ($this->reusePort) {
             stream_context_set_option($this->_context, 'socket', 'so_reuseport', 1);
         }
-        if ($this->transport === 'unix') {
-            umask(0);
-            list(, $address) = explode(':', $this->_socketName, 2);
-            if (!is_file($address)) {
-                register_shutdown_function(function () use ($address) {
-                    @unlink($address);
-                });
-            }
-        }
+
         // Create an Internet or Unix domain server socket.
         $this->_mainSocket = stream_socket_server($local_socket, $errno, $errmsg, $flags, $this->_context);
         if (!$this->_mainSocket) {