walkor 2 роки тому
батько
коміт
ed7910473d
3 змінених файлів з 122 додано та 80 видалено
  1. 91 60
      src/Connection/TcpConnection.php
  2. 21 12
      src/Connection/UdpConnection.php
  3. 10 8
      src/Events/Ev.php

+ 91 - 60
src/Connection/TcpConnection.php

@@ -14,18 +14,47 @@
 
 namespace Workerman\Connection;
 
+use JsonSerializable;
 use stdClass;
 use Throwable;
 use Workerman\Events\EventInterface;
 use Workerman\Protocols\Http\Request;
 use Workerman\Protocols\ProtocolInterface;
 use Workerman\Worker;
+use function ceil;
+use function count;
+use function fclose;
+use function feof;
+use function fread;
+use function function_exists;
+use function fwrite;
+use function is_object;
+use function is_resource;
+use function key;
+use function method_exists;
+use function posix_getpid;
+use function restore_error_handler;
+use function set_error_handler;
+use function stream_set_blocking;
+use function stream_set_read_buffer;
+use function stream_socket_enable_crypto;
+use function stream_socket_get_name;
+use function strlen;
+use function strrchr;
+use function strrpos;
+use function substr;
+use function var_export;
+use const PHP_INT_MAX;
+use const STREAM_CRYPTO_METHOD_SSLv23_CLIENT;
+use const STREAM_CRYPTO_METHOD_SSLv23_SERVER;
+use const STREAM_CRYPTO_METHOD_SSLv2_CLIENT;
+use const STREAM_CRYPTO_METHOD_SSLv2_SERVER;
 
 /**
  * TcpConnection.
  * @property string websocketType
  */
