|
|
@@ -48,7 +48,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
*
|
|
|
* @var int
|
|
|
*/
|
|
|
- const STATUS_ESTABLISH = 2;
|
|
|
+ const STATUS_ESTABLISHED = 2;
|
|
|
|
|
|
/**
|
|
|
* Status closing.
|
|
|
@@ -122,6 +122,20 @@ class TcpConnection extends ConnectionInterface
|
|
|
public $worker = null;
|
|
|
|
|
|
/**
|
|
|
+ * Bytes read.
|
|
|
+ *
|
|
|
+ * @var int
|
|
|
+ */
|
|
|
+ public $bytesRead = 0;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Bytes written.
|
|
|
+ *
|
|
|
+ * @var int
|
|
|
+ */
|
|
|
+ public $bytesWritten = 0;
|
|
|
+
|
|
|
+ /**
|
|
|
* Connection->id.
|
|
|
*
|
|
|
* @var int
|
|
|
@@ -197,7 +211,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
*
|
|
|
* @var int
|
|
|
*/
|
|
|
- protected $_status = self::STATUS_ESTABLISH;
|
|
|
+ protected $_status = self::STATUS_ESTABLISHED;
|
|
|
|
|
|
/**
|
|
|
* Remote address.
|
|
|
@@ -214,13 +228,58 @@ class TcpConnection extends ConnectionInterface
|
|
|
protected $_isPaused = false;
|
|
|
|
|
|
/**
|
|
|
- * SSL handshake completed or not
|
|
|
+ * SSL handshake completed or not.
|
|
|
*
|
|
|
* @var bool
|
|
|
*/
|
|
|
protected $_sslHandshakeCompleted = false;
|
|
|
|
|
|
/**
|
|
|
+ * All connection instances.
|
|
|
+ *
|
|
|
+ * @var array
|
|
|
+ */
|
|
|
+ public static $connections = array();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Status to string.
|
|
|
+ *
|
|
|
+ * @var array
|
|
|
+ */
|
|
|
+ public static $_statusToString = array(
|
|
|
+ self::STATUS_INITIAL => 'INITIAL',
|
|
|
+ self::STATUS_CONNECTING => 'CONNECTING',
|
|
|
+ self::STATUS_ESTABLISHED => 'ESTABLISHED',
|
|
|
+ self::STATUS_CLOSING => 'CLOSING',
|
|
|
+ self::STATUS_CLOSED => 'CLOSED',
|
|
|
+ );
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Adding support of custom functions within protocols
|
|
|
+ *
|
|
|
+ * @param string $name
|
|
|
+ * @param array $arguments
|
|
|
+ */
|
|
|
+ public function __call($name, $arguments) {
|
|
|
+ // Try to emit custom function within protocol
|
|
|
+ if (method_exists($this->protocol, $name)) {
|
|
|
+ try {
|
|
|
+ return call_user_func(array($this->protocol, $name), $this, $arguments);
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ } catch (\Error $e) {
|
|
|
+ Worker::log($e);
|
|
|
+ exit(250);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ trigger_error('Call to undefined method '.__CLASS__.'::'.$name.'()', E_USER_ERROR);
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* Construct.
|
|
|
*
|
|
|
* @param resource $socket
|
|
|
@@ -229,7 +288,10 @@ class TcpConnection extends ConnectionInterface
|
|
|
public function __construct($socket, $remote_address = '')
|
|
|
{
|
|
|
self::$statistics['connection_count']++;
|
|
|
- $this->id = $this->_id = self::$_idRecorder++;
|
|
|
+ $this->id = $this->_id = self::$_idRecorder++;
|
|
|
+ if(self::$_idRecorder === PHP_INT_MAX){
|
|
|
+ self::$_idRecorder = 0;
|
|
|
+ }
|
|
|
$this->_socket = $socket;
|
|
|
stream_set_blocking($this->_socket, 0);
|
|
|
// Compatible with hhvm
|
|
|
@@ -239,6 +301,22 @@ class TcpConnection extends ConnectionInterface
|
|
|
Worker::$globalEvent->add($this->_socket, EventInterface::EV_READ, array($this, 'baseRead'));
|
|
|
$this->maxSendBufferSize = self::$defaultMaxSendBufferSize;
|
|
|
$this->_remoteAddress = $remote_address;
|
|
|
+ static::$connections[$this->id] = $this;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get status.
|
|
|
+ *
|
|
|
+ * @param bool $raw_output
|
|
|
+ *
|
|
|
+ * @return int
|
|
|
+ */
|
|
|
+ public function getStatus($raw_output = true)
|
|
|
+ {
|
|
|
+ if ($raw_output) {
|
|
|
+ return $this->_status;
|
|
|
+ }
|
|
|
+ return self::$_statusToString[$this->_status];
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -255,7 +333,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
|
|
|
|
// Try to call protocol::encode($send_buffer) before sending.
|
|
|
- if (false === $raw && $this->protocol) {
|
|
|
+ if (false === $raw && $this->protocol !== null) {
|
|
|
$parser = $this->protocol;
|
|
|
$send_buffer = $parser::encode($send_buffer, $this);
|
|
|
if ($send_buffer === '') {
|
|
|
@@ -263,7 +341,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if ($this->_status !== self::STATUS_ESTABLISH ||
|
|
|
+ if ($this->_status !== self::STATUS_ESTABLISHED ||
|
|
|
($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true)
|
|
|
) {
|
|
|
if ($this->_sendBuffer) {
|
|
|
@@ -283,11 +361,13 @@ class TcpConnection extends ConnectionInterface
|
|
|
$len = @fwrite($this->_socket, $send_buffer, 8192);
|
|
|
// send successful.
|
|
|
if ($len === strlen($send_buffer)) {
|
|
|
+ $this->bytesWritten += $len;
|
|
|
return true;
|
|
|
}
|
|
|
// Send only part of the data.
|
|
|
if ($len > 0) {
|
|
|
$this->_sendBuffer = substr($send_buffer, $len);
|
|
|
+ $this->bytesWritten += $len;
|
|
|
} else {
|
|
|
// Connection closed?
|
|
|
if (!is_resource($this->_socket) || feof($this->_socket)) {
|
|
|
@@ -333,7 +413,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
{
|
|
|
$pos = strrpos($this->_remoteAddress, ':');
|
|
|
if ($pos) {
|
|
|
- return trim(substr($this->_remoteAddress, 0, $pos), '[]');
|
|
|
+ return substr($this->_remoteAddress, 0, $pos);
|
|
|
}
|
|
|
return '';
|
|
|
}
|
|
|
@@ -352,6 +432,102 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * Get remote address.
|
|
|
+ *
|
|
|
+ * @return string
|
|
|
+ */
|
|
|
+ public function getRemoteAddress()
|
|
|
+ {
|
|
|
+ return $this->_remoteAddress;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get local IP.
|
|
|
+ *
|
|
|
+ * @return string
|
|
|
+ */
|
|
|
+ public function getLocalIp()
|
|
|
+ {
|
|
|
+ $address = $this->getLocalAddress();
|
|
|
+ $pos = strrpos($address, ':');
|
|
|
+ if (!$pos) {
|
|
|
+ return '';
|
|
|
+ }
|
|
|
+ return substr($address, 0, $pos);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get local port.
|
|
|
+ *
|
|
|
+ * @return int
|
|
|
+ */
|
|
|
+ public function getLocalPort()
|
|
|
+ {
|
|
|
+ $address = $this->getLocalAddress();
|
|
|
+ $pos = strrpos($address, ':');
|
|
|
+ if (!$pos) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+ return (int)substr(strrchr($address, ':'), 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get local address.
|
|
|
+ *
|
|
|
+ * @return string
|
|
|
+ */
|
|
|
+ public function getLocalAddress()
|
|
|
+ {
|
|
|
+ return (string)@stream_socket_get_name($this->_socket, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get send buffer queue size.
|
|
|
+ *
|
|
|
+ * @return integer
|
|
|
+ */
|
|
|
+ public function getSendBufferQueueSize()
|
|
|
+ {
|
|
|
+ return strlen($this->_sendBuffer);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get recv buffer queue size.
|
|
|
+ *
|
|
|
+ * @return integer
|
|
|
+ */
|
|
|
+ public function getRecvBufferQueueSize()
|
|
|
+ {
|
|
|
+ return strlen($this->_recvBuffer);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Is ipv4.
|
|
|
+ *
|
|
|
+ * return bool.
|
|
|
+ */
|
|
|
+ public function isIpV4()
|
|
|
+ {
|
|
|
+ if ($this->transport === 'unix') {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return strpos($this->getRemoteIp(), ':') === false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Is ipv6.
|
|
|
+ *
|
|
|
+ * return bool.
|
|
|
+ */
|
|
|
+ public function isIpV6()
|
|
|
+ {
|
|
|
+ if ($this->transport === 'unix') {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ return strpos($this->getRemoteIp(), ':') !== false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* Pauses the reading of data. That is onMessage will not be emitted. Useful to throttle back an upload.
|
|
|
*
|
|
|
* @return void
|
|
|
@@ -387,8 +563,8 @@ class TcpConnection extends ConnectionInterface
|
|
|
{
|
|
|
// SSL handshake.
|
|
|
if ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true) {
|
|
|
- $ret = stream_socket_enable_crypto($socket, true, STREAM_CRYPTO_METHOD_SSLv2_SERVER |
|
|
|
- STREAM_CRYPTO_METHOD_SSLv3_SERVER | STREAM_CRYPTO_METHOD_SSLv23_SERVER);
|
|
|
+ $ret = stream_socket_enable_crypto($socket, true, STREAM_CRYPTO_METHOD_SSLv2_SERVER |
|
|
|
+ STREAM_CRYPTO_METHOD_SSLv23_SERVER);
|
|
|
// Negotiation has failed.
|
|
|
if(false === $ret) {
|
|
|
if (!feof($socket)) {
|
|
|
@@ -417,7 +593,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- $buffer = fread($socket, self::READ_BUFFER_SIZE);
|
|
|
+ $buffer = @fread($socket, self::READ_BUFFER_SIZE);
|
|
|
|
|
|
// Check connection closed.
|
|
|
if ($buffer === '' || $buffer === false) {
|
|
|
@@ -426,11 +602,12 @@ class TcpConnection extends ConnectionInterface
|
|
|
return;
|
|
|
}
|
|
|
} else {
|
|
|
+ $this->bytesRead += strlen($buffer);
|
|
|
$this->_recvBuffer .= $buffer;
|
|
|
}
|
|
|
|
|
|
// If the application layer protocol has been set up.
|
|
|
- if ($this->protocol) {
|
|
|
+ if ($this->protocol !== null) {
|
|
|
$parser = $this->protocol;
|
|
|
while ($this->_recvBuffer !== '' && !$this->_isPaused) {
|
|
|
// The current packet length is known.
|
|
|
@@ -521,6 +698,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
{
|
|
|
$len = @fwrite($this->_socket, $this->_sendBuffer, 8192);
|
|
|
if ($len === strlen($this->_sendBuffer)) {
|
|
|
+ $this->bytesWritten += $len;
|
|
|
Worker::$globalEvent->del($this->_socket, EventInterface::EV_WRITE);
|
|
|
$this->_sendBuffer = '';
|
|
|
// Try to emit onBufferDrain callback when the send buffer becomes empty.
|
|
|
@@ -541,6 +719,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
return true;
|
|
|
}
|
|
|
if ($len > 0) {
|
|
|
+ $this->bytesWritten += $len;
|
|
|
$this->_sendBuffer = substr($this->_sendBuffer, $len);
|
|
|
} else {
|
|
|
self::$statistics['send_fail']++;
|
|
|
@@ -691,6 +870,7 @@ class TcpConnection extends ConnectionInterface
|
|
|
if ($this->worker) {
|
|
|
unset($this->worker->connections[$this->_id]);
|
|
|
}
|
|
|
+ unset(static::$connections[$this->_id]);
|
|
|
$this->_status = self::STATUS_CLOSED;
|
|
|
// Try to emit onClose callback.
|
|
|
if ($this->onClose) {
|
|
|
@@ -729,6 +909,21 @@ class TcpConnection extends ConnectionInterface
|
|
|
*/
|
|
|
public function __destruct()
|
|
|
{
|
|
|
+ static $mod;
|
|
|
self::$statistics['connection_count']--;
|
|
|
+ if (Worker::getGracefulStop()) {
|
|
|
+ if (!isset($mod)) {
|
|
|
+ $mod = ceil((self::$statistics['connection_count'] + 1) / 3);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (0 === self::$statistics['connection_count'] % $mod) {
|
|
|
+ Worker::log('worker[' . posix_getpid() . '] remains ' . self::$statistics['connection_count'] . ' connection(s)');
|
|
|
+ }
|
|
|
+
|
|
|
+ if(0 === self::$statistics['connection_count']) {
|
|
|
+ Worker::$globalEvent->destroy();
|
|
|
+ exit(0);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|