Bladeren bron

add connection->eventLoop

walkor 2 jaren geleden
bovenliggende
commit
72075d710e

+ 16 - 11
src/Connection/AsyncTcpConnection.php

@@ -196,6 +196,10 @@ class AsyncTcpConnection extends TcpConnection
             return;
         }
 
+        if (!$this->eventLoop) {
+            $this->eventLoop = Worker::$globalEvent;
+        }
+
         $this->status = self::STATUS_CONNECTING;
         $this->connectStartTime = \microtime(true);
         if ($this->transport !== 'unix') {
@@ -245,10 +249,10 @@ class AsyncTcpConnection extends TcpConnection
             return;
         }
         // Add socket to global event loop waiting connection is successfully established or faild.
-        Worker::$globalEvent->onWritable($this->socket, [$this, 'checkConnection']);
+        $this->eventLoop->onWritable($this->socket, [$this, 'checkConnection']);
         // For windows.
-        if (\DIRECTORY_SEPARATOR === '\\' && Worker::$eventLoopClass === Select::class) {
-            Worker::$globalEvent->onExcept($this->socket, [$this, 'checkConnection']);
+        if (\DIRECTORY_SEPARATOR === '\\' && \method_exists($this->eventLoop, 'onExcept')) {
+            $this->eventLoop->onExcept($this->socket, [$this, 'checkConnection']);
         }
     }
 
@@ -316,7 +320,7 @@ class AsyncTcpConnection extends TcpConnection
             try {
                 ($this->onError)($this, $code, $msg);
             } catch (\Throwable $e) {
-                Worker::stopAll(250, $e);
+                $this->error($e);
             }
         }
     }
