|
@@ -14,9 +14,10 @@
|
|
|
|
|
|
|
|
namespace Workerman\Connection;
|
|
namespace Workerman\Connection;
|
|
|
|
|
|
|
|
|
|
+use stdClass;
|
|
|
|
|
+use Throwable;
|
|
|
use Workerman\Events\EventInterface;
|
|
use Workerman\Events\EventInterface;
|
|
|
use Workerman\Protocols\Http\Request;
|
|
use Workerman\Protocols\Http\Request;
|
|
|
-use Workerman\Protocols\ProtocolInterface;
|
|
|
|
|
use Workerman\Worker;
|
|
use Workerman\Worker;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -70,161 +71,153 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
/**
|
|
/**
|
|
|
* Emitted when socket connection is successfully established.
|
|
* Emitted when socket connection is successfully established.
|
|
|
*
|
|
*
|
|
|
- * @var callable|null
|
|
|
|
|
|
|
+ * @var ?callable
|
|
|
*/
|
|
*/
|
|
|
public $onConnect = null;
|
|
public $onConnect = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Emitted when websocket handshake completed (Only work when protocol is ws).
|
|
* Emitted when websocket handshake completed (Only work when protocol is ws).
|
|
|
*
|
|
*
|
|
|
- * @var callable|null
|
|
|
|
|
|
|
+ * @var ?callable
|
|
|
*/
|
|
*/
|
|
|
public $onWebSocketConnect = null;
|
|
public $onWebSocketConnect = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Emitted when data is received.
|
|
* Emitted when data is received.
|
|
|
*
|
|
*
|
|
|
- * @var callable
|
|
|
|
|
|
|
+ * @var ?callable
|
|
|
*/
|
|
*/
|
|
|
public $onMessage = null;
|
|
public $onMessage = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Emitted when the other end of the socket sends a FIN packet.
|
|
* Emitted when the other end of the socket sends a FIN packet.
|
|
|
*
|
|
*
|
|
|
- * @var callable
|
|
|
|
|
|
|
+ * @var ?callable
|
|
|
*/
|
|
*/
|
|
|
public $onClose = null;
|
|
public $onClose = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Emitted when an error occurs with connection.
|
|
* Emitted when an error occurs with connection.
|
|
|
*
|
|
*
|
|
|
- * @var callable
|
|
|
|
|
|
|
+ * @var ?callable
|
|
|
*/
|
|
*/
|
|
|
public $onError = null;
|
|
public $onError = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Emitted when the send buffer becomes full.
|
|
* Emitted when the send buffer becomes full.
|
|
|
*
|
|
*
|
|
|
- * @var callable
|
|
|
|
|
|
|
+ * @var ?callable
|
|
|
*/
|
|
*/
|
|
|
public $onBufferFull = null;
|
|
public $onBufferFull = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * Emitted when the send buffer becomes empty.
|
|
|
|
|
|
|
+ * Emitted when send buffer becomes empty.
|
|
|
*
|
|
*
|
|
|
- * @var callable
|
|
|
|
|
|
|
+ * @var ?callable
|
|
|
*/
|
|
*/
|
|
|
public $onBufferDrain = null;
|
|
public $onBufferDrain = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * Application layer protocol.
|
|
|
|
|
- * The format is like this Workerman\\Protocols\\Http.
|
|
|
|
|
- *
|
|
|
|
|
- * @var ProtocolInterface
|
|
|
|
|
- */
|
|
|
|
|
- public $protocol = null;
|
|
|
|
|
-
|
|
|
|
|
- /**
|
|
|
|
|
* Transport (tcp/udp/unix/ssl).
|
|
* Transport (tcp/udp/unix/ssl).
|
|
|
*
|
|
*
|
|
|
* @var string
|
|
* @var string
|
|
|
*/
|
|
*/
|
|
|
- public $transport = 'tcp';
|
|
|
|
|
|
|
+ public string $transport = 'tcp';
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Which worker belong to.
|
|
* Which worker belong to.
|
|
|
*
|
|
*
|
|
|
- * @var Worker
|
|
|
|
|
|
|
+ * @var ?Worker
|
|
|
*/
|
|
*/
|
|
|
- public $worker = null;
|
|
|
|
|
|
|
+ public ?Worker $worker = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Bytes read.
|
|
* Bytes read.
|
|
|
*
|
|
*
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
- public $bytesRead = 0;
|
|
|
|
|
|
|
+ public int $bytesRead = 0;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Bytes written.
|
|
* Bytes written.
|
|
|
*
|
|
*
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
- public $bytesWritten = 0;
|
|
|
|
|
|
|
+ public int $bytesWritten = 0;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Connection->id.
|
|
* Connection->id.
|
|
|
*
|
|
*
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
- public $id = 0;
|
|
|
|
|
|
|
+ public int $id = 0;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* A copy of $worker->id which used to clean up the connection in worker->connections
|
|
* A copy of $worker->id which used to clean up the connection in worker->connections
|
|
|
*
|
|
*
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
- protected $realId = 0;
|
|
|
|
|
|
|
+ protected int $realId = 0;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Sets the maximum send buffer size for the current connection.
|
|
* Sets the maximum send buffer size for the current connection.
|
|
|
- * OnBufferFull callback will be emited When the send buffer is full.
|
|
|
|
|
|
|
+ * OnBufferFull callback will be emited When send buffer is full.
|
|
|
*
|
|
*
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
- public $maxSendBufferSize = 1048576;
|
|
|
|
|
|
|
+ public int $maxSendBufferSize = 1048576;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Context.
|
|
* Context.
|
|
|
*
|
|
*
|
|
|
- * @var object
|
|
|
|
|
|
|
+ * @var ?stdClass
|
|
|
*/
|
|
*/
|
|
|
- public $context;
|
|
|
|
|
|
|
+ public ?stdClass $context = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* @var array
|
|
* @var array
|
|
|
*/
|
|
*/
|
|
|
- public $headers = [];
|
|
|
|
|
|
|
+ public array $headers = [];
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * @var object
|
|
|
|
|
|
|
+ * @var ?Request
|
|
|
*/
|
|
*/
|
|
|
- public $request;
|
|
|
|
|
|
|
+ public ?Request $request = null;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Default send buffer size.
|
|
* Default send buffer size.
|
|
|
*
|
|
*
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
- public static $defaultMaxSendBufferSize = 1048576;
|
|
|
|
|
|
|
+ public static int $defaultMaxSendBufferSize = 1048576;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Sets the maximum acceptable packet size for the current connection.
|
|
* Sets the maximum acceptable packet size for the current connection.
|
|
|
*
|
|
*
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
- public $maxPackageSize = 1048576;
|
|
|
|
|
|
|
+ public int $maxPackageSize = 1048576;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Default maximum acceptable packet size.
|
|
* Default maximum acceptable packet size.
|
|
|
*
|
|
*
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
- public static $defaultMaxPackageSize = 10485760;
|
|
|
|
|
|
|
+ public static int $defaultMaxPackageSize = 10485760;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Id recorder.
|
|
* Id recorder.
|
|
|
*
|
|
*
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
- protected static $idRecorder = 1;
|
|
|
|
|
|
|
+ protected static int $idRecorder = 1;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Cache.
|
|
* Cache.
|
|
|
*
|
|
*
|
|
|
* @var bool.
|
|
* @var bool.
|
|
|
*/
|
|
*/
|
|
|
- protected static $enableCache = true;
|
|
|
|
|
|
|
+ protected static bool $enableCache = true;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Socket
|
|
* Socket
|
|
@@ -238,63 +231,63 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
*
|
|
|
* @var string
|
|
* @var string
|
|
|
*/
|
|
*/
|
|
|
- protected $sendBuffer = '';
|
|
|
|
|
|
|
+ protected string $sendBuffer = '';
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Receive buffer.
|
|
* Receive buffer.
|
|
|
*
|
|
*
|
|
|
* @var string
|
|
* @var string
|
|
|
*/
|
|
*/
|
|
|
- protected $recvBuffer = '';
|
|
|
|
|
|
|
+ protected string $recvBuffer = '';
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Current package length.
|
|
* Current package length.
|
|
|
*
|
|
*
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
- protected $currentPackageLength = 0;
|
|
|
|
|
|
|
+ protected int $currentPackageLength = 0;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Connection status.
|
|
* Connection status.
|
|
|
*
|
|
*
|
|
|
* @var int
|
|
* @var int
|
|
|
*/
|
|
*/
|
|
|
- protected $status = self::STATUS_ESTABLISHED;
|
|
|
|
|
|
|
+ protected int $status = self::STATUS_ESTABLISHED;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Remote address.
|
|
* Remote address.
|
|
|
*
|
|
*
|
|
|
* @var string
|
|
* @var string
|
|
|
*/
|
|
*/
|
|
|
- protected $remoteAddress = '';
|
|
|
|
|
|
|
+ protected string $remoteAddress = '';
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Is paused.
|
|
* Is paused.
|
|
|
*
|
|
*
|
|
|
* @var bool
|
|
* @var bool
|
|
|
*/
|
|
*/
|
|
|
- protected $isPaused = false;
|
|
|
|
|
|
|
+ protected bool $isPaused = false;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* SSL handshake completed or not.
|
|
* SSL handshake completed or not.
|
|
|
*
|
|
*
|
|
|
* @var bool
|
|
* @var bool
|
|
|
*/
|
|
*/
|
|
|
- protected $sslHandshakeCompleted = false;
|
|
|
|
|
|
|
+ protected bool $sslHandshakeCompleted = false;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* All connection instances.
|
|
* All connection instances.
|
|
|
*
|
|
*
|
|
|
* @var array
|
|
* @var array
|
|
|
*/
|
|
*/
|
|
|
- public static $connections = [];
|
|
|
|
|
|
|
+ public static array $connections = [];
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Status to string.
|
|
* Status to string.
|
|
|
*
|
|
*
|
|
|
* @var array
|
|
* @var array
|
|
|
*/
|
|
*/
|
|
|
- public static $statusToString = [
|
|
|
|
|
|
|
+ const STATUS_TO_STRING = [
|
|
|
self::STATUS_INITIAL => 'INITIAL',
|
|
self::STATUS_INITIAL => 'INITIAL',
|
|
|
self::STATUS_CONNECTING => 'CONNECTING',
|
|
self::STATUS_CONNECTING => 'CONNECTING',
|
|
|
self::STATUS_ESTABLISHED => 'ESTABLISHED',
|
|
self::STATUS_ESTABLISHED => 'ESTABLISHED',
|
|
@@ -305,10 +298,11 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
/**
|
|
/**
|
|
|
* Construct.
|
|
* Construct.
|
|
|
*
|
|
*
|
|
|
|
|
+ * @param EventInterface $eventLoop
|
|
|
* @param resource $socket
|
|
* @param resource $socket
|
|
|
* @param string $remoteAddress
|
|
* @param string $remoteAddress
|
|
|
*/
|
|
*/
|
|
|
- public function __construct($eventLoop, $socket, $remoteAddress = '')
|
|
|
|
|
|
|
+ public function __construct(EventInterface $eventLoop, $socket, string $remoteAddress = '')
|
|
|
{
|
|
{
|
|
|
++self::$statistics['connection_count'];
|
|
++self::$statistics['connection_count'];
|
|
|
$this->id = $this->realId = self::$idRecorder++;
|
|
$this->id = $this->realId = self::$idRecorder++;
|
|
@@ -327,7 +321,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
$this->maxPackageSize = self::$defaultMaxPackageSize;
|
|
$this->maxPackageSize = self::$defaultMaxPackageSize;
|
|
|
$this->remoteAddress = $remoteAddress;
|
|
$this->remoteAddress = $remoteAddress;
|
|
|
static::$connections[$this->id] = $this;
|
|
static::$connections[$this->id] = $this;
|
|
|
- $this->context = new \stdClass;
|
|
|
|
|
|
|
+ $this->context = new stdClass;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -337,12 +331,12 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
*
|
|
|
* @return int|string
|
|
* @return int|string
|
|
|
*/
|
|
*/
|
|
|
- public function getStatus($rawOutput = true)
|
|
|
|
|
|
|
+ public function getStatus(bool $rawOutput = true): int|string
|
|
|
{
|
|
{
|
|
|
if ($rawOutput) {
|
|
if ($rawOutput) {
|
|
|
return $this->status;
|
|
return $this->status;
|
|
|
}
|
|
}
|
|
|
- return self::$statusToString[$this->status];
|
|
|
|
|
|
|
+ return self::STATUS_TO_STRING[$this->status];
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -351,8 +345,9 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
* @param mixed $sendBuffer
|
|
* @param mixed $sendBuffer
|
|
|
* @param bool $raw
|
|
* @param bool $raw
|
|
|
* @return bool|void
|
|
* @return bool|void
|
|
|
|
|
+ * @throws Throwable
|
|
|
*/
|
|
*/
|
|
|
- public function send($sendBuffer, $raw = false)
|
|
|
|
|
|
|
+ public function send(mixed $sendBuffer, bool $raw = false)
|
|
|
{
|
|
{
|
|
|
if ($this->status === self::STATUS_CLOSING || $this->status === self::STATUS_CLOSED) {
|
|
if ($this->status === self::STATUS_CLOSING || $this->status === self::STATUS_CLOSED) {
|
|
|
return false;
|
|
return false;
|
|
@@ -389,7 +384,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
$len = 0;
|
|
$len = 0;
|
|
|
try {
|
|
try {
|
|
|
$len = @\fwrite($this->socket, $sendBuffer);
|
|
$len = @\fwrite($this->socket, $sendBuffer);
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
|
|
+ } catch (Throwable $e) {
|
|
|
Worker::log($e);
|
|
Worker::log($e);
|
|
|
}
|
|
}
|
|
|
// send successful.
|
|
// send successful.
|
|
@@ -408,7 +403,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
if ($this->onError) {
|
|
if ($this->onError) {
|
|
|
try {
|
|
try {
|
|
|
($this->onError)($this, static::SEND_FAIL, 'client closed');
|
|
($this->onError)($this, static::SEND_FAIL, 'client closed');
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
|
|
+ } catch (Throwable $e) {
|
|
|
$this->error($e);
|
|
$this->error($e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -418,7 +413,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
$this->sendBuffer = $sendBuffer;
|
|
$this->sendBuffer = $sendBuffer;
|
|
|
}
|
|
}
|
|
|
$this->eventLoop->onWritable($this->socket, [$this, 'baseWrite']);
|
|
$this->eventLoop->onWritable($this->socket, [$this, 'baseWrite']);
|
|
|
- // Check if the send buffer will be full.
|
|
|
|
|
|
|
+ // Check if send buffer will be full.
|
|
|
$this->checkBufferWillFull();
|
|
$this->checkBufferWillFull();
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
@@ -429,7 +424,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
$this->sendBuffer .= $sendBuffer;
|
|
$this->sendBuffer .= $sendBuffer;
|
|
|
- // Check if the send buffer is full.
|
|
|
|
|
|
|
+ // Check if send buffer is full.
|
|
|
$this->checkBufferWillFull();
|
|
$this->checkBufferWillFull();
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -438,7 +433,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
*
|
|
|
* @return string
|
|
* @return string
|
|
|
*/
|
|
*/
|
|
|
- public function getRemoteIp()
|
|
|
|
|
|
|
+ public function getRemoteIp(): string
|
|
|
{
|
|
{
|
|
|
$pos = \strrpos($this->remoteAddress, ':');
|
|
$pos = \strrpos($this->remoteAddress, ':');
|
|
|
if ($pos) {
|
|
if ($pos) {
|
|
@@ -452,7 +447,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
*
|
|
|
* @return int
|
|
* @return int
|
|
|
*/
|
|
*/
|
|
|
- public function getRemotePort()
|
|
|
|
|
|
|
+ public function getRemotePort(): int
|
|
|
{
|
|
{
|
|
|
if ($this->remoteAddress) {
|
|
if ($this->remoteAddress) {
|
|
|
return (int)\substr(\strrchr($this->remoteAddress, ':'), 1);
|
|
return (int)\substr(\strrchr($this->remoteAddress, ':'), 1);
|
|
@@ -465,7 +460,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
*
|
|
|
* @return string
|
|
* @return string
|
|
|
*/
|
|
*/
|
|
|
- public function getRemoteAddress()
|
|
|
|
|
|
|
+ public function getRemoteAddress(): string
|
|
|
{
|
|
{
|
|
|
return $this->remoteAddress;
|
|
return $this->remoteAddress;
|
|
|
}
|
|
}
|
|
@@ -475,7 +470,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
*
|
|
|
* @return string
|
|
* @return string
|
|
|
*/
|
|
*/
|
|
|
- public function getLocalIp()
|
|
|
|
|
|
|
+ public function getLocalIp(): string
|
|
|
{
|
|
{
|
|
|
$address = $this->getLocalAddress();
|
|
$address = $this->getLocalAddress();
|
|
|
$pos = \strrpos($address, ':');
|
|
$pos = \strrpos($address, ':');
|
|
@@ -490,7 +485,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
*
|
|
|
* @return int
|
|
* @return int
|
|
|
*/
|
|
*/
|
|
|
- public function getLocalPort()
|
|
|
|
|
|
|
+ public function getLocalPort(): int
|
|
|
{
|
|
{
|
|
|
$address = $this->getLocalAddress();
|
|
$address = $this->getLocalAddress();
|
|
|
$pos = \strrpos($address, ':');
|
|
$pos = \strrpos($address, ':');
|
|
@@ -505,7 +500,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
*
|
|
|
* @return string
|
|
* @return string
|
|
|
*/
|
|
*/
|
|
|
- public function getLocalAddress()
|
|
|
|
|
|
|
+ public function getLocalAddress(): string
|
|
|
{
|
|
{
|
|
|
if (!\is_resource($this->socket)) {
|
|
if (!\is_resource($this->socket)) {
|
|
|
return '';
|
|
return '';
|
|
@@ -518,48 +513,22 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
*
|
|
|
* @return integer
|
|
* @return integer
|
|
|
*/
|
|
*/
|
|
|
- public function getSendBufferQueueSize()
|
|
|
|
|
|
|
+ public function getSendBufferQueueSize(): int
|
|
|
{
|
|
{
|
|
|
return \strlen($this->sendBuffer);
|
|
return \strlen($this->sendBuffer);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * Get recv buffer queue size.
|
|
|
|
|
|
|
+ * Get receive buffer queue size.
|
|
|
*
|
|
*
|
|
|
* @return integer
|
|
* @return integer
|
|
|
*/
|
|
*/
|
|
|
- public function getRecvBufferQueueSize()
|
|
|
|
|
|
|
+ public function getRecvBufferQueueSize(): int
|
|
|
{
|
|
{
|
|
|
return \strlen($this->recvBuffer);
|
|
return \strlen($this->recvBuffer);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * Is ipv4.
|
|
|
|
|
- *
|
|
|
|
|
- * return bool.
|
|
|
|
|
- */
|
|
|
|
|
- public function isIpV4()
|
|
|
|
|
- {
|
|
|
|
|
- if ($this->transport === 'unix') {
|
|
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
- return \strpos($this->getRemoteIp(), ':') === false;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /**
|
|
|
|
|
- * Is ipv6.
|
|
|
|
|
- *
|
|
|
|
|
- * return bool.
|
|
|
|
|
- */
|
|
|
|
|
- public function isIpV6()
|
|
|
|
|
- {
|
|
|
|
|
- if ($this->transport === 'unix') {
|
|
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
- return \strpos($this->getRemoteIp(), ':') !== false;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /**
|
|
|
|
|
* Pauses the reading of data. That is onMessage will not be emitted. Useful to throttle back an upload.
|
|
* Pauses the reading of data. That is onMessage will not be emitted. Useful to throttle back an upload.
|
|
|
*
|
|
*
|
|
|
* @return void
|
|
* @return void
|
|
@@ -574,6 +543,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
* Resumes reading after a call to pauseRecv.
|
|
* Resumes reading after a call to pauseRecv.
|
|
|
*
|
|
*
|
|
|
* @return void
|
|
* @return void
|
|
|
|
|
+ * @throws Throwable
|
|
|
*/
|
|
*/
|
|
|
public function resumeRecv()
|
|
public function resumeRecv()
|
|
|
{
|
|
{
|
|
@@ -591,8 +561,9 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
* @param resource $socket
|
|
* @param resource $socket
|
|
|
* @param bool $checkEof
|
|
* @param bool $checkEof
|
|
|
* @return void
|
|
* @return void
|
|
|
|
|
+ * @throws Throwable
|
|
|
*/
|
|
*/
|
|
|
- public function baseRead($socket, $checkEof = true)
|
|
|
|
|
|
|
+ public function baseRead($socket, bool $checkEof = true)
|
|
|
{
|
|
{
|
|
|
static $requests = [];
|
|
static $requests = [];
|
|
|
// SSL handshake.
|
|
// SSL handshake.
|
|
@@ -610,7 +581,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
$buffer = '';
|
|
$buffer = '';
|
|
|
try {
|
|
try {
|
|
|
$buffer = @\fread($socket, self::READ_BUFFER_SIZE);
|
|
$buffer = @\fread($socket, self::READ_BUFFER_SIZE);
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
|
|
+ } catch (Throwable $e) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Check connection closed.
|
|
// Check connection closed.
|
|
@@ -634,7 +605,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
}
|
|
}
|
|
|
try {
|
|
try {
|
|
|
($this->onMessage)($this, $request);
|
|
($this->onMessage)($this, $request);
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
|
|
+ } catch (Throwable $e) {
|
|
|
$this->error($e);
|
|
$this->error($e);
|
|
|
}
|
|
}
|
|
|
return;
|
|
return;
|
|
@@ -658,7 +629,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
// Get current package length.
|
|
// Get current package length.
|
|
|
try {
|
|
try {
|
|
|
$this->currentPackageLength = $this->protocol::input($this->recvBuffer, $this);
|
|
$this->currentPackageLength = $this->protocol::input($this->recvBuffer, $this);
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
|
|
+ } catch (Throwable $e) {
|
|
|
}
|
|
}
|
|
|
// The packet length is unknown.
|
|
// The packet length is unknown.
|
|
|
if ($this->currentPackageLength === 0) {
|
|
if ($this->currentPackageLength === 0) {
|
|
@@ -685,7 +656,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
} else {
|
|
} else {
|
|
|
// Get a full package from the buffer.
|
|
// Get a full package from the buffer.
|
|
|
$oneRequestBuffer = \substr($this->recvBuffer, 0, $this->currentPackageLength);
|
|
$oneRequestBuffer = \substr($this->recvBuffer, 0, $this->currentPackageLength);
|
|
|
- // Remove the current package from the receive buffer.
|
|
|
|
|
|
|
+ // Remove the current package from receive buffer.
|
|
|
$this->recvBuffer = \substr($this->recvBuffer, $this->currentPackageLength);
|
|
$this->recvBuffer = \substr($this->recvBuffer, $this->currentPackageLength);
|
|
|
}
|
|
}
|
|
|
// Reset the current packet length to 0.
|
|
// Reset the current packet length to 0.
|
|
@@ -700,7 +671,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
($this->onMessage)($this, $request);
|
|
($this->onMessage)($this, $request);
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
|
|
+ } catch (Throwable $e) {
|
|
|
$this->error($e);
|
|
$this->error($e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -715,7 +686,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
++self::$statistics['total_request'];
|
|
++self::$statistics['total_request'];
|
|
|
try {
|
|
try {
|
|
|
($this->onMessage)($this, $this->recvBuffer);
|
|
($this->onMessage)($this, $this->recvBuffer);
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
|
|
+ } catch (Throwable $e) {
|
|
|
$this->error($e);
|
|
$this->error($e);
|
|
|
}
|
|
}
|
|
|
// Clean receive buffer.
|
|
// Clean receive buffer.
|
|
@@ -725,7 +696,8 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
/**
|
|
/**
|
|
|
* Base write handler.
|
|
* Base write handler.
|
|
|
*
|
|
*
|
|
|
- * @return void|bool
|
|
|
|
|
|
|
+ * @return void
|
|
|
|
|
+ * @throws Throwable
|
|
|
*/
|
|
*/
|
|
|
public function baseWrite()
|
|
public function baseWrite()
|
|
|
{
|
|
{
|
|
@@ -736,26 +708,26 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
} else {
|
|
} else {
|
|
|
$len = @\fwrite($this->socket, $this->sendBuffer);
|
|
$len = @\fwrite($this->socket, $this->sendBuffer);
|
|
|
}
|
|
}
|
|
|
- } catch (\Throwable $e) {}
|
|
|
|
|
|
|
+ } catch (Throwable $e) {}
|
|
|
if ($len === \strlen($this->sendBuffer)) {
|
|
if ($len === \strlen($this->sendBuffer)) {
|
|
|
$this->bytesWritten += $len;
|
|
$this->bytesWritten += $len;
|
|
|
$this->eventLoop->offWritable($this->socket);
|
|
$this->eventLoop->offWritable($this->socket);
|
|
|
$this->sendBuffer = '';
|
|
$this->sendBuffer = '';
|
|
|
- // Try to emit onBufferDrain callback when the send buffer becomes empty.
|
|
|
|
|
|
|
+ // Try to emit onBufferDrain callback when send buffer becomes empty.
|
|
|
if ($this->onBufferDrain) {
|
|
if ($this->onBufferDrain) {
|
|
|
try {
|
|
try {
|
|
|
($this->onBufferDrain)($this);
|
|
($this->onBufferDrain)($this);
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
|
|
+ } catch (Throwable $e) {
|
|
|
$this->error($e);
|
|
$this->error($e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
if ($this->status === self::STATUS_CLOSING) {
|
|
if ($this->status === self::STATUS_CLOSING) {
|
|
|
if ($this->context->streamSending) {
|
|
if ($this->context->streamSending) {
|
|
|
- return true;
|
|
|
|
|
|
|
+ return;
|
|
|
}
|
|
}
|
|
|
$this->destroy();
|
|
$this->destroy();
|
|
|
}
|
|
}
|
|
|
- return true;
|
|
|
|
|
|
|
+ return;
|
|
|
}
|
|
}
|
|
|
if ($len > 0) {
|
|
if ($len > 0) {
|
|
|
$this->bytesWritten += $len;
|
|
$this->bytesWritten += $len;
|
|
@@ -770,9 +742,10 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
* SSL handshake.
|
|
* SSL handshake.
|
|
|
*
|
|
*
|
|
|
* @param resource $socket
|
|
* @param resource $socket
|
|
|
- * @return bool
|
|
|
|
|
|
|
+ * @return bool|int
|
|
|
|
|
+ * @throws Throwable
|
|
|
*/
|
|
*/
|
|
|
- public function doSslHandshake($socket)
|
|
|
|
|
|
|
+ public function doSslHandshake($socket): bool|int
|
|
|
{
|
|
{
|
|
|
if (\feof($socket)) {
|
|
if (\feof($socket)) {
|
|
|
$this->destroy();
|
|
$this->destroy();
|
|
@@ -812,13 +785,6 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
// There isn't enough data and should try again.
|
|
// There isn't enough data and should try again.
|
|
|
return 0;
|
|
return 0;
|
|
|
}
|
|
}
|
|
|
- if (isset($this->onSslHandshake)) {
|
|
|
|
|
- try {
|
|
|
|
|
- ($this->onSslHandshake)($this);
|
|
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
- $this->error($e);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
return true;
|
|
return true;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -851,7 +817,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
* @param int $length
|
|
* @param int $length
|
|
|
* @return void
|
|
* @return void
|
|
|
*/
|
|
*/
|
|
|
- public function consumeRecvBuffer($length)
|
|
|
|
|
|
|
+ public function consumeRecvBuffer(int $length)
|
|
|
{
|
|
{
|
|
|
$this->recvBuffer = \substr($this->recvBuffer, $length);
|
|
$this->recvBuffer = \substr($this->recvBuffer, $length);
|
|
|
}
|
|
}
|
|
@@ -859,11 +825,12 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
/**
|
|
/**
|
|
|
* Close connection.
|
|
* Close connection.
|
|
|
*
|
|
*
|
|
|
- * @param mixed $data
|
|
|
|
|
|
|
+ * @param mixed|null $data
|
|
|
* @param bool $raw
|
|
* @param bool $raw
|
|
|
* @return void
|
|
* @return void
|
|
|
|
|
+ * @throws Throwable
|
|
|
*/
|
|
*/
|
|
|
- public function close($data = null, $raw = false)
|
|
|
|
|
|
|
+ public function close(mixed $data = null, bool $raw = false)
|
|
|
{
|
|
{
|
|
|
if ($this->status === self::STATUS_CONNECTING) {
|
|
if ($this->status === self::STATUS_CONNECTING) {
|
|
|
$this->destroy();
|
|
$this->destroy();
|
|
@@ -898,9 +865,10 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * Check whether the send buffer will be full.
|
|
|
|
|
|
|
+ * Check whether send buffer will be full.
|
|
|
*
|
|
*
|
|
|
* @return void
|
|
* @return void
|
|
|
|
|
+ * @throws Throwable
|
|
|
*/
|
|
*/
|
|
|
protected function checkBufferWillFull()
|
|
protected function checkBufferWillFull()
|
|
|
{
|
|
{
|
|
@@ -908,7 +876,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
if ($this->onBufferFull) {
|
|
if ($this->onBufferFull) {
|
|
|
try {
|
|
try {
|
|
|
($this->onBufferFull)($this);
|
|
($this->onBufferFull)($this);
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
|
|
+ } catch (Throwable $e) {
|
|
|
$this->error($e);
|
|
$this->error($e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -919,15 +887,16 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
* Whether send buffer is full.
|
|
* Whether send buffer is full.
|
|
|
*
|
|
*
|
|
|
* @return bool
|
|
* @return bool
|
|
|
|
|
+ * @throws Throwable
|
|
|
*/
|
|
*/
|
|
|
- protected function bufferIsFull()
|
|
|
|
|
|
|
+ protected function bufferIsFull(): bool
|
|
|
{
|
|
{
|
|
|
// Buffer has been marked as full but still has data to send then the packet is discarded.
|
|
// Buffer has been marked as full but still has data to send then the packet is discarded.
|
|
|
if ($this->maxSendBufferSize <= \strlen($this->sendBuffer)) {
|
|
if ($this->maxSendBufferSize <= \strlen($this->sendBuffer)) {
|
|
|
if ($this->onError) {
|
|
if ($this->onError) {
|
|
|
try {
|
|
try {
|
|
|
($this->onError)($this, static::SEND_FAIL, 'send buffer full and drop package');
|
|
($this->onError)($this, static::SEND_FAIL, 'send buffer full and drop package');
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
|
|
+ } catch (Throwable $e) {
|
|
|
$this->error($e);
|
|
$this->error($e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -941,7 +910,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
*
|
|
|
* @return bool
|
|
* @return bool
|
|
|
*/
|
|
*/
|
|
|
- public function bufferIsEmpty()
|
|
|
|
|
|
|
+ public function bufferIsEmpty(): bool
|
|
|
{
|
|
{
|
|
|
return empty($this->sendBuffer);
|
|
return empty($this->sendBuffer);
|
|
|
}
|
|
}
|
|
@@ -950,7 +919,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
* Destroy connection.
|
|
* Destroy connection.
|
|
|
*
|
|
*
|
|
|
* @return void
|
|
* @return void
|
|
|
- * @throws \Throwable
|
|
|
|
|
|
|
+ * @throws Throwable
|
|
|
*/
|
|
*/
|
|
|
public function destroy()
|
|
public function destroy()
|
|
|
{
|
|
{
|
|
@@ -965,7 +934,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
// Close socket.
|
|
// Close socket.
|
|
|
try {
|
|
try {
|
|
|
@\fclose($this->socket);
|
|
@\fclose($this->socket);
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
|
|
+ } catch (Throwable $e) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
$this->status = self::STATUS_CLOSED;
|
|
$this->status = self::STATUS_CLOSED;
|
|
@@ -973,7 +942,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
if ($this->onClose) {
|
|
if ($this->onClose) {
|
|
|
try {
|
|
try {
|
|
|
($this->onClose)($this);
|
|
($this->onClose)($this);
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
|
|
+ } catch (Throwable $e) {
|
|
|
$this->error($e);
|
|
$this->error($e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -981,7 +950,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
if ($this->protocol && \method_exists($this->protocol, 'onClose')) {
|
|
if ($this->protocol && \method_exists($this->protocol, 'onClose')) {
|
|
|
try {
|
|
try {
|
|
|
([$this->protocol, 'onClose'])($this);
|
|
([$this->protocol, 'onClose'])($this);
|
|
|
- } catch (\Throwable $e) {
|
|
|
|
|
|
|
+ } catch (Throwable $e) {
|
|
|
$this->error($e);
|
|
$this->error($e);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -1004,18 +973,17 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
*
|
|
|
* @param mixed $value
|
|
* @param mixed $value
|
|
|
*/
|
|
*/
|
|
|
- public static function enableCache($value)
|
|
|
|
|
|
|
+ public static function enableCache(bool $value = true)
|
|
|
{
|
|
{
|
|
|
- static::$enableCache = (bool)$value;
|
|
|
|
|
|
|
+ static::$enableCache = $value;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Get the json_encode information.
|
|
* Get the json_encode information.
|
|
|
*
|
|
*
|
|
|
- * @return mixed
|
|
|
|
|
|
|
+ * @return array
|
|
|
*/
|
|
*/
|
|
|
- #[\ReturnTypeWillChange]
|
|
|
|
|
- public function jsonSerialize()
|
|
|
|
|
|
|
+ public function jsonSerialize(): array
|
|
|
{
|
|
{
|
|
|
return [
|
|
return [
|
|
|
'id' => $this->id,
|
|
'id' => $this->id,
|