|
|
@@ -1526,10 +1526,36 @@ class Worker
|
|
|
}
|
|
|
}
|
|
|
Timer::delAll();
|
|
|
+
|
|
|
+ //Update process state.
|
|
|
+ static::$status = static::STATUS_RUNNING;
|
|
|
+
|
|
|
+ // Register shutdown function for checking errors.
|
|
|
+ \register_shutdown_function(["\\Workerman\\Worker", 'checkErrors']);
|
|
|
+
|
|
|
+ // Create a global event loop.
|
|
|
+ if (!static::$globalEvent) {
|
|
|
+ $eventLoopClass = static::getEventLoopName();
|
|
|
+ static::$globalEvent = new $eventLoopClass;
|
|
|
+ static::$globalEvent->setErrorHandler(function ($exception) {
|
|
|
+ static::stopAll(250, $exception);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ // Reinstall signal.
|
|
|
+ static::reinstallSignal();
|
|
|
+
|
|
|
+ // Init Timer.
|
|
|
+ Timer::init(static::$globalEvent);
|
|
|
+
|
|
|
+ \restore_error_handler();
|
|
|
+
|
|
|
static::setProcessTitle(self::$processTitle . ': worker process ' . $worker->name . ' ' . $worker->getSocketName());
|
|
|
$worker->setUserAndGroup();
|
|
|
$worker->id = $id;
|
|
|
$worker->run();
|
|
|
+ // Main loop.
|
|
|
+ static::$globalEvent->run();
|
|
|
if (static::$status !== self::STATUS_SHUTDOWN) {
|
|
|
$err = new Exception('event-loop exited');
|
|
|
static::log($err);
|
|
|
@@ -1844,7 +1870,8 @@ class Worker
|
|
|
} // For child processes.
|
|
|
else {
|
|
|
// Execute exit.
|
|
|
- foreach (static::$workers as $worker) {
|
|
|
+ $workers = array_reverse(static::$workers);
|
|
|
+ foreach ($workers as $worker) {
|
|
|
if (!$worker->stopping) {
|
|
|
$worker->stop();
|
|
|
$worker->stopping = true;
|
|
|
@@ -2373,38 +2400,11 @@ class Worker
|
|
|
* Run worker instance.
|
|
|
*
|
|
|
* @return void
|
|
|
+ * @throws Throwable
|
|
|
*/
|
|
|
public function run()
|
|
|
{
|
|
|
- //Update process state.
|
|
|
- static::$status = static::STATUS_RUNNING;
|
|
|
-
|
|
|
- // Register shutdown function for checking errors.
|
|
|
- \register_shutdown_function(["\\Workerman\\Worker", 'checkErrors']);
|
|
|
-
|
|
|
- // Create a global event loop.
|
|
|
- if (!static::$globalEvent) {
|
|
|
- $eventLoopClass = static::getEventLoopName();
|
|
|
- static::$globalEvent = new $eventLoopClass;
|
|
|
- static::$globalEvent->setErrorHandler(function ($exception) {
|
|
|
- static::stopAll(250, $exception);
|
|
|
- });
|
|
|
- $this->resumeAccept();
|
|
|
- }
|
|
|
-
|
|
|
- // Reinstall signal.
|
|
|
- static::reinstallSignal();
|
|
|
-
|
|
|
- // Init Timer.
|
|
|
- Timer::init(static::$globalEvent);
|
|
|
-
|
|
|
- // Set an empty onMessage callback.
|
|
|
- if (empty($this->onMessage)) {
|
|
|
- $this->onMessage = function () {
|
|
|
- };
|
|
|
- }
|
|
|
-
|
|
|
- \restore_error_handler();
|
|
|
+ $this->listen();
|
|
|
|
|
|
// Try to emit onWorkerStart callback.
|
|
|
if ($this->onWorkerStart) {
|
|
|
@@ -2416,9 +2416,6 @@ class Worker
|
|
|
static::stopAll(250, $e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- // Main loop.
|
|
|
- static::$globalEvent->run();
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -2507,8 +2504,8 @@ class Worker
|
|
|
// UdpConnection.
|
|
|
$connection = new UdpConnection($socket, $remoteAddress);
|
|
|
$connection->protocol = $this->protocol;
|
|
|
- $messageCb = $this->onMessage;
|
|
|
- if ($messageCb) {
|
|
|
+ $messageCallback = $this->onMessage;
|
|
|
+ if ($messageCallback) {
|
|
|
try {
|
|
|
if ($this->protocol !== null) {
|
|
|
/** @var \Workerman\Protocols\ProtocolInterface $parser */
|
|
|
@@ -2524,7 +2521,7 @@ class Worker
|
|
|
if ($data === false) {
|
|
|
continue;
|
|
|
}
|
|
|
- $messageCb($connection, $data);
|
|
|
+ $messageCallback($connection, $data);
|
|
|
}
|
|
|
} else {
|
|
|
$data = $parser::decode($recvBuffer, $connection);
|
|
|
@@ -2532,10 +2529,10 @@ class Worker
|
|
|
if ($data === false) {
|
|
|
return true;
|
|
|
}
|
|
|
- $messageCb($connection, $data);
|
|
|
+ $messageCallback($connection, $data);
|
|
|
}
|
|
|
} else {
|
|
|
- $messageCb($connection, $recvBuffer);
|
|
|
+ $messageCallback($connection, $recvBuffer);
|
|
|
}
|
|
|
++ConnectionInterface::$statistics['total_request'];
|
|
|
} catch (Throwable $e) {
|