@@ -330,11 +334,11 @@ class AsyncTcpConnection extends TcpConnection
     public function checkConnection()
     {
         // Remove EV_EXPECT for windows.
-        if (\DIRECTORY_SEPARATOR === '\\' && Worker::$eventLoopClass === Select::class) {
-            Worker::$globalEvent->offExcept($this->socket);
+        if (\DIRECTORY_SEPARATOR === '\\' && \method_exists($this->eventLoop, 'offExcept')) {
+            $this->eventLoop->offExcept($this->socket);
         }
         // Remove write listener.
-        Worker::$globalEvent->offWritable($this->socket);
+        $this->eventLoop->offWritable($this->socket);
 
         if ($this->status !== self::STATUS_CONNECTING) {
             return;
@@ -363,11 +367,11 @@ class AsyncTcpConnection extends TcpConnection
             } else {
                 // There are some data waiting to send.
                 if ($this->sendBuffer) {
-                    Worker::$globalEvent->onWritable($this->socket, [$this, 'baseWrite']);
+                    $this->eventLoop->onWritable($this->socket, [$this, 'baseWrite']);
                 }
             }
             // Register a listener waiting read event.
-            Worker::$globalEvent->onReadable($this->socket, [$this, 'baseRead']);
+            $this->eventLoop->onReadable($this->socket, [$this, 'baseRead']);
 
             $this->status = self::STATUS_ESTABLISHED;
             $this->remoteAddress = $address;
@@ -377,7 +381,7 @@ class AsyncTcpConnection extends TcpConnection
                 try {
                     ($this->onConnect)($this);
                 } catch (\Throwable $e) {
-                    Worker::stopAll(250, $e);
+                    $this->error($e);
                 }
             }
             // Try to emit protocol::onConnect
@@ -385,10 +389,11 @@ class AsyncTcpConnection extends TcpConnection
                 try {
                     [$this->protocol, 'onConnect']($this);
                 } catch (\Throwable $e) {
-                    Worker::stopAll(250, $e);
+                    $this->error($e);
                 }
             }
         } else {
+
             // Connection failed.
             $this->emitError(static::CONNECT_FAIL, 'connect ' . $this->remoteAddress . ' fail after ' . round(\microtime(true) - $this->connectStartTime, 4) . ' seconds');
             if ($this->status === self::STATUS_CLOSING) {

+ 14 - 9
src/Connection/AsyncUdpConnection.php

@@ -14,6 +14,7 @@
 
 namespace Workerman\Connection;
 
+use Throwable;
 use Workerman\Events\EventInterface;
 use Workerman\Worker;
 use \Exception;
@@ -97,8 +98,8 @@ class AsyncUdpConnection extends UdpConnection
             ++ConnectionInterface::$statistics['total_request'];
             try {
                 ($this->onMessage)($this, $recvBuffer);
-            } catch (\Throwable $e) {
-                Worker::stopAll(250, $e);
+            } catch (Throwable $e) {
+                $this->error($e);
             }
         }
         return true;
@@ -134,24 +135,25 @@ class AsyncUdpConnection extends UdpConnection
      * @param bool $raw
      *
      * @return bool
+     * @throws Throwable
      */
     public function close($data = null, $raw = false)
     {
         if ($data !== null) {
             $this->send($data, $raw);
         }
-        Worker::$globalEvent->offReadable($this->socket);
+        $this->eventLoop->offReadable($this->socket);
         \fclose($this->socket);
         $this->connected = false;
         // Try to emit onClose callback.
         if ($this->onClose) {
             try {
                 ($this->onClose)($this);
-            } catch (\Throwable $e) {
-                Worker::stopAll(250, $e);
+            } catch (Throwable $e) {
+                $this->error($e);
             }
         }
-        $this->onConnect = $this->onMessage = $this->onClose = null;
+        $this->onConnect = $this->onMessage = $this->onClose = $this->eventLoop = $this->errorHandler = null;
         return true;
     }
 
@@ -165,6 +167,9 @@ class AsyncUdpConnection extends UdpConnection
         if ($this->connected === true) {
             return;
         }
+        if (!$this->eventLoop) {
+            $this->eventLoop = Worker::$globalEvent;
+        }
         if ($this->contextOption) {
             $context = \stream_context_create($this->contextOption);
             $this->socket = \stream_socket_client("udp://{$this->remoteAddress}", $errno, $errmsg,
@@ -181,15 +186,15 @@ class AsyncUdpConnection extends UdpConnection
         \stream_set_blocking($this->socket, false);
 
         if ($this->onMessage) {
-            Worker::$globalEvent->onWritable($this->socket, [$this, 'baseRead']);
+            $this->eventLoop->onWritable($this->socket, [$this, 'baseRead']);
         }
         $this->connected = true;
         // Try to emit onConnect callback.
         if ($this->onConnect) {
             try {
                 ($this->onConnect)($this);
-            } catch (\Throwable $e) {
-                Worker::stopAll(250, $e);
+            } catch (Throwable $e) {
+                $this->error($e);
             }
         }
     }

+ 38 - 0
src/Connection/ConnectionInterface.php

@@ -14,6 +14,12 @@
 
 namespace Workerman\Connection;
 
+use Throwable;
+use Workerman\Events\Event;
+use Workerman\Events\EventInterface;
+use Workerman\Events\Revolt;
+use Workerman\Worker;
+
 /**
  * ConnectionInterface.
  */
@@ -68,6 +74,16 @@ abstract class ConnectionInterface
     public $onError = null;
 
     /**
+     * @var EventInterface
+     */
+    public $eventLoop;
+
+    /**
+     * @var \Closure
+     */
+    public $errorHandler;
+
+    /**
      * Sends data on the connection.
      *
      * @param mixed $sendBuffer
@@ -139,4 +155,26 @@ abstract class ConnectionInterface
      */
     abstract public function close($data = null);
 
+    /**
+     * @param Throwable $exception
+     * @return mixed
+     * @throws Throwable
+     */
+    public function error(Throwable $exception)
+    {
+        if (!$this->errorHandler) {
+            Worker::stopAll(250, $exception);
+            return;
+        }
+        try {
+            ($this->errorHandler)($exception);
+        } catch (Throwable $exception) {
+            if ($this->eventLoop instanceof Event) {
+                echo $exception;
+                return;
+            }
+            throw $exception;
+        }
+    }
+
 }

+ 23 - 21
src/Connection/TcpConnection.php

@@ -294,7 +294,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
      * @param resource $socket
      * @param string $remoteAddress
      */
-    public function __construct($socket, $remoteAddress = '')
+    public function __construct($eventLoop, $socket, $remoteAddress = '')
     {
         ++self::$statistics['connection_count'];
         $this->id = $this->realId = self::$idRecorder++;
@@ -307,7 +307,8 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
         if (\function_exists('stream_set_read_buffer')) {
             \stream_set_read_buffer($this->socket, 0);
         }
-        Worker::$globalEvent->onReadable($this->socket, [$this, 'baseRead']);
+        $this->eventLoop = $eventLoop;
+        $this->eventLoop->onReadable($this->socket, [$this, 'baseRead']);
         $this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
         $this->maxPackageSize = self::$defaultMaxPackageSize;
         $this->remoteAddress = $remoteAddress;
@@ -366,7 +367,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
         // Attempt to send data directly.
         if ($this->sendBuffer === '') {
             if ($this->transport === 'ssl') {
-                Worker::$globalEvent->onWritable($this->socket, [$this, 'baseWrite']);
+                $this->eventLoop->onWritable($this->socket, [$this, 'baseWrite']);
                 $this->sendBuffer = $sendBuffer;
                 $this->checkBufferWillFull();
                 return;
@@ -394,7 +395,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
                         try {
                             ($this->onError)($this, static::SEND_FAIL, 'client closed');
                         } catch (\Throwable $e) {
-                            Worker::stopAll(250, $e);
+                            $this->error($e);
                         }
                     }
                     $this->destroy();
@@ -402,7 +403,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
                 }
                 $this->sendBuffer = $sendBuffer;
             }
-            Worker::$globalEvent->onWritable($this->socket, [$this, 'baseWrite']);
+            $this->eventLoop->onWritable($this->socket, [$this, 'baseWrite']);
             // Check if the send buffer will be full.
             $this->checkBufferWillFull();
             return;
@@ -551,7 +552,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
      */
     public function pauseRecv()
     {
-        Worker::$globalEvent->offReadable($this->socket);
+        $this->eventLoop->offReadable($this->socket);
         $this->isPaused = true;
     }
 
@@ -563,7 +564,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
     public function resumeRecv()
     {
         if ($this->isPaused === true) {
-            Worker::$globalEvent->onReadable($this->socket, [$this, 'baseRead']);
+            $this->eventLoop->onReadable($this->socket, [$this, 'baseRead']);
             $this->isPaused = false;
             $this->baseRead($this->socket, false);
         }
@@ -585,7 +586,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
             if ($this->doSslHandshake($socket)) {
                 $this->sslHandshakeCompleted = true;
                 if ($this->sendBuffer) {
-                    Worker::$globalEvent->onWritable($socket, [$this, 'baseWrite']);
+                    $this->eventLoop->onWritable($socket, [$this, 'baseWrite']);
                 }
             } else {
                 return;
@@ -620,7 +621,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
                     try {
                         ($this->onMessage)($this, $request);
                     } catch (\Throwable $e) {
-                        Worker::stopAll(250, $e);
+                        $this->error($e);
                     }
                     return;
                 }
@@ -686,7 +687,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
                     }
                     ($this->onMessage)($this, $request);
                 } catch (\Throwable $e) {
-                    Worker::stopAll(250, $e);
+                    $this->error($e);
                 }
             }
             return;
