|
|
@@ -108,6 +108,13 @@ class TcpConnection extends ConnectionInterface
|
|
|
public $protocol = null;
|
|
|
|
|
|
/**
|
|
|
+ * Transport (tcp/udp/unix/ssl).
|
|
|
+ *
|
|
|
+ * @var string
|
|
|
+ */
|
|
|
+ public $transport = 'tcp';
|
|
|
+
|
|
|
+ /**
|
|
|
* Which worker belong to.
|
|
|
*
|
|
|
* @var Worker
|
|
|
@@ -207,6 +214,13 @@ class TcpConnection extends ConnectionInterface
|
|
|
protected $_isPaused = false;
|
|
|
|
|
|
/**
|
|
|
+ * SSL handshake completed or not
|
|
|
+ *
|
|
|
+ * @var bool
|
|
|
+ */
|
|
|
+ protected $_sslHandshakeCompleted = false;
|
|
|
+
|
|
|
+ /**
|
|
|
* Construct.
|
|
|
*
|
|
|
* @param resource $socket
|
|
|
@@ -236,6 +250,10 @@ class TcpConnection extends ConnectionInterface
|
|
|
*/
|
|
|
public function send($send_buffer, $raw = false)
|
|
|
{
|
|
|
+ if ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
// Try to call protocol::encode($send_buffer) before sending.
|
|
|
if (false === $raw && $this->protocol) {
|
|
|
$parser = $this->protocol;
|
|
|
@@ -245,7 +263,9 @@ class TcpConnection extends ConnectionInterface
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if ($this->_status === self::STATUS_INITIAL || $this->_status === self::STATUS_CONNECTING) {
|
|
|
+ if ($this->_status !== self::STATUS_ESTABLISH ||
|
|
|
+ ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true)
|
|
|
+ ) {
|
|
|
if ($this->_sendBuffer) {
|
|
|
if ($this->bufferIsFull()) {
|
|
|
self::$statistics['send_fail']++;
|
|
|
@@ -255,10 +275,9 @@ class TcpConnection extends ConnectionInterface
|
|
|
$this->_sendBuffer .= $send_buffer;
|
|
|
$this->checkBufferWillFull();
|
|
|
return null;
|
|
|
- } elseif ($this->_status === self::STATUS_CLOSING || $this->_status === self::STATUS_CLOSED) {
|
|
|
- return false;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
// Attempt to send data directly.
|
|
|
if ($this->_sendBuffer === '') {
|
|
|
$len = @fwrite($this->_socket, $send_buffer);
|
|
|
@@ -366,6 +385,33 @@ class TcpConnection extends ConnectionInterface
|
|
|
*/
|
|
|
public function baseRead($socket, $check_eof = true)
|
|
|
{
|
|
|
+ // SSL handshake.
|
|
|
+ if ($this->transport === 'ssl' && $this->_sslHandshakeCompleted !== true) {
|
|
|
+ stream_set_blocking($socket, true);
|
|
|
+ stream_set_timeout($socket, 1);
|
|
|
+ $ret = stream_socket_enable_crypto($socket, true, STREAM_CRYPTO_METHOD_SSLv23_SERVER);
|
|
|
+ if(!$ret) {
|
|
|
+ echo new \Exception('ssl handshake fail, stream_socket_enable_crypto return ' . var_export($ret, true));
|
|
|
+ return $this->destroy();
|
|
|
+ }
|
|
|
+ if (isset($this->onSslHandshake)) {
|
|
|
+ try {
|
|
|
+ call_user_func($this->onSslHandshake, $this);
|
|
|
+ } catch (\Exception $e) {
|
|
|
+ self::log($e);
|
|
|
+ exit(250);
|
|
|
+ } catch (\Error $e) {
|
|
|
+ self::log($e);
|
|
|
+ exit(250);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ $this->_sslHandshakeCompleted = true;
|
|
|
+ if ($this->_sendBuffer) {
|
|
|
+ Worker::$globalEvent->add($socket, EventInterface::EV_WRITE, array($this, 'baseWrite'));
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
$buffer = fread($socket, self::READ_BUFFER_SIZE);
|
|
|
|
|
|
// Check connection closed.
|