|
|
@@ -148,7 +148,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
|
* @var int
|
|
|
*/
|
|
|
- protected $_id = 0;
|
|
|
+ protected $realId = 0;
|
|
|
|
|
|
/**
|
|
|
* Sets the maximum send buffer size for the current connection.
|
|
|
@@ -191,70 +191,70 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
|
* @var int
|
|
|
*/
|
|
|
- protected static $_idRecorder = 1;
|
|
|
+ protected static $idRecorder = 1;
|
|
|
|
|
|
/**
|
|
|
* Cache.
|
|
|
*
|
|
|
* @var bool.
|
|
|
*/
|
|
|
- protected static $_enableCache = true;
|
|
|
+ protected static $enableCache = true;
|
|
|
|
|
|
/**
|
|
|
* Socket
|
|
|
*
|
|
|
* @var resource
|
|
|
*/
|
|
|
- protected $_socket = null;
|
|
|
+ protected $socket = null;
|
|
|
|
|
|
/**
|
|
|
* Send buffer.
|
|
|
*
|
|
|
* @var string
|
|
|
*/
|
|
|
- protected $_sendBuffer = '';
|
|
|
+ protected $sendBuffer = '';
|
|
|
|
|
|
/**
|
|
|
* Receive buffer.
|
|
|
*
|
|
|
* @var string
|
|
|
*/
|
|
|
- protected $_recvBuffer = '';
|
|
|
+ protected $recvBuffer = '';
|
|
|
|
|
|
/**
|
|
|
* Current package length.
|
|
|
*
|
|
|
* @var int
|
|
|
*/
|
|
|
- protected $_currentPackageLength = 0;
|
|
|
+ protected $currentPackageLength = 0;
|
|
|
|
|
|
/**
|
|
|
* Connection status.
|
|
|
*
|
|
|
* @var int
|
|
|
*/
|
|
|
- protected $_status = self::STATUS_ESTABLISHED;
|
|
|
+ protected $status = self::STATUS_ESTABLISHED;
|
|
|
|
|
|
/**
|
|
|
* Remote address.
|
|
|
*
|
|
|
* @var string
|
|
|
*/
|
|
|
- protected $_remoteAddress = '';
|
|
|
+ protected $remoteAddress = '';
|
|
|
|
|
|
/**
|
|
|
* Is paused.
|
|
|
*
|
|
|
* @var bool
|
|
|
*/
|
|
|
- protected $_isPaused = false;
|
|
|
+ protected $isPaused = false;
|
|
|
|
|
|
/**
|
|
|
* SSL handshake completed or not.
|
|
|
*
|
|
|
* @var bool
|
|
|
*/
|
|
|
- protected $_sslHandshakeCompleted = false;
|
|
|
+ protected $sslHandshakeCompleted = false;
|
|
|
|
|
|
/**
|
|
|
* All connection instances.
|
|
|
@@ -268,7 +268,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*
|
|
|
* @var array
|
|
|
*/
|
|
|
- public static $_statusToString = [
|
|
|
+ public static $statusToString = [
|
|
|
self::STATUS_INITIAL => 'INITIAL',
|
|
|
self::STATUS_CONNECTING => 'CONNECTING',
|
|
|
self::STATUS_ESTABLISHED => 'ESTABLISHED',
|
|
|
@@ -285,20 +285,20 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
public function __construct($socket, $remote_address = '')
|
|
|
{
|
|
|
++self::$statistics['connection_count'];
|
|
|
- $this->id = $this->realId = self::$_idRecorder++;
|
|
|
- if (self::$_idRecorder === \PHP_INT_MAX) {
|
|
|
- self::$_idRecorder = 0;
|
|
|
+ $this->id = $this->realId = self::$idRecorder++;
|
|
|
+ if (self::$idRecorder === \PHP_INT_MAX) {
|
|
|
+ self::$idRecorder = 0;
|
|
|
}
|
|
|
- $this->_socket = $socket;
|
|
|
- \stream_set_blocking($this->_socket, 0);
|
|
|
+ $this->socket = $socket;
|
|
|
+ \stream_set_blocking($this->socket, 0);
|
|
|
// Compatible with hhvm
|
|
|
if (\function_exists('stream_set_read_buffer')) {
|
|
|
- \stream_set_read_buffer($this->_socket, 0);
|
|
|
+ \stream_set_read_buffer($this->socket, 0);
|
|
|
}
|
|
|
- Worker::$globalEvent->onReadable($this->_socket, [$this, 'baseRead']);
|
|
|
+ Worker::$globalEvent->onReadable($this->socket, [$this, 'baseRead']);
|
|
|
$this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
|
|
|
$this->maxPackageSize = self::$defaultMaxPackageSize;
|
|
|
- $this->_remoteAddress = $remote_address;
|
|
|
+ $this->remoteAddress = $remote_address;
|
|
|
static::$connections[$this->id] = $this;
|
|
|
$this->context = new \stdClass;
|
|
|
}
|
|
|
@@ -313,9 +313,9 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
public function getStatus($raw_output = true)
|
|
|
{
|
|
|
if ($raw_output) {
|
|
|
- return $this->_status;
|
|
|
+ return $this->status;
|
|
|
}
|
|
|
- return self::$_statusToString[$this->_status];
|
|
|
+ return self::$statusToString[$this->status];
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -327,7 +327,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public function send($send_buffer, $raw = false)
|
|
|
{
|
|
|
- if ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
|
|
|
+ if ($this->status === self::STATUS_CLOSING || $this->status === self::STATUS_CLOSED) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
@@ -339,29 +339,29 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if ($this->_status !== self::STATUS_ESTABLISHED ||
|
|
|
- ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true)
|
|
|
+ if ($this->status !== self::STATUS_ESTABLISHED ||
|
|
|
+ ($this->transport === 'ssl' && $this->sslHandshakeCompleted !== true)
|
|
|
) {
|
|
|
- if ($this->_sendBuffer && $this->bufferIsFull()) {
|
|
|
+ if ($this->sendBuffer && $this->bufferIsFull()) {
|
|
|
++self::$statistics['send_fail'];
|
|
|
return false;
|
|
|
}
|
|
|
- $this->_sendBuffer .= $send_buffer;
|
|
|
+ $this->sendBuffer .= $send_buffer;
|
|
|
$this->checkBufferWillFull();
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// Attempt to send data directly.
|
|
|
- if ($this->_sendBuffer === '') {
|
|
|
+ if ($this->sendBuffer === '') {
|
|
|
if ($this->transport === 'ssl') {
|
|
|
- Worker::$globalEvent->onWritable($this->_socket, [$this, 'baseWrite']);
|
|
|
- $this->_sendBuffer = $send_buffer;
|
|
|
+ Worker::$globalEvent->onWritable($this->socket, [$this, 'baseWrite']);
|
|
|
+ $this->sendBuffer = $send_buffer;
|
|
|
$this->checkBufferWillFull();
|
|
|
return;
|
|
|
}
|
|
|
$len = 0;
|
|
|
try {
|
|
|
- $len = @\fwrite($this->_socket, $send_buffer);
|
|
|
+ $len = @\fwrite($this->socket, $send_buffer);
|
|
|
} catch (\Throwable $e) {
|
|
|
Worker::log($e);
|
|
|
}
|
|
|
@@ -372,11 +372,11 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
}
|
|
|
// Send only part of the data.
|
|
|
if ($len > 0) {
|
|
|
- $this->_sendBuffer = \substr($send_buffer, $len);
|
|
|
+ $this->sendBuffer = \substr($send_buffer, $len);
|
|
|
$this->bytesWritten += $len;
|
|
|
} else {
|
|
|
// Connection closed?
|
|
|
- if (!\is_resource($this->_socket) || \feof($this->_socket)) {
|
|
|
+ if (!\is_resource($this->socket) || \feof($this->socket)) {
|
|
|
++self::$statistics['send_fail'];
|
|
|
if ($this->onError) {
|
|
|
try {
|
|
|
@@ -388,9 +388,9 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
$this->destroy();
|
|
|
return false;
|
|
|
}
|
|
|
- $this->_sendBuffer = $send_buffer;
|
|
|
+ $this->sendBuffer = $send_buffer;
|
|
|
}
|
|
|
- Worker::$globalEvent->onWritable($this->_socket, [$this, 'baseWrite']);
|
|
|
+ Worker::$globalEvent->onWritable($this->socket, [$this, 'baseWrite']);
|
|
|
// Check if the send buffer will be full.
|
|
|
$this->checkBufferWillFull();
|
|
|
return;
|
|
|
@@ -401,7 +401,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- $this->_sendBuffer .= $send_buffer;
|
|
|
+ $this->sendBuffer .= $send_buffer;
|
|
|
// Check if the send buffer is full.
|
|
|
$this->checkBufferWillFull();
|
|
|
}
|
|
|
@@ -413,9 +413,9 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public function getRemoteIp()
|
|
|
{
|
|
|
- $pos = \strrpos($this->_remoteAddress, ':');
|
|
|
+ $pos = \strrpos($this->remoteAddress, ':');
|
|
|
if ($pos) {
|
|
|
- return (string)\substr($this->_remoteAddress, 0, $pos);
|
|
|
+ return (string)\substr($this->remoteAddress, 0, $pos);
|
|
|
}
|
|
|
return '';
|
|
|
}
|
|
|
@@ -427,8 +427,8 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public function getRemotePort()
|
|
|
{
|
|
|
- if ($this->_remoteAddress) {
|
|
|
- return (int)\substr(\strrchr($this->_remoteAddress, ':'), 1);
|
|
|
+ if ($this->remoteAddress) {
|
|
|
+ return (int)\substr(\strrchr($this->remoteAddress, ':'), 1);
|
|
|
}
|
|
|
return 0;
|
|
|
}
|
|
|
@@ -440,7 +440,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public function getRemoteAddress()
|
|
|
{
|
|
|
- return $this->_remoteAddress;
|
|
|
+ return $this->remoteAddress;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -480,10 +480,10 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public function getLocalAddress()
|
|
|
{
|
|
|
- if (!\is_resource($this->_socket)) {
|
|
|
+ if (!\is_resource($this->socket)) {
|
|
|
return '';
|
|
|
}
|
|
|
- return (string)@\stream_socket_get_name($this->_socket, false);
|
|
|
+ return (string)@\stream_socket_get_name($this->socket, false);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -493,7 +493,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public function getSendBufferQueueSize()
|
|
|
{
|
|
|
- return \strlen($this->_sendBuffer);
|
|
|
+ return \strlen($this->sendBuffer);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -503,7 +503,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public function getRecvBufferQueueSize()
|
|
|
{
|
|
|
- return \strlen($this->_recvBuffer);
|
|
|
+ return \strlen($this->recvBuffer);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -539,8 +539,8 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public function pauseRecv()
|
|
|
{
|
|
|
- Worker::$globalEvent->offReadable($this->_socket);
|
|
|
- $this->_isPaused = true;
|
|
|
+ Worker::$globalEvent->offReadable($this->socket);
|
|
|
+ $this->isPaused = true;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -550,10 +550,10 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public function resumeRecv()
|
|
|
{
|
|
|
- if ($this->_isPaused === true) {
|
|
|
- Worker::$globalEvent->onReadable($this->_socket, [$this, 'baseRead']);
|
|
|
- $this->_isPaused = false;
|
|
|
- $this->baseRead($this->_socket, false);
|
|
|
+ if ($this->isPaused === true) {
|
|
|
+ Worker::$globalEvent->onReadable($this->socket, [$this, 'baseRead']);
|
|
|
+ $this->isPaused = false;
|
|
|
+ $this->baseRead($this->socket, false);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -569,10 +569,10 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
{
|
|
|
static $requests = [];
|
|
|
// SSL handshake.
|
|
|
- if ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true) {
|
|
|
+ if ($this->transport === 'ssl' && $this->sslHandshakeCompleted !== true) {
|
|
|
if ($this->doSslHandshake($socket)) {
|
|
|
- $this->_sslHandshakeCompleted = true;
|
|
|
- if ($this->_sendBuffer) {
|
|
|
+ $this->sslHandshakeCompleted = true;
|
|
|
+ if ($this->sendBuffer) {
|
|
|
Worker::$globalEvent->onWritable($socket, [$this, 'baseWrite']);
|
|
|
}
|
|
|
} else {
|
|
|
@@ -594,15 +594,15 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
}
|
|
|
} else {
|
|
|
$this->bytesRead += \strlen($buffer);
|
|
|
- if ($this->_recvBuffer === '') {
|
|
|
- if (static::$_enableCache && !isset($buffer[512]) && isset($requests[$buffer])) {
|
|
|
+ if ($this->recvBuffer === '') {
|
|
|
+ if (static::$enableCache && !isset($buffer[512]) && isset($requests[$buffer])) {
|
|
|
++self::$statistics['total_request'];
|
|
|
$request = $requests[$buffer];
|
|
|
if ($request instanceof Request) {
|
|
|
$request = clone $request;
|
|
|
$requests[$buffer] = $request;
|
|
|
$request->connection = $this;
|
|
|
- $this->__request = $request;
|
|
|
+ $this->request = $request;
|
|
|
$request->properties = [];
|
|
|
}
|
|
|
try {
|
|
|
@@ -612,38 +612,38 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
|
- $this->_recvBuffer = $buffer;
|
|
|
+ $this->recvBuffer = $buffer;
|
|
|
} else {
|
|
|
- $this->_recvBuffer .= $buffer;
|
|
|
+ $this->recvBuffer .= $buffer;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// If the application layer protocol has been set up.
|
|
|
if ($this->protocol !== null) {
|
|
|
- while ($this->_recvBuffer !== '' && !$this->_isPaused) {
|
|
|
+ while ($this->recvBuffer !== '' && !$this->isPaused) {
|
|
|
// The current packet length is known.
|
|
|
- if ($this->_currentPackageLength) {
|
|
|
+ if ($this->currentPackageLength) {
|
|
|
// Data is not enough for a package.
|
|
|
- if ($this->_currentPackageLength > \strlen($this->_recvBuffer)) {
|
|
|
+ if ($this->currentPackageLength > \strlen($this->recvBuffer)) {
|
|
|
break;
|
|
|
}
|
|
|
} else {
|
|
|
// Get current package length.
|
|
|
try {
|
|
|
- $this->_currentPackageLength = $this->protocol::input($this->_recvBuffer, $this);
|
|
|
+ $this->currentPackageLength = $this->protocol::input($this->recvBuffer, $this);
|
|
|
} catch (\Throwable $e) {
|
|
|
}
|
|
|
// The packet length is unknown.
|
|
|
- if ($this->_currentPackageLength === 0) {
|
|
|
+ if ($this->currentPackageLength === 0) {
|
|
|
break;
|
|
|
- } elseif ($this->_currentPackageLength > 0 && $this->_currentPackageLength <= $this->maxPackageSize) {
|
|
|
+ } elseif ($this->currentPackageLength > 0 && $this->currentPackageLength <= $this->maxPackageSize) {
|
|
|
// Data is not enough for a package.
|
|
|
- if ($this->_currentPackageLength > \strlen($this->_recvBuffer)) {
|
|
|
+ if ($this->currentPackageLength > \strlen($this->recvBuffer)) {
|
|
|
break;
|
|
|
}
|
|
|
} // Wrong package.
|
|
|
else {
|
|
|
- Worker::safeEcho('Error package. package_length=' . \var_export($this->_currentPackageLength, true));
|
|
|
+ Worker::safeEcho('Error package. package_length=' . \var_export($this->currentPackageLength, true));
|
|
|
$this->destroy();
|
|
|
return;
|
|
|
}
|
|
|
@@ -652,21 +652,21 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
// The data is enough for a packet.
|
|
|
++self::$statistics['total_request'];
|
|
|
// The current packet length is equal to the length of the buffer.
|
|
|
- if ($one = \strlen($this->_recvBuffer) === $this->_currentPackageLength) {
|
|
|
- $one_request_buffer = $this->_recvBuffer;
|
|
|
- $this->_recvBuffer = '';
|
|
|
+ if ($one = \strlen($this->recvBuffer) === $this->currentPackageLength) {
|
|
|
+ $one_request_buffer = $this->recvBuffer;
|
|
|
+ $this->recvBuffer = '';
|
|
|
} else {
|
|
|
// Get a full package from the buffer.
|
|
|
- $one_request_buffer = \substr($this->_recvBuffer, 0, $this->_currentPackageLength);
|
|
|
+ $one_request_buffer = \substr($this->recvBuffer, 0, $this->currentPackageLength);
|
|
|
// Remove the current package from the receive buffer.
|
|
|
- $this->_recvBuffer = \substr($this->_recvBuffer, $this->_currentPackageLength);
|
|
|
+ $this->recvBuffer = \substr($this->recvBuffer, $this->currentPackageLength);
|
|
|
}
|
|
|
// Reset the current packet length to 0.
|
|
|
- $this->_currentPackageLength = 0;
|
|
|
+ $this->currentPackageLength = 0;
|
|
|
try {
|
|
|
// Decode request buffer before Emitting onMessage callback.
|
|
|
$request = $this->protocol::decode($one_request_buffer, $this);
|
|
|
- if (static::$_enableCache && (!\is_object($request) || $request instanceof Request) && $one && !isset($one_request_buffer[512])) {
|
|
|
+ if (static::$enableCache && (!\is_object($request) || $request instanceof Request) && $one && !isset($one_request_buffer[512])) {
|
|
|
$requests[$one_request_buffer] = $request;
|
|
|
if (\count($requests) > 512) {
|
|
|
unset($requests[\key($requests)]);
|
|
|
@@ -680,19 +680,19 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if ($this->_recvBuffer === '' || $this->_isPaused) {
|
|
|
+ if ($this->recvBuffer === '' || $this->isPaused) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
// Applications protocol is not set.
|
|
|
++self::$statistics['total_request'];
|
|
|
try {
|
|
|
- ($this->onMessage)($this, $this->_recvBuffer);
|
|
|
+ ($this->onMessage)($this, $this->recvBuffer);
|
|
|
} catch (\Throwable $e) {
|
|
|
Worker::stopAll(250, $e);
|
|
|
}
|
|
|
// Clean receive buffer.
|
|
|
- $this->_recvBuffer = '';
|
|
|
+ $this->recvBuffer = '';
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -705,15 +705,15 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
\set_error_handler(function () {
|
|
|
});
|
|
|
if ($this->transport === 'ssl') {
|
|
|
- $len = @\fwrite($this->_socket, $this->_sendBuffer, 8192);
|
|
|
+ $len = @\fwrite($this->socket, $this->sendBuffer, 8192);
|
|
|
} else {
|
|
|
- $len = @\fwrite($this->_socket, $this->_sendBuffer);
|
|
|
+ $len = @\fwrite($this->socket, $this->sendBuffer);
|
|
|
}
|
|
|
\restore_error_handler();
|
|
|
- if ($len === \strlen($this->_sendBuffer)) {
|
|
|
+ if ($len === \strlen($this->sendBuffer)) {
|
|
|
$this->bytesWritten += $len;
|
|
|
- Worker::$globalEvent->offWritable($this->_socket);
|
|
|
- $this->_sendBuffer = '';
|
|
|
+ Worker::$globalEvent->offWritable($this->socket);
|
|
|
+ $this->sendBuffer = '';
|
|
|
// Try to emit onBufferDrain callback when the send buffer becomes empty.
|
|
|
if ($this->onBufferDrain) {
|
|
|
try {
|
|
|
@@ -722,7 +722,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
Worker::stopAll(250, $e);
|
|
|
}
|
|
|
}
|
|
|
- if ($this->_status === self::STATUS_CLOSING) {
|
|
|
+ if ($this->status === self::STATUS_CLOSING) {
|
|
|
if ($this->context->streamSending) {
|
|
|
return true;
|
|
|
}
|
|
|
@@ -732,7 +732,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
}
|
|
|
if ($len > 0) {
|
|
|
$this->bytesWritten += $len;
|
|
|
- $this->_sendBuffer = \substr($this->_sendBuffer, $len);
|
|
|
+ $this->sendBuffer = \substr($this->sendBuffer, $len);
|
|
|
} else {
|
|
|
++self::$statistics['send_fail'];
|
|
|
$this->destroy();
|
|
|
@@ -826,7 +826,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public function consumeRecvBuffer($length)
|
|
|
{
|
|
|
- $this->_recvBuffer = \substr($this->_recvBuffer, $length);
|
|
|
+ $this->recvBuffer = \substr($this->recvBuffer, $length);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -838,12 +838,12 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public function close($data = null, $raw = false)
|
|
|
{
|
|
|
- if ($this->_status === self::STATUS_CONNECTING) {
|
|
|
+ if ($this->status === self::STATUS_CONNECTING) {
|
|
|
$this->destroy();
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- if ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
|
|
|
+ if ($this->status === self::STATUS_CLOSING || $this->status === self::STATUS_CLOSED) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
@@ -851,9 +851,9 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
$this->send($data, $raw);
|
|
|
}
|
|
|
|
|
|
- $this->_status = self::STATUS_CLOSING;
|
|
|
+ $this->status = self::STATUS_CLOSING;
|
|
|
|
|
|
- if ($this->_sendBuffer === '') {
|
|
|
+ if ($this->sendBuffer === '') {
|
|
|
$this->destroy();
|
|
|
} else {
|
|
|
$this->pauseRecv();
|
|
|
@@ -867,7 +867,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public function getSocket()
|
|
|
{
|
|
|
- return $this->_socket;
|
|
|
+ return $this->socket;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -877,7 +877,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
protected function checkBufferWillFull()
|
|
|
{
|
|
|
- if ($this->maxSendBufferSize <= \strlen($this->_sendBuffer)) {
|
|
|
+ if ($this->maxSendBufferSize <= \strlen($this->sendBuffer)) {
|
|
|
if ($this->onBufferFull) {
|
|
|
try {
|
|
|
($this->onBufferFull)($this);
|
|
|
@@ -896,7 +896,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
protected function bufferIsFull()
|
|
|
{
|
|
|
// Buffer has been marked as full but still has data to send then the packet is discarded.
|
|
|
- if ($this->maxSendBufferSize <= \strlen($this->_sendBuffer)) {
|
|
|
+ if ($this->maxSendBufferSize <= \strlen($this->sendBuffer)) {
|
|
|
if ($this->onError) {
|
|
|
try {
|
|
|
($this->onError)($this, static::SEND_FAIL, 'send buffer full and drop package');
|
|
|
@@ -916,7 +916,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public function bufferIsEmpty()
|
|
|
{
|
|
|
- return empty($this->_sendBuffer);
|
|
|
+ return empty($this->sendBuffer);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -927,20 +927,20 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
public function destroy()
|
|
|
{
|
|
|
// Avoid repeated calls.
|
|
|
- if ($this->_status === self::STATUS_CLOSED) {
|
|
|
+ if ($this->status === self::STATUS_CLOSED) {
|
|
|
return;
|
|
|
}
|
|
|
// Remove event listener.
|
|
|
- Worker::$globalEvent->offReadable($this->_socket);
|
|
|
- Worker::$globalEvent->offWritable($this->_socket);
|
|
|
+ Worker::$globalEvent->offReadable($this->socket);
|
|
|
+ Worker::$globalEvent->offWritable($this->socket);
|
|
|
|
|
|
// Close socket.
|
|
|
try {
|
|
|
- @\fclose($this->_socket);
|
|
|
+ @\fclose($this->socket);
|
|
|
} catch (\Throwable $e) {
|
|
|
}
|
|
|
|
|
|
- $this->_status = self::STATUS_CLOSED;
|
|
|
+ $this->status = self::STATUS_CLOSED;
|
|
|
// Try to emit onClose callback.
|
|
|
if ($this->onClose) {
|
|
|
try {
|
|
|
@@ -957,10 +957,10 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
Worker::stopAll(250, $e);
|
|
|
}
|
|
|
}
|
|
|
- $this->_sendBuffer = $this->_recvBuffer = '';
|
|
|
- $this->_currentPackageLength = 0;
|
|
|
- $this->_isPaused = $this->_sslHandshakeCompleted = false;
|
|
|
- if ($this->_status === self::STATUS_CLOSED) {
|
|
|
+ $this->sendBuffer = $this->recvBuffer = '';
|
|
|
+ $this->currentPackageLength = 0;
|
|
|
+ $this->isPaused = $this->sslHandshakeCompleted = false;
|
|
|
+ if ($this->status === self::STATUS_CLOSED) {
|
|
|
// Cleaning up the callback to avoid memory leaks.
|
|
|
$this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;
|
|
|
// Remove from worker->connections.
|
|
|
@@ -978,7 +978,7 @@ class TcpConnection extends ConnectionInterface implements \JsonSerializable
|
|
|
*/
|
|
|
public static function enableCache($value)
|
|
|
{
|
|
|
- static::$_enableCache = (bool)$value;
|
|
|
+ static::$enableCache = (bool)$value;
|
|
|
}
|
|
|
|
|
|
/**
|