@@ -701,7 +702,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
         try {
             ($this->onMessage)($this, $this->recvBuffer);
         } catch (\Throwable $e) {
-            Worker::stopAll(250, $e);
+            $this->error($e);
         }
         // Clean receive buffer.
         $this->recvBuffer = '';
@@ -724,14 +725,14 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
         } catch (\Throwable $e) {}
         if ($len === \strlen($this->sendBuffer)) {
             $this->bytesWritten += $len;
-            Worker::$globalEvent->offWritable($this->socket);
+            $this->eventLoop->offWritable($this->socket);
             $this->sendBuffer = '';
             // Try to emit onBufferDrain callback when the send buffer becomes empty.
             if ($this->onBufferDrain) {
                 try {
                     ($this->onBufferDrain)($this);
                 } catch (\Throwable $e) {
-                    Worker::stopAll(250, $e);
+                    $this->error($e);
                 }
             }
             if ($this->status === self::STATUS_CLOSING) {
@@ -801,7 +802,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
             try {
                 ($this->onSslHandshake)($this);
             } catch (\Throwable $e) {
-                Worker::stopAll(250, $e);
+                $this->error($e);
             }
         }
         return true;
@@ -894,7 +895,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
                 try {
                     ($this->onBufferFull)($this);
                 } catch (\Throwable $e) {
-                    Worker::stopAll(250, $e);
+                    $this->error($e);
                 }
             }
         }