-class TcpConnection extends ConnectionInterface implements \JsonSerializable
+class TcpConnection extends ConnectionInterface implements JsonSerializable
 {
     /**
      * Read buffer size.
@@ -167,7 +196,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
      * @var int
      */
     public int $maxSendBufferSize = 1048576;
-    
+
     /**
      * Context.
      *
@@ -307,14 +336,14 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
     {
         ++self::$statistics['connection_count'];
         $this->id = $this->realId = self::$idRecorder++;
-        if (self::$idRecorder === \PHP_INT_MAX) {
+        if (self::$idRecorder === PHP_INT_MAX) {
             self::$idRecorder = 0;
         }
         $this->socket = $socket;
-        \stream_set_blocking($this->socket, 0);
+        stream_set_blocking($this->socket, 0);
         // Compatible with hhvm
-        if (\function_exists('stream_set_read_buffer')) {
-            \stream_set_read_buffer($this->socket, 0);
+        if (function_exists('stream_set_read_buffer')) {
+            stream_set_read_buffer($this->socket, 0);
         }
         $this->eventLoop = $eventLoop;
         $this->eventLoop->onReadable($this->socket, [$this, 'baseRead']);
@@ -386,22 +415,22 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
             }
             $len = 0;
             try {
-                $len = @\fwrite($this->socket, $sendBuffer);
+                $len = @fwrite($this->socket, $sendBuffer);
             } catch (Throwable $e) {
                 Worker::log($e);
             }
             // send successful.
-            if ($len === \strlen($sendBuffer)) {
+            if ($len === strlen($sendBuffer)) {
                 $this->bytesWritten += $len;
                 return true;
             }
             // Send only part of the data.
             if ($len > 0) {
-                $this->sendBuffer = \substr($sendBuffer, $len);
+                $this->sendBuffer = substr($sendBuffer, $len);
                 $this->bytesWritten += $len;
             } else {
                 // Connection closed?
-                if (!\is_resource($this->socket) || \feof($this->socket)) {
+                if (!is_resource($this->socket) || feof($this->socket)) {
                     ++self::$statistics['send_fail'];
                     if ($this->onError) {
                         try {
@@ -438,9 +467,9 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
      */
     public function getRemoteIp(): string
     {
-        $pos = \strrpos($this->remoteAddress, ':');
+        $pos = strrpos($this->remoteAddress, ':');
         if ($pos) {
-            return (string)\substr($this->remoteAddress, 0, $pos);
+            return substr($this->remoteAddress, 0, $pos);
         }
         return '';
     }
@@ -453,7 +482,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
     public function getRemotePort(): int
     {
         if ($this->remoteAddress) {
-            return (int)\substr(\strrchr($this->remoteAddress, ':'), 1);
+            return (int)substr(strrchr($this->remoteAddress, ':'), 1);
         }
         return 0;
     }
@@ -476,11 +505,11 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
     public function getLocalIp(): string
     {
         $address = $this->getLocalAddress();
-        $pos = \strrpos($address, ':');
+        $pos = strrpos($address, ':');
         if (!$pos) {
             return '';
         }
-        return \substr($address, 0, $pos);
+        return substr($address, 0, $pos);
     }
 
     /**
@@ -491,11 +520,11 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
     public function getLocalPort(): int
     {
         $address = $this->getLocalAddress();
-        $pos = \strrpos($address, ':');
+        $pos = strrpos($address, ':');
         if (!$pos) {
             return 0;
         }
-        return (int)\substr(\strrchr($address, ':'), 1);
+        return (int)substr(strrchr($address, ':'), 1);
     }
 
     /**
@@ -505,10 +534,10 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
      */
     public function getLocalAddress(): string
     {
-        if (!\is_resource($this->socket)) {
+        if (!is_resource($this->socket)) {
             return '';
         }
-        return (string)@\stream_socket_get_name($this->socket, false);
+        return (string)@stream_socket_get_name($this->socket, false);
     }
 
     /**
@@ -518,7 +547,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
      */
     public function getSendBufferQueueSize(): int
     {
-        return \strlen($this->sendBuffer);
+        return strlen($this->sendBuffer);
     }
 
     /**
@@ -528,7 +557,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
      */
     public function getRecvBufferQueueSize(): int
     {
-        return \strlen($this->recvBuffer);
+        return strlen($this->recvBuffer);
     }
 
     /**
@@ -583,18 +612,18 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
 
         $buffer = '';
         try {
-            $buffer = @\fread($socket, self::READ_BUFFER_SIZE);
-        } catch (Throwable $e) {
+            $buffer = @fread($socket, self::READ_BUFFER_SIZE);
+        } catch (Throwable) {
         }
 
         // Check connection closed.
         if ($buffer === '' || $buffer === false) {
-            if ($checkEof && (\feof($socket) || !\is_resource($socket) || $buffer === false)) {
+            if ($checkEof && (feof($socket) || !is_resource($socket) || $buffer === false)) {
                 $this->destroy();
                 return;
             }
         } else {
-            $this->bytesRead += \strlen($buffer);
+            $this->bytesRead += strlen($buffer);
             if ($this->recvBuffer === '') {
                 if (static::$enableCache && !isset($buffer[512]) && isset($requests[$buffer])) {
                     ++self::$statistics['total_request'];
@@ -625,7 +654,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
                 // The current packet length is known.
                 if ($this->currentPackageLength) {
                     // Data is not enough for a package.
-                    if ($this->currentPackageLength > \strlen($this->recvBuffer)) {
+                    if ($this->currentPackageLength > strlen($this->recvBuffer)) {
                         break;
                     }
                 } else {
@@ -634,19 +663,19 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
                         /** @var ProtocolInterface $parser */
                         $parser = $this->protocol;
                         $this->currentPackageLength = $parser::input($this->recvBuffer, $this);
-                    } catch (Throwable $e) {
+                    } catch (Throwable) {
                     }
                     // The packet length is unknown.
                     if ($this->currentPackageLength === 0) {
                         break;
                     } elseif ($this->currentPackageLength > 0 && $this->currentPackageLength <= $this->maxPackageSize) {
                         // Data is not enough for a package.
-                        if ($this->currentPackageLength > \strlen($this->recvBuffer)) {
+                        if ($this->currentPackageLength > strlen($this->recvBuffer)) {
                             break;
                         }
                     } // Wrong package.
                     else {
-                        Worker::safeEcho('Error package. package_length=' . \var_export($this->currentPackageLength, true));
+                        Worker::safeEcho('Error package. package_length=' . var_export($this->currentPackageLength, true));
                         $this->destroy();
                         return;
                     }
@@ -655,14 +684,14 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
                 // The data is enough for a packet.
                 ++self::$statistics['total_request'];
                 // The current packet length is equal to the length of the buffer.
-                if ($one = \strlen($this->recvBuffer) === $this->currentPackageLength) {
+                if ($one = strlen($this->recvBuffer) === $this->currentPackageLength) {
                     $oneRequestBuffer = $this->recvBuffer;
                     $this->recvBuffer = '';
                 } else {
                     // Get a full package from the buffer.
-                    $oneRequestBuffer = \substr($this->recvBuffer, 0, $this->currentPackageLength);
+                    $oneRequestBuffer = substr($this->recvBuffer, 0, $this->currentPackageLength);
                     // Remove the current package from receive buffer.
-                    $this->recvBuffer = \substr($this->recvBuffer, $this->currentPackageLength);
+                    $this->recvBuffer = substr($this->recvBuffer, $this->currentPackageLength);
                 }
                 // Reset the current packet length to 0.
                 $this->currentPackageLength = 0;
@@ -671,10 +700,10 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
                     /** @var ProtocolInterface $parser */
                     $parser = $this->protocol;
                     $request = $parser::decode($oneRequestBuffer, $this);
-                    if (static::$enableCache && (!\is_object($request) || $request instanceof Request) && $one && !isset($oneRequestBuffer[512])) {
+                    if (static::$enableCache && (!is_object($request) || $request instanceof Request) && $one && !isset($oneRequestBuffer[512])) {
                         $requests[$oneRequestBuffer] = $request;
-                        if (\count($requests) > 512) {
-                            unset($requests[\key($requests)]);
+                        if (count($requests) > 512) {
+                            unset($requests[key($requests)]);
                         }
                     }
                     ($this->onMessage)($this, $request);
@@ -711,12 +740,13 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
         $len = 0;
         try {
             if ($this->transport === 'ssl') {
-                $len = @\fwrite($this->socket, $this->sendBuffer, 8192);
+                $len = @fwrite($this->socket, $this->sendBuffer, 8192);
             } else {
-                $len = @\fwrite($this->socket, $this->sendBuffer);
+                $len = @fwrite($this->socket, $this->sendBuffer);
             }
-        } catch (Throwable $e) {}
-        if ($len === \strlen($this->sendBuffer)) {
+        } catch (Throwable) {
+        }
+        if ($len === strlen($this->sendBuffer)) {
             $this->bytesWritten += $len;
             $this->eventLoop->offWritable($this->socket);
             $this->sendBuffer = '';
@@ -738,7 +768,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
         }
         if ($len > 0) {
             $this->bytesWritten += $len;
-            $this->sendBuffer = \substr($this->sendBuffer, $len);
+            $this->sendBuffer = substr($this->sendBuffer, $len);
         } else {
             ++self::$statistics['send_fail'];
             $this->destroy();
@@ -754,7 +784,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
      */
     public function doSslHandshake($socket): bool|int
     {
-        if (\feof($socket)) {
+        if (feof($socket)) {
             $this->destroy();
             return false;
         }
@@ -771,19 +801,19 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
         }*/
 
         if ($async) {
-            $type = \STREAM_CRYPTO_METHOD_SSLv2_CLIENT | \STREAM_CRYPTO_METHOD_SSLv23_CLIENT;
+            $type = STREAM_CRYPTO_METHOD_SSLv2_CLIENT | STREAM_CRYPTO_METHOD_SSLv23_CLIENT;
         } else {
-            $type = \STREAM_CRYPTO_METHOD_SSLv2_SERVER | \STREAM_CRYPTO_METHOD_SSLv23_SERVER;
+            $type = STREAM_CRYPTO_METHOD_SSLv2_SERVER | STREAM_CRYPTO_METHOD_SSLv23_SERVER;
         }
 
         // Hidden error.
-        \set_error_handler(function ($errno, $errstr, $file) {
+        set_error_handler(function ($errno, $err_str) {
             if (!Worker::$daemonize) {
-                Worker::safeEcho("SSL handshake error: $errstr \n");
+                Worker::safeEcho("SSL handshake error: $err_str \n");
             }
         });
-        $ret = \stream_socket_enable_crypto($socket, true, $type);
-        \restore_error_handler();
+        $ret = stream_socket_enable_crypto($socket, true, $type);
+        restore_error_handler();
         // Negotiation has failed.
         if (false === $ret) {
             $this->destroy();
@@ -807,13 +837,13 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
         $this->onMessage = function ($source, $data) use ($dest) {
             $dest->send($data);
         };
-        $this->onClose = function ($source) use ($dest) {
+        $this->onClose = function () use ($dest) {
             $dest->close();
         };
-        $dest->onBufferFull = function ($dest) use ($source) {
+        $dest->onBufferFull = function () use ($source) {
             $source->pauseRecv();
         };
-        $dest->onBufferDrain = function ($dest) use ($source) {
+        $dest->onBufferDrain = function () use ($source) {
             $source->resumeRecv();
         };
     }
@@ -826,7 +856,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
      */
     public function consumeRecvBuffer(int $length)
     {
-        $this->recvBuffer = \substr($this->recvBuffer, $length);
+        $this->recvBuffer = substr($this->recvBuffer, $length);
     }
 
     /**
@@ -879,7 +909,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
      */
     protected function checkBufferWillFull()
     {
-        if ($this->maxSendBufferSize <= \strlen($this->sendBuffer)) {
+        if ($this->maxSendBufferSize <= strlen($this->sendBuffer)) {
             if ($this->onBufferFull) {
                 try {
                     ($this->onBufferFull)($this);
@@ -899,7 +929,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
     protected function bufferIsFull(): bool
     {
         // Buffer has been marked as full but still has data to send then the packet is discarded.
-        if ($this->maxSendBufferSize <= \strlen($this->sendBuffer)) {
+        if ($this->maxSendBufferSize <= strlen($this->sendBuffer)) {
             if ($this->onError) {
                 try {
                     ($this->onError)($this, static::SEND_FAIL, 'send buffer full and drop package');
@@ -940,8 +970,8 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
 
         // Close socket.
         try {
-            @\fclose($this->socket);
-        } catch (Throwable $e) {
+            @fclose($this->socket);
+        } catch (Throwable) {
         }
 
         $this->status = self::STATUS_CLOSED;
@@ -954,7 +984,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
             }
         }
         // Try to emit protocol::onClose
-        if ($this->protocol && \method_exists($this->protocol, 'onClose')) {
+        if ($this->protocol && method_exists($this->protocol, 'onClose')) {
             try {
                 ([$this->protocol, 'onClose'])($this);
             } catch (Throwable $e) {
@@ -984,14 +1014,14 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
     {
         static::$enableCache = $value;
     }
-    
+
     /**
      * Get the json_encode information.
      *
      * @return array
      */
     public function jsonSerialize(): array
-    {        
+    {
         return [
             'id' => $this->id,
             'status' => $this->getStatus(),
@@ -1018,11 +1048,12 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
         self::$statistics['connection_count']--;
         if (Worker::getGracefulStop()) {
             if (!isset($mod)) {
-                $mod = \ceil((self::$statistics['connection_count'] + 1) / 3);
+                $mod = ceil((self::$statistics['connection_count'] + 1) / 3);
             }
 
             if (0 === self::$statistics['connection_count'] % $mod) {
-                Worker::log('worker[' . \posix_getpid() . '] remains ' . self::$statistics['connection_count'] . ' connection(s)');
+                $pid = function_exists('posix_getpid') ? posix_getpid() : 0;
+                Worker::log('worker[' . $pid . '] remains ' . self::$statistics['connection_count'] . ' connection(s)');
             }
 
             if (0 === self::$statistics['connection_count']) {

+ 21 - 12
src/Connection/UdpConnection.php

@@ -14,12 +14,21 @@
 
 namespace Workerman\Connection;
 
+use JetBrains\PhpStorm\ArrayShape;
+use JsonSerializable;
 use Workerman\Protocols\ProtocolInterface;
+use function stream_socket_get_name;
+use function stream_socket_sendto;
+use function strlen;
+use function strrchr;
+use function strrpos;
+use function substr;
+use function trim;
 
 /**
  * UdpConnection.
  */
-class UdpConnection extends ConnectionInterface implements \JsonSerializable
+class UdpConnection extends ConnectionInterface implements JsonSerializable
 {
     /**
      * Transport layer protocol.
@@ -71,7 +80,7 @@ class UdpConnection extends ConnectionInterface implements \JsonSerializable
                 return;
             }
         }
-        return \strlen($sendBuffer) === \stream_socket_sendto($this->socket, $sendBuffer, 0, $this->isIpV6() ? '[' . $this->getRemoteIp() . ']:' . $this->getRemotePort() : $this->remoteAddress);
+        return strlen($sendBuffer) === stream_socket_sendto($this->socket, $sendBuffer, 0, $this->isIpV6() ? '[' . $this->getRemoteIp() . ']:' . $this->getRemotePort() : $this->remoteAddress);
     }
 
     /**
@@ -81,9 +90,9 @@ class UdpConnection extends ConnectionInterface implements \JsonSerializable
      */
     public function getRemoteIp(): string
     {
-        $pos = \strrpos($this->remoteAddress, ':');
+        $pos = strrpos($this->remoteAddress, ':');
         if ($pos) {
-            return \trim(\substr($this->remoteAddress, 0, $pos), '[]');
+            return trim(substr($this->remoteAddress, 0, $pos), '[]');
         }
         return '';
     }
@@ -96,7 +105,7 @@ class UdpConnection extends ConnectionInterface implements \JsonSerializable
     public function getRemotePort(): int
     {
         if ($this->remoteAddress) {
-            return (int)\substr(\strrchr($this->remoteAddress, ':'), 1);
+            return (int)substr(strrchr($this->remoteAddress, ':'), 1);
         }
         return 0;
     }
@@ -119,11 +128,11 @@ class UdpConnection extends ConnectionInterface implements \JsonSerializable
     public function getLocalIp(): string
     {
         $address = $this->getLocalAddress();
-        $pos = \strrpos($address, ':');
+        $pos = strrpos($address, ':');
         if (!$pos) {
             return '';
         }
-        return \substr($address, 0, $pos);
+        return substr($address, 0, $pos);
     }
 
     /**
@@ -134,11 +143,11 @@ class UdpConnection extends ConnectionInterface implements \JsonSerializable
     public function getLocalPort(): int
     {
         $address = $this->getLocalAddress();
-        $pos = \strrpos($address, ':');
+        $pos = strrpos($address, ':');
         if (!$pos) {
             return 0;
         }
-        return (int)\substr(\strrchr($address, ':'), 1);
+        return (int)substr(strrchr($address, ':'), 1);
     }
 
     /**
@@ -148,7 +157,7 @@ class UdpConnection extends ConnectionInterface implements \JsonSerializable
      */
     public function getLocalAddress(): string
     {
-        return (string)@\stream_socket_get_name($this->socket, false);
+        return (string)@stream_socket_get_name($this->socket, false);
     }
 
 
@@ -175,13 +184,13 @@ class UdpConnection extends ConnectionInterface implements \JsonSerializable
     {
         return $this->socket;
     }
-    
+
     /**
      * Get the json_encode information.
      *
      * @return array
      */
-    public function jsonSerialize(): array
+    #[ArrayShape(['transport' => "string", 'getRemoteIp' => "string", 'remotePort' => "int", 'getRemoteAddress' => "string", 'getLocalIp' => "string", 'getLocalPort' => "int", 'getLocalAddress' => "string", 'isIpV4' => "bool", 'isIpV6' => "bool"])] public function jsonSerialize(): array
     {
         return [
             'transport' => $this->transport,

+ 10 - 8
src/Events/Ev.php

@@ -13,8 +13,10 @@
 
 namespace Workerman\Events;
 
-use Closure;
-use EvWatcher;
+use EvIo;
+use EvSignal;
+use EvTimer;
+use function count;
 
 /**
  * Ev eventloop
@@ -67,7 +69,7 @@ class Ev implements EventInterface
     public function delay(float $delay, callable $func, array $args = []): int
     {
         $timerId = self::$timerId;
-        $event = new \EvTimer($delay, 0, function () use ($func, $args, $timerId) {
+        $event = new EvTimer($delay, 0, function () use ($func, $args, $timerId) {
             unset($this->eventTimer[$timerId]);
             $func(...$args);
         });
@@ -101,7 +103,7 @@ class Ev implements EventInterface
      */
     public function repeat(float $interval, callable $func, array $args = []): int
     {
-        $event = new \EvTimer($interval, $interval, function () use ($func, $args) {
+        $event = new EvTimer($interval, $interval, function () use ($func, $args) {
             $func(...$args);
         });
         $this->eventTimer[self::$timerId] = $event;
@@ -114,7 +116,7 @@ class Ev implements EventInterface
     public function onReadable($stream, callable $func)
     {
         $fdKey = (int)$stream;
-        $event = new \EvIo($stream, \Ev::READ, function () use ($func, $stream) {
+        $event = new EvIo($stream, \Ev::READ, function () use ($func, $stream) {
             $func($stream);
         });
         $this->readEvents[$fdKey] = $event;
@@ -140,7 +142,7 @@ class Ev implements EventInterface
     public function onWritable($stream, callable $func)
     {
         $fdKey = (int)$stream;
-        $event = new \EvIo($stream, \Ev::WRITE, function () use ($func, $stream) {
+        $event = new EvIo($stream, \Ev::WRITE, function () use ($func, $stream) {
             $func($stream);
         });
         $this->readEvents[$fdKey] = $event;
@@ -165,7 +167,7 @@ class Ev implements EventInterface
      */
     public function onSignal(int $signal, callable $func)
     {
-        $event = new \EvSignal($signal, function () use ($func, $signal) {
+        $event = new EvSignal($signal, function () use ($func, $signal) {
             $func($signal);
         });
         $this->eventSignal[$signal] = $event;
@@ -216,7 +218,7 @@ class Ev implements EventInterface
      */
     public function getTimerCount(): int
     {
-        return \count($this->eventTimer);
+        return count($this->eventTimer);
     }
 
     /**