|
|
@@ -30,6 +30,13 @@ class TcpConnection extends ConnectionInterface
|
|
|
const READ_BUFFER_SIZE = 65535;
|
|
|
|
|
|
/**
|
|
|
+ * Status initial.
|
|
|
+ *
|
|
|
+ * @var int
|
|
|
+ */
|
|
|
+ const STATUS_INITIAL = 0;
|
|
|
+
|
|
|
+ /**
|
|
|
* Status connecting.
|
|
|
*
|
|
|
* @var int
|
|
|
@@ -41,7 +48,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
*
|
|
|
* @var int
|
|
|
*/
|
|
|
- const STATUS_ESTABLISH = 2;
|
|
|
+ const STATUS_ESTABLISHED = 2;
|
|
|
|
|
|
/**
|
|
|
* Status closing.
|
|
|
@@ -101,6 +108,13 @@ class TcpConnection extends ConnectionInterface
|
|
|
public $protocol = null;
|
|
|
|
|
|
/**
|
|
|
+ * Transport (tcp/udp/unix/ssl).
|
|
|
+ *
|
|
|
+ * @var string
|
|
|
+ */
|
|
|
+ public $transport = 'tcp';
|
|
|
+
|
|
|
+ /**
|
|
|
* Which worker belong to.
|
|
|
*
|
|
|
* @var Worker
|
|
|
@@ -108,6 +122,20 @@ class TcpConnection extends ConnectionInterface
|
|
|
public $worker = null;
|
|
|
|
|
|
/**
|
|
|
+ * Bytes read.
|
|
|
+ *
|
|
|
+ * @var int
|
|
|
+ */
|
|
|
+ public $bytesRead = 0;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Bytes written.
|
|
|
+ *
|
|
|
+ * @var int
|
|
|
+ */
|
|
|
+ public $bytesWritten = 0;
|
|
|
+
|
|
|
+ /**
|
|
|
* Connection->id.
|
|
|
*
|
|
|
* @var int
|
|
|
@@ -183,7 +211,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
*
|
|
|
* @var int
|
|
|
*/
|
|
|
- protected $_status = self::STATUS_ESTABLISH;
|
|
|
+ protected $_status = self::STATUS_ESTABLISHED;
|
|
|
|
|
|
/**
|
|
|
* Remote address.
|
|
|
@@ -200,6 +228,58 @@ class TcpConnection extends ConnectionInterface
|
|
|
protected $_isPaused = false;
|
|
|
|
|
|
/**
|
|
|
+ * SSL handshake completed or not.
|
|
|
+ *
|
|
|
+ * @var bool
|
|
|
+ */
|
|
|
+ protected $_sslHandshakeCompleted = false;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * All connection instances.
|
|
|
+ *
|
|
|
+ * @var array
|
|
|
+ */
|
|
|
+ public static $connections = array();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Status to string.
|
|
|
+ *
|
|
|
+ * @var array
|
|
|
+ */
|
|
|
+ public static $_statusToString = array(
|
|
|
+ self::STATUS_INITIAL => 'INITIAL',
|
|
|
+ self::STATUS_CONNECTING => 'CONNECTING',
|
|
|
+ self::STATUS_ESTABLISHED => 'ESTABLISHED',
|
|
|
+ self::STATUS_CLOSING => 'CLOSING',
|
|
|
+ self::STATUS_CLOSED => 'CLOSED',
|
|
|
+ );
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Adding support of custom functions within protocols
|
|
|
+ *
|
|
|
+ * @param string $name
|
|
|
+ * @param array $arguments
|
|
|
+ */
|
|
|
+ public function __call($name, $arguments) {
|
|
|
+ // Try to emit custom function within protocol
|
|
|
+ if (method_exists($this->protocol, $name)) {
|
|
|
+ try {
|
|
|
+ return call_user_func(array($this->protocol, $name), $this, $arguments);
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ } catch (\Error $e) {
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ trigger_error('Call to undefined method '.__CLASS__.'::'.$name.'()', E_USER_ERROR);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* Construct.
|
|
|
*
|
|
|
* @param resource $socket
|
|
|
@@ -208,25 +288,52 @@ class TcpConnection extends ConnectionInterface
|
|
|
public function __construct($socket, $remote_address = '')
|
|
|
{
|
|
|
self::$statistics['connection_count']++;
|
|
|
- $this->id = $this->_id = self::$_idRecorder++;
|
|
|
+ $this->id = $this->_id = self::$_idRecorder++;
|
|
|
+ if(self::$_idRecorder === PHP_INT_MAX){
|
|
|
+ self::$_idRecorder = 0;
|
|
|
+ }
|
|
|
$this->_socket = $socket;
|
|
|
stream_set_blocking($this->_socket, 0);
|
|
|
+ // Compatible with hhvm
|
|
|
+ if (function_exists('stream_set_read_buffer')) {
|
|
|
+ stream_set_read_buffer($this->_socket, 0);
|
|
|
+ }
|
|
|
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
|
|
|
$this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
|
|
|
$this->_remoteAddress = $remote_address;
|
|
|
+ static::$connections[$this->id] = $this;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get status.
|
|
|
+ *
|
|
|
+ * @param bool $raw_output
|
|
|
+ *
|
|
|
+ * @return int
|
|
|
+ */
|
|
|
+ public function getStatus($raw_output = true)
|
|
|
+ {
|
|
|
+ if ($raw_output) {
|
|
|
+ return $this->_status;
|
|
|
+ }
|
|
|
+ return self::$_statusToString[$this->_status];
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* Sends data on the connection.
|
|
|
*
|
|
|
* @param string $send_buffer
|
|
|
- * @param bool $raw
|
|
|
+ * @param bool $raw
|
|
|
* @return void|bool|null
|
|
|
*/
|
|
|
public function send($send_buffer, $raw = false)
|
|
|
{
|
|
|
+ if ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
// Try to call protocol::encode($send_buffer) before sending.
|
|
|
- if (false === $raw && $this->protocol) {
|
|
|
+ if (false === $raw && $this->protocol !== null) {
|
|
|
$parser = $this->protocol;
|
|
|
$send_buffer = $parser::encode($send_buffer, $this);
|
|
|
if ($send_buffer === '') {
|
|
|
@@ -234,23 +341,33 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if ($this->_status === self::STATUS_CONNECTING) {
|
|
|
+ if ($this->_status !== self::STATUS_ESTABLISHED ||
|
|
|
+ ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true)
|
|
|
+ ) {
|
|
|
+ if ($this->_sendBuffer) {
|
|
|
+ if ($this->bufferIsFull()) {
|
|
|
+ self::$statistics['send_fail']++;
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
$this->_sendBuffer .= $send_buffer;
|
|
|
+ $this->checkBufferWillFull();
|
|
|
return null;
|
|
|
- } elseif ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
|
|
|
- return false;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
// Attempt to send data directly.
|
|
|
if ($this->_sendBuffer === '') {
|
|
|
- $len = @fwrite($this->_socket, $send_buffer);
|
|
|
+ $len = @fwrite($this->_socket, $send_buffer, 8192);
|
|
|
// send successful.
|
|
|
if ($len === strlen($send_buffer)) {
|
|
|
+ $this->bytesWritten += $len;
|
|
|
return true;
|
|
|
}
|
|
|
// Send only part of the data.
|
|
|
if ($len > 0) {
|
|
|
$this->_sendBuffer = substr($send_buffer, $len);
|
|
|
+ $this->bytesWritten += $len;
|
|
|
} else {
|
|
|
// Connection closed?
|
|
|
if (!is_resource($this->_socket) || feof($this->_socket)) {
|
|
|
@@ -259,7 +376,10 @@ class TcpConnection extends ConnectionInterface
|
|
|
try {
|
|
|
call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'client closed');
|
|
|
} catch (\Exception $e) {
|
|
|
- echo $e;
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ } catch (\Error $e) {
|
|
|
+ Worker::log($e);
|
|
|
exit(250);
|
|
|
}
|
|
|
}
|
|
|
@@ -269,26 +389,18 @@ class TcpConnection extends ConnectionInterface
|
|
|
$this->_sendBuffer = $send_buffer;
|
|
|
}
|
|
|
Worker::$globalEvent->add($this->_socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
|
|
|
- // Check if the send buffer is full.
|
|
|
- $this->checkBufferIsFull();
|
|
|
+ // Check if the send buffer will be full.
|
|
|
+ $this->checkBufferWillFull();
|
|
|
return null;
|
|
|
} else {
|
|
|
- // Buffer has been marked as full but still has data to send the packet is discarded.
|
|
|
- if ($this->maxSendBufferSize <= strlen($this->_sendBuffer)) {
|
|
|
+ if ($this->bufferIsFull()) {
|
|
|
self::$statistics['send_fail']++;
|
|
|
- if ($this->onError) {
|
|
|
- try {
|
|
|
- call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
|
|
|
- } catch (\Exception $e) {
|
|
|
- echo $e;
|
|
|
- exit(250);
|
|
|
- }
|
|
|
- }
|
|
|
return false;
|
|
|
}
|
|
|
+
|
|
|
$this->_sendBuffer .= $send_buffer;
|
|
|
// Check if the send buffer is full.
|
|
|
- $this->checkBufferIsFull();
|
|
|
+ $this->checkBufferWillFull();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -320,6 +432,102 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * Get remote address.
|
|
|
+ *
|
|
|
+ * @return string
|
|
|
+ */
|
|
|
+ public function getRemoteAddress()
|
|
|
+ {
|
|
|
+ return $this->_remoteAddress;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get local IP.
|
|
|
+ *
|
|
|
+ * @return string
|
|
|
+ */
|
|
|
+ public function getLocalIp()
|
|
|
+ {
|
|
|
+ $address = $this->getLocalAddress();
|
|
|
+ $pos = strrpos($address, ':');
|
|
|
+ if (!$pos) {
|
|
|
+ return '';
|
|
|
+ }
|
|
|
+ return substr($address, 0, $pos);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get local port.
|
|
|
+ *
|
|
|
+ * @return int
|
|
|
+ */
|
|
|
+ public function getLocalPort()
|
|
|
+ {
|
|
|
+ $address = $this->getLocalAddress();
|
|
|
+ $pos = strrpos($address, ':');
|
|
|
+ if (!$pos) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ return (int)substr(strrchr($address, ':'), 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get local address.
|
|
|
+ *
|
|
|
+ * @return string
|
|
|
+ */
|
|
|
+ public function getLocalAddress()
|
|
|
+ {
|
|
|
+ return (string)@stream_socket_get_name($this->_socket, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get send buffer queue size.
|
|
|
+ *
|
|
|
+ * @return integer
|
|
|
+ */
|
|
|
+ public function getSendBufferQueueSize()
|
|
|
+ {
|
|
|
+ return strlen($this->_sendBuffer);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get recv buffer queue size.
|
|
|
+ *
|
|
|
+ * @return integer
|
|
|
+ */
|
|
|
+ public function getRecvBufferQueueSize()
|
|
|
+ {
|
|
|
+ return strlen($this->_recvBuffer);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Is ipv4.
|
|
|
+ *
|
|
|
+ * return bool.
|
|
|
+ */
|
|
|
+ public function isIpV4()
|
|
|
+ {
|
|
|
+ if ($this->transport === 'unix') {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return strpos($this->getRemoteIp(), ':') === false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Is ipv6.
|
|
|
+ *
|
|
|
+ * return bool.
|
|
|
+ */
|
|
|
+ public function isIpV6()
|
|
|
+ {
|
|
|
+ if ($this->transport === 'unix') {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return strpos($this->getRemoteIp(), ':') !== false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* Pauses the reading of data. That is onMessage will not be emitted. Useful to throttle back an upload.
|
|
|
*
|
|
|
* @return void
|
|
|
@@ -348,28 +556,58 @@ class TcpConnection extends ConnectionInterface
|
|
|
* Base read handler.
|
|
|
*
|
|
|
* @param resource $socket
|
|
|
+ * @param bool $check_eof
|
|
|
* @return void
|
|
|
*/
|
|
|
public function baseRead($socket, $check_eof = true)
|
|
|
{
|
|
|
- $read_data = false;
|
|
|
- while (1) {
|
|
|
- $buffer = fread($socket, self::READ_BUFFER_SIZE);
|
|
|
- if ($buffer === '' || $buffer === false) {
|
|
|
- break;
|
|
|
+ // SSL handshake.
|
|
|
+ if ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true) {
|
|
|
+ $ret = stream_socket_enable_crypto($socket, true, STREAM_CRYPTO_METHOD_SSLv2_SERVER |
|
|
|
+ STREAM_CRYPTO_METHOD_SSLv23_SERVER);
|
|
|
+ // Negotiation has failed.
|
|
|
+ if(false === $ret) {
|
|
|
+ if (!feof($socket)) {
|
|
|
+ echo "\nSSL Handshake fail. \nBuffer:".bin2hex(fread($socket, 8182))."\n";
|
|
|
+ }
|
|
|
+ return $this->destroy();
|
|
|
+ } elseif(0 === $ret) {
|
|
|
+ // There isn't enough data and should try again.
|
|
|
+ return;
|
|
|
}
|
|
|
- $read_data = true;
|
|
|
- $this->_recvBuffer .= $buffer;
|
|
|
+ if (isset($this->onSslHandshake)) {
|
|
|
+ try {
|
|
|
+ call_user_func($this->onSslHandshake, $this);
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ } catch (\Error $e) {
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ $this->_sslHandshakeCompleted = true;
|
|
|
+ if ($this->_sendBuffer) {
|
|
|
+ Worker::$globalEvent->add($socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
|
|
|
+ }
|
|
|
+ return;
|
|
|
}
|
|
|
|
|
|
+ $buffer = @fread($socket, self::READ_BUFFER_SIZE);
|
|
|
+
|
|
|
// Check connection closed.
|
|
|
- if (!$read_data && $check_eof) {
|
|
|
- $this->destroy();
|
|
|
- return;
|
|
|
+ if ($buffer === '' || $buffer === false) {
|
|
|
+ if ($check_eof && (feof($socket) || !is_resource($socket) || $buffer === false)) {
|
|
|
+ $this->destroy();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ $this->bytesRead += strlen($buffer);
|
|
|
+ $this->_recvBuffer .= $buffer;
|
|
|
}
|
|
|
|
|
|
// If the application layer protocol has been set up.
|
|
|
- if ($this->protocol) {
|
|
|
+ if ($this->protocol !== null) {
|
|
|
$parser = $this->protocol;
|
|
|
while ($this->_recvBuffer !== '' && !$this->_isPaused) {
|
|
|
// The current packet length is known.
|
|
|
@@ -415,10 +653,13 @@ class TcpConnection extends ConnectionInterface
|
|
|
continue;
|
|
|
}
|
|
|
try {
|
|
|
- // Decode request buffer before Emiting onMessage callback.
|
|
|
+ // Decode request buffer before Emitting onMessage callback.
|
|
|
call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
|
|
|
} catch (\Exception $e) {
|
|
|
- echo $e;
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ } catch (\Error $e) {
|
|
|
+ Worker::log($e);
|
|
|
exit(250);
|
|
|
}
|
|
|
}
|
|
|
@@ -438,7 +679,10 @@ class TcpConnection extends ConnectionInterface
|
|
|
try {
|
|
|
call_user_func($this->onMessage, $this, $this->_recvBuffer);
|
|
|
} catch (\Exception $e) {
|
|
|
- echo $e;
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ } catch (\Error $e) {
|
|
|
+ Worker::log($e);
|
|
|
exit(250);
|
|
|
}
|
|
|
// Clean receive buffer.
|
|
|
@@ -452,8 +696,9 @@ class TcpConnection extends ConnectionInterface
|
|
|
*/
|
|
|
public function baseWrite()
|
|
|
{
|
|
|
- $len = @fwrite($this->_socket, $this->_sendBuffer);
|
|
|
+ $len = @fwrite($this->_socket, $this->_sendBuffer, 8192);
|
|
|
if ($len === strlen($this->_sendBuffer)) {
|
|
|
+ $this->bytesWritten += $len;
|
|
|
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
|
|
|
$this->_sendBuffer = '';
|
|
|
// Try to emit onBufferDrain callback when the send buffer becomes empty.
|
|
|
@@ -461,7 +706,10 @@ class TcpConnection extends ConnectionInterface
|
|
|
try {
|
|
|
call_user_func($this->onBufferDrain, $this);
|
|
|
} catch (\Exception $e) {
|
|
|
- echo $e;
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ } catch (\Error $e) {
|
|
|
+ Worker::log($e);
|
|
|
exit(250);
|
|
|
}
|
|
|
}
|
|
|
@@ -471,6 +719,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
return true;
|
|
|
}
|
|
|
if ($len > 0) {
|
|
|
+ $this->bytesWritten += $len;
|
|
|
$this->_sendBuffer = substr($this->_sendBuffer, $len);
|
|
|
} else {
|
|
|
self::$statistics['send_fail']++;
|
|
|
@@ -516,15 +765,16 @@ class TcpConnection extends ConnectionInterface
|
|
|
* Close connection.
|
|
|
*
|
|
|
* @param mixed $data
|
|
|
+ * @param bool $raw
|
|
|
* @return void
|
|
|
*/
|
|
|
- public function close($data = null)
|
|
|
+ public function close($data = null, $raw = false)
|
|
|
{
|
|
|
if ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
|
|
|
return;
|
|
|
} else {
|
|
|
if ($data !== null) {
|
|
|
- $this->send($data);
|
|
|
+ $this->send($data, $raw);
|
|
|
}
|
|
|
$this->_status = self::STATUS_CLOSING;
|
|
|
}
|
|
|
@@ -544,18 +794,21 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Check whether the send buffer is full.
|
|
|
+ * Check whether the send buffer will be full.
|
|
|
*
|
|
|
* @return void
|
|
|
*/
|
|
|
- protected function checkBufferIsFull()
|
|
|
+ protected function checkBufferWillFull()
|
|
|
{
|
|
|
if ($this->maxSendBufferSize <= strlen($this->_sendBuffer)) {
|
|
|
if ($this->onBufferFull) {
|
|
|
try {
|
|
|
call_user_func($this->onBufferFull, $this);
|
|
|
} catch (\Exception $e) {
|
|
|
- echo $e;
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ } catch (\Error $e) {
|
|
|
+ Worker::log($e);
|
|
|
exit(250);
|
|
|
}
|
|
|
}
|
|
|
@@ -563,6 +816,31 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * Whether send buffer is full.
|
|
|
+ *
|
|
|
+ * @return bool
|
|
|
+ */
|
|
|
+ protected function bufferIsFull()
|
|
|
+ {
|
|
|
+ // Buffer has been marked as full but still has data to send then the packet is discarded.
|
|
|
+ if ($this->maxSendBufferSize <= strlen($this->_sendBuffer)) {
|
|
|
+ if ($this->onError) {
|
|
|
+ try {
|
|
|
+ call_user_func($this->onError, $this, WORKERMAN_SEND_FAIL, 'send buffer full and drop package');
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ } catch (\Error $e) {
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* Destroy connection.
|
|
|
*
|
|
|
* @return void
|
|
|
@@ -582,18 +860,36 @@ class TcpConnection extends ConnectionInterface
|
|
|
if ($this->worker) {
|
|
|
unset($this->worker->connections[$this->_id]);
|
|
|
}
|
|
|
+ unset(static::$connections[$this->_id]);
|
|
|
$this->_status = self::STATUS_CLOSED;
|
|
|
// Try to emit onClose callback.
|
|
|
if ($this->onClose) {
|
|
|
try {
|
|
|
call_user_func($this->onClose, $this);
|
|
|
} catch (\Exception $e) {
|
|
|
- echo $e;
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ } catch (\Error $e) {
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Try to emit protocol::onClose
|
|
|
+ if (method_exists($this->protocol, 'onClose')) {
|
|
|
+ try {
|
|
|
+ call_user_func(array($this->protocol, 'onClose'), $this);
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ } catch (\Error $e) {
|
|
|
+ Worker::log($e);
|
|
|
exit(250);
|
|
|
}
|
|
|
}
|
|
|
- // Cleaning up the callback to avoid memory leaks.
|
|
|
- $this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;
|
|
|
+ if ($this->_status === self::STATUS_CLOSED) {
|
|
|
+ // Cleaning up the callback to avoid memory leaks.
|
|
|
+ $this->onMessage = $this->onClose = $this->onError = $this->onBufferFull = $this->onBufferDrain = null;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -603,6 +899,21 @@ class TcpConnection extends ConnectionInterface
|
|
|
*/
|
|
|
public function __destruct()
|
|
|
{
|
|
|
+ static $mod;
|
|
|
self::$statistics['connection_count']--;
|
|
|
+ if (Worker::getGracefulStop()) {
|
|
|
+ if (!isset($mod)) {
|
|
|
+ $mod = ceil((self::$statistics['connection_count'] + 1) / 3);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (0 === self::$statistics['connection_count'] % $mod) {
|
|
|
+ Worker::log('worker[' . posix_getpid() . '] remains ' . self::$statistics['connection_count'] . ' connection(s)');
|
|
|
+ }
|
|
|
+
|
|
|
+ if(0 === self::$statistics['connection_count']) {
|
|
|
+ Worker::$globalEvent->destroy();
|
|
|
+ exit(0);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|