@@ -913,7 +914,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
                 try {
                     ($this->onError)($this, static::SEND_FAIL, 'send buffer full and drop package');
                 } catch (\Throwable $e) {
-                    Worker::stopAll(250, $e);
+                    $this->error($e);
                 }
             }
             return true;
@@ -935,6 +936,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
      * Destroy connection.
      *
      * @return void
+     * @throws \Throwable
      */
     public function destroy()
     {
@@ -943,8 +945,8 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
             return;
         }
         // Remove event listener.
-        Worker::$globalEvent->offReadable($this->socket);
-        Worker::$globalEvent->offWritable($this->socket);
+        $this->eventLoop->offReadable($this->socket);
+        $this->eventLoop->offWritable($this->socket);
 
         // Close socket.
         try {
@@ -958,7 +960,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
             try {
                 ($this->onClose)($this);
             } catch (\Throwable $e) {
-                Worker::stopAll(250, $e);
+                $this->error($e);
             }
         }
         // Try to emit protocol::onClose
@@ -966,7 +968,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
             try {
                 ([$this->protocol, 'onClose'])($this);
             } catch (\Throwable $e) {
-                Worker::stopAll(250, $e);
+                $this->error($e);
             }
         }
         $this->sendBuffer = $this->recvBuffer = '';
@@ -974,7 +976,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
         $this->isPaused = $this->sslHandshakeCompleted = false;
         if ($this->status === self::STATUS_CLOSED) {
             // Cleaning up the callback to avoid memory leaks.
-            $this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;
+            $this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = $this->eventLoop = $this->errorHandler = null;
             // Remove from worker->connections.
             if ($this->worker) {
                 unset($this->worker->connections[$this->realId]);

+ 1 - 0
src/Connection/UdpConnection.php

@@ -196,6 +196,7 @@ class UdpConnection extends ConnectionInterface implements \JsonSerializable
         if ($data !== null) {
             $this->send($data, $raw);
         }
+        $this->eventLoop = $this->errorHandler = null;
         return true;
     }
 

+ 0 - 1
src/Events/Select.php

@@ -344,7 +344,6 @@ class Select implements EventInterface
             $read = $this->readFds;
             $write = $this->writeFds;
             $except = $this->exceptFds;
-
             if ($read || $write || $except) {
                 // Waiting read/write/signal/timeout events.
                 try {

+ 1 - 1
src/Worker.php

@@ -2453,7 +2453,7 @@ class Worker
         }
 
         // TcpConnection.
-        $connection                         = new TcpConnection($newSocket, $remoteAddress);
+        $connection                         = new TcpConnection(Worker::$globalEvent, $newSocket, $remoteAddress);
         $this->connections[$connection->id] = $connection;
         $connection->worker                 = $this;
         $connection->protocol               = $this->protocol;