|
@@ -14,9 +14,22 @@
|
|
|
|
|
|
|
|
namespace Workerman\Connection;
|
|
namespace Workerman\Connection;
|
|
|
|
|
|
|
|
|
|
+use Exception;
|
|
|
use Throwable;
|
|
use Throwable;
|
|
|
|
|
+use Workerman\Protocols\ProtocolInterface;
|
|
|
use Workerman\Worker;
|
|
use Workerman\Worker;
|
|
|
-use Exception;
|
|
|
|
|
|
|
+use function class_exists;
|
|
|
|
|
+use function explode;
|
|
|
|
|
+use function fclose;
|
|
|
|
|
+use function stream_context_create;
|
|
|
|
|
+use function stream_set_blocking;
|
|
|
|
|
+use function stream_socket_client;
|
|
|
|
|
+use function stream_socket_recvfrom;
|
|
|
|
|
+use function stream_socket_sendto;
|
|
|
|
|
+use function strlen;
|
|
|
|
|
+use function substr;
|
|
|
|
|
+use function ucfirst;
|
|
|
|
|
+use const STREAM_CLIENT_CONNECT;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* AsyncUdpConnection.
|
|
* AsyncUdpConnection.
|
|
@@ -60,20 +73,20 @@ class AsyncUdpConnection extends UdpConnection
|
|
|
public function __construct($remoteAddress, $contextOption = [])
|
|
public function __construct($remoteAddress, $contextOption = [])
|
|
|
{
|
|
{
|
|
|
// Get the application layer communication protocol and listening address.
|
|
// Get the application layer communication protocol and listening address.
|
|
|
- list($scheme, $address) = \explode(':', $remoteAddress, 2);
|
|
|
|
|
|
|
+ list($scheme, $address) = explode(':', $remoteAddress, 2);
|
|
|
// Check application layer protocol class.
|
|
// Check application layer protocol class.
|
|
|
if ($scheme !== 'udp') {
|
|
if ($scheme !== 'udp') {
|
|
|
- $scheme = \ucfirst($scheme);
|
|
|
|
|
|
|
+ $scheme = ucfirst($scheme);
|
|
|
$this->protocol = '\\Protocols\\' . $scheme;
|
|
$this->protocol = '\\Protocols\\' . $scheme;
|
|
|
- if (!\class_exists($this->protocol)) {
|
|
|
|
|
|
|
+ if (!class_exists($this->protocol)) {
|
|
|
$this->protocol = "\\Workerman\\Protocols\\$scheme";
|
|
$this->protocol = "\\Workerman\\Protocols\\$scheme";
|
|
|
- if (!\class_exists($this->protocol)) {
|
|
|
|
|
|
|
+ if (!class_exists($this->protocol)) {
|
|
|
throw new Exception("class \\Protocols\\$scheme not exist");
|
|
throw new Exception("class \\Protocols\\$scheme not exist");
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- $this->remoteAddress = \substr($address, 2);
|
|
|
|
|
|
|
+ $this->remoteAddress = substr($address, 2);
|
|
|
$this->contextOption = $contextOption;
|
|
$this->contextOption = $contextOption;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -86,14 +99,16 @@ class AsyncUdpConnection extends UdpConnection
|
|
|
*/
|
|
*/
|
|
|
public function baseRead($socket)
|
|
public function baseRead($socket)
|
|
|
{
|
|
{
|
|
|
- $recvBuffer = \stream_socket_recvfrom($socket, Worker::MAX_UDP_PACKAGE_SIZE, 0, $remoteAddress);
|
|
|
|
|
|
|
+ $recvBuffer = stream_socket_recvfrom($socket, Worker::MAX_UDP_PACKAGE_SIZE, 0, $remoteAddress);
|
|
|
if (false === $recvBuffer || empty($remoteAddress)) {
|
|
if (false === $recvBuffer || empty($remoteAddress)) {
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if ($this->onMessage) {
|
|
if ($this->onMessage) {
|
|
|
if ($this->protocol) {
|
|
if ($this->protocol) {
|
|
|
- $recvBuffer = $this->protocol::decode($recvBuffer, $this);
|
|
|
|
|
|
|
+ /** @var ProtocolInterface $parser */
|
|
|
|
|
+ $parser = $this->protocol;
|
|
|
|
|
+ $recvBuffer = $parser::decode($recvBuffer, $this);
|
|
|
}
|
|
}
|
|
|
++ConnectionInterface::$statistics['total_request'];
|
|
++ConnectionInterface::$statistics['total_request'];
|
|
|
try {
|
|
try {
|
|
@@ -105,29 +120,6 @@ class AsyncUdpConnection extends UdpConnection
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * Sends data on the connection.
|
|
|
|
|
- *
|
|
|
|
|
- * @param mixed $sendBuffer
|
|
|
|
|
- * @param bool $raw
|
|
|
|
|
- * @return void|boolean
|
|
|
|
|
- * @throws Throwable
|
|
|
|
|
- */
|
|
|
|
|
- public function send(mixed $sendBuffer, bool $raw = false)
|
|
|
|
|
- {
|
|
|
|
|
- if (false === $raw && $this->protocol) {
|
|
|
|
|
- $parser = $this->protocol;
|
|
|
|
|
- $sendBuffer = $parser::encode($sendBuffer, $this);
|
|
|
|
|
- if ($sendBuffer === '') {
|
|
|
|
|
- return;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- if ($this->connected === false) {
|
|
|
|
|
- $this->connect();
|
|
|
|
|
- }
|
|
|
|
|
- return \strlen($sendBuffer) === \stream_socket_sendto($this->socket, $sendBuffer, 0);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /**
|
|
|
|
|
* Close connection.
|
|
* Close connection.
|
|
|
*
|
|
*
|
|
|
* @param mixed|null $data
|
|
* @param mixed|null $data
|
|
@@ -141,7 +133,7 @@ class AsyncUdpConnection extends UdpConnection
|
|
|
$this->send($data, $raw);
|
|
$this->send($data, $raw);
|
|
|
}
|
|
}
|
|
|
$this->eventLoop->offReadable($this->socket);
|
|
$this->eventLoop->offReadable($this->socket);
|
|
|
- \fclose($this->socket);
|
|
|
|
|
|
|
+ fclose($this->socket);
|
|
|
$this->connected = false;
|
|
$this->connected = false;
|
|
|
// Try to emit onClose callback.
|
|
// Try to emit onClose callback.
|
|
|
if ($this->onClose) {
|
|
if ($this->onClose) {
|
|
@@ -155,6 +147,30 @@ class AsyncUdpConnection extends UdpConnection
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
|
|
+ * Sends data on the connection.
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param mixed $sendBuffer
|
|
|
|
|
+ * @param bool $raw
|
|
|
|
|
+ * @return void|boolean
|
|
|
|
|
+ * @throws Throwable
|
|
|
|
|
+ */
|
|
|
|
|
+ public function send(mixed $sendBuffer, bool $raw = false)
|
|
|
|
|
+ {
|
|
|
|
|
+ if (false === $raw && $this->protocol) {
|
|
|
|
|
+ /** @var ProtocolInterface $parser */
|
|
|
|
|
+ $parser = $this->protocol;
|
|
|
|
|
+ $sendBuffer = $parser::encode($sendBuffer, $this);
|
|
|
|
|
+ if ($sendBuffer === '') {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ if ($this->connected === false) {
|
|
|
|
|
+ $this->connect();
|
|
|
|
|
+ }
|
|
|
|
|
+ return strlen($sendBuffer) === stream_socket_sendto($this->socket, $sendBuffer, 0);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /**
|
|
|
* Connect.
|
|
* Connect.
|
|
|
*
|
|
*
|
|
|
* @return void
|
|
* @return void
|
|
@@ -169,19 +185,19 @@ class AsyncUdpConnection extends UdpConnection
|
|
|
$this->eventLoop = Worker::$globalEvent;
|
|
$this->eventLoop = Worker::$globalEvent;
|
|
|
}
|
|
}
|
|
|
if ($this->contextOption) {
|
|
if ($this->contextOption) {
|
|
|
- $context = \stream_context_create($this->contextOption);
|
|
|
|
|
- $this->socket = \stream_socket_client("udp://{$this->remoteAddress}", $errno, $errmsg,
|
|
|
|
|
- 30, \STREAM_CLIENT_CONNECT, $context);
|
|
|
|
|
|
|
+ $context = stream_context_create($this->contextOption);
|
|
|
|
|
+ $this->socket = stream_socket_client("udp://$this->remoteAddress", $errno, $errmsg,
|
|
|
|
|
+ 30, STREAM_CLIENT_CONNECT, $context);
|
|
|
} else {
|
|
} else {
|
|
|
- $this->socket = \stream_socket_client("udp://{$this->remoteAddress}", $errno, $errmsg);
|
|
|
|
|
|
|
+ $this->socket = stream_socket_client("udp://$this->remoteAddress", $errno, $errmsg);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if (!$this->socket) {
|
|
if (!$this->socket) {
|
|
|
- Worker::safeEcho(new \Exception($errmsg));
|
|
|
|
|
|
|
+ Worker::safeEcho(new Exception($errmsg));
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- \stream_set_blocking($this->socket, false);
|
|
|
|
|
|
|
+ stream_set_blocking($this->socket, false);
|
|
|
|
|
|
|
|
if ($this->onMessage) {
|
|
if ($this->onMessage) {
|
|
|
$this->eventLoop->onWritable($this->socket, [$this, 'baseRead']);
|
|
$this->eventLoop->onWritable($this->socket, [$this, 'baseRead']);
|