|
@@ -6,7 +6,6 @@ use Bunny\Channel;
|
|
|
use Bunny\Client;
|
|
use Bunny\Client;
|
|
|
use Illuminate\Support\Arr;
|
|
use Illuminate\Support\Arr;
|
|
|
use Psr\Log\LoggerInterface;
|
|
use Psr\Log\LoggerInterface;
|
|
|
-use Workerman\RabbitMQ\Client as AsyncClient;
|
|
|
|
|
|
|
|
|
|
class Producer
|
|
class Producer
|
|
|
{
|
|
{
|
|
@@ -32,15 +31,81 @@ class Producer
|
|
|
|
|
|
|
|
protected array $queueDeclare = [];
|
|
protected array $queueDeclare = [];
|
|
|
|
|
|
|
|
- public function __construct(
|
|
|
|
|
|
|
+ private static ?array $instances = null;
|
|
|
|
|
+
|
|
|
|
|
+ private function __construct(
|
|
|
protected array $rabbitmqConfig,
|
|
protected array $rabbitmqConfig,
|
|
|
protected ?LoggerInterface $logger = null)
|
|
protected ?LoggerInterface $logger = null)
|
|
|
{
|
|
{
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- public static function connect(array $rabbitmqConfig, LoggerInterface $logger = null): self
|
|
|
|
|
|
|
+ private function __clone()
|
|
|
|
|
+ {
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public function setLogger(?LoggerInterface $logger)
|
|
|
|
|
+ {
|
|
|
|
|
+ $this->logger = $logger;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public static function getInstance(array $rabbitmqConfig, ?LoggerInterface $logger = null): self
|
|
|
|
|
+ {
|
|
|
|
|
+ ksort($rabbitmqConfig);
|
|
|
|
|
+ $key = md5(json_encode($rabbitmqConfig));
|
|
|
|
|
+ if (isset(self::$instances[$key])) {
|
|
|
|
|
+ $obj = self::$instances[$key];
|
|
|
|
|
+ $obj->setLogger($logger);
|
|
|
|
|
+
|
|
|
|
|
+ return self::$instances[$key];
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (empty(self::$instances)) {
|
|
|
|
|
+ $obj = new self($rabbitmqConfig, $logger);
|
|
|
|
|
+ self::$instances[$key] = $obj;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return self::$instances[$key];
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ public static function connect(array $rabbitmqConfig, ?LoggerInterface $logger = null): self
|
|
|
|
|
+ {
|
|
|
|
|
+ return self::getInstance($rabbitmqConfig, $logger);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ protected function getAsyncConnect(array $rabbitmqConfig, ?LoggerInterface $logger = null)
|
|
|
|
|
+ {
|
|
|
|
|
+ static $connect = null;
|
|
|
|
|
+ static $client = null;
|
|
|
|
|
+
|
|
|
|
|
+ if (null === $connect) {
|
|
|
|
|
+ $client = new AsyncClient($rabbitmqConfig, $logger);
|
|
|
|
|
+ $connect = $client->connect();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if ($client->isConnected()) {
|
|
|
|
|
+ return $connect;
|
|
|
|
|
+ }
|
|
|
|
|
+ $connect = $client->connect();
|
|
|
|
|
+
|
|
|
|
|
+ return $connect;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ protected function getSyncConnect(array $rabbitmqConfig)
|
|
|
{
|
|
{
|
|
|
- return new self($rabbitmqConfig, $logger);
|
|
|
|
|
|
|
+ static $synConnect = null;
|
|
|
|
|
+ static $synClient = null;
|
|
|
|
|
+
|
|
|
|
|
+ if (null === $synConnect) {
|
|
|
|
|
+ $synClient = (new Client($rabbitmqConfig));
|
|
|
|
|
+ $synConnect = $synClient->connect();
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if ($synClient->isConnected()) {
|
|
|
|
|
+ return $synConnect;
|
|
|
|
|
+ }
|
|
|
|
|
+ $synConnect = $synClient->connect();
|
|
|
|
|
+
|
|
|
|
|
+ return $synConnect;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
protected function setDeclare(array $exchangeOrQueueDeclare, string $exchange = '', string $exchangeType = '')
|
|
protected function setDeclare(array $exchangeOrQueueDeclare, string $exchange = '', string $exchangeType = '')
|
|
@@ -59,73 +124,76 @@ class Producer
|
|
|
$this->exchangeDeclare['passive'], $this->exchangeDeclare['durable'],
|
|
$this->exchangeDeclare['passive'], $this->exchangeDeclare['durable'],
|
|
|
$this->exchangeDeclare['auto_delete'], $this->exchangeDeclare['internal'],
|
|
$this->exchangeDeclare['auto_delete'], $this->exchangeDeclare['internal'],
|
|
|
$this->exchangeDeclare['nowait'], $this->exchangeDeclare['arguments']);
|
|
$this->exchangeDeclare['nowait'], $this->exchangeDeclare['arguments']);
|
|
|
- } else {
|
|
|
|
|
- return $channel->queueDeclare($routingOrQueue,
|
|
|
|
|
- $this->queueDeclare['passive'], $this->queueDeclare['durable'],
|
|
|
|
|
- $this->queueDeclare['exclusive'], $this->queueDeclare['auto_delete'],
|
|
|
|
|
- $this->queueDeclare['nowait'], $this->queueDeclare['arguments']);
|
|
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ return $channel->queueDeclare($routingOrQueue,
|
|
|
|
|
+ $this->queueDeclare['passive'], $this->queueDeclare['durable'],
|
|
|
|
|
+ $this->queueDeclare['exclusive'], $this->queueDeclare['auto_delete'],
|
|
|
|
|
+ $this->queueDeclare['nowait'], $this->queueDeclare['arguments']);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public function publishAsync(string $data, string $exchange = '', string $exchangeType = '', string $routingOrQueue = '',
|
|
public function publishAsync(string $data, string $exchange = '', string $exchangeType = '', string $routingOrQueue = '',
|
|
|
- array $exchangeOrQueueDeclare = [], array $headers = [], bool $mandatory = false, bool $immediate = false
|
|
|
|
|
|
|
+ array $exchangeOrQueueDeclare = [], array $headers = [], bool $mandatory = false, bool $immediate = false,
|
|
|
) {
|
|
) {
|
|
|
- $this->setDeclare($exchangeOrQueueDeclare, $exchange, $exchangeType);
|
|
|
|
|
-
|
|
|
|
|
$reject = function (\Throwable $throwable) {
|
|
$reject = function (\Throwable $throwable) {
|
|
|
- $this->logger?->error('['.getmypid().']:'.$throwable->getMessage().PHP_EOL.$throwable->getTraceAsString(), [__CLASS__]);
|
|
|
|
|
|
|
+ $this->logger?->error('['.getmypid().']PUBLIAH ASYNC:'.$throwable->getMessage().PHP_EOL.$throwable->getTraceAsString(), [__CLASS__]);
|
|
|
};
|
|
};
|
|
|
- (new AsyncClient($this->rabbitmqConfig, $this->logger))->connect()
|
|
|
|
|
- ->then(function (AsyncClient $client) {
|
|
|
|
|
- return $client->channel();
|
|
|
|
|
- }, $reject)
|
|
|
|
|
- ->then(function (Channel $channel) use ($exchange, $exchangeType, $routingOrQueue) {
|
|
|
|
|
- return $this->declare($channel, $routingOrQueue, $exchange, $exchangeType)
|
|
|
|
|
- ->then(function () use ($channel) {
|
|
|
|
|
- return $channel;
|
|
|
|
|
- });
|
|
|
|
|
- }, $reject)
|
|
|
|
|
- ->then(function (Channel $channel) use ($exchange, $routingOrQueue, $data, $headers, $mandatory, $immediate) {
|
|
|
|
|
- $this->logger?->info('('.getmygid().') Sending :'.$data, [__CLASS__]);
|
|
|
|
|
-
|
|
|
|
|
- return $channel->publish($data, $headers, $exchange, $routingOrQueue, $mandatory, $immediate)
|
|
|
|
|
- ->then(function () use ($channel) {
|
|
|
|
|
- return $channel;
|
|
|
|
|
|
|
+ try {
|
|
|
|
|
+ $this->setDeclare($exchangeOrQueueDeclare, $exchange, $exchangeType);
|
|
|
|
|
+ $this->getAsyncConnect($this->rabbitmqConfig, $this->logger)
|
|
|
|
|
+ ->then(function (AsyncClient $client) {
|
|
|
|
|
+ return $client->channel();
|
|
|
|
|
+ }, $reject)
|
|
|
|
|
+ ->then(function (Channel $channel) use ($exchange, $exchangeType, $routingOrQueue) {
|
|
|
|
|
+ return $this->declare($channel, $routingOrQueue, $exchange, $exchangeType)
|
|
|
|
|
+ ->then(function () use ($channel) {
|
|
|
|
|
+ return $channel;
|
|
|
|
|
+ })
|
|
|
|
|
+ ;
|
|
|
|
|
+ }, $reject)
|
|
|
|
|
+ ->then(function (Channel $channel) use ($exchange, $routingOrQueue, $data, $headers, $mandatory, $immediate) {
|
|
|
|
|
+ $this->logger?->info('('.getmygid().') Sending :'.$data, [__CLASS__]);
|
|
|
|
|
+
|
|
|
|
|
+ return $channel->publish($data, $headers, $exchange, $routingOrQueue, $mandatory, $immediate)
|
|
|
|
|
+ ->then(function () use ($channel) {
|
|
|
|
|
+ return $channel;
|
|
|
|
|
+ })
|
|
|
|
|
+ ;
|
|
|
|
|
+ }, $reject)
|
|
|
|
|
+ ->then(function (Channel $channel) use ($data) {
|
|
|
|
|
+ $this->logger?->info('('.getmygid().') Sent :'.$data, [__CLASS__]);
|
|
|
|
|
+
|
|
|
|
|
+ $client = $channel->getClient();
|
|
|
|
|
+
|
|
|
|
|
+ return $channel->close()->then(function () use ($client) {
|
|
|
|
|
+ return $client;
|
|
|
});
|
|
});
|
|
|
- }, $reject)
|
|
|
|
|
- ->then(function (Channel $channel) use ($data) {
|
|
|
|
|
- $this->logger?->info('('.getmygid().') Sent :'.$data, [__CLASS__]);
|
|
|
|
|
-
|
|
|
|
|
- $client = $channel->getClient();
|
|
|
|
|
-
|
|
|
|
|
- return $channel->close()->then(function () use ($client) {
|
|
|
|
|
- return $client;
|
|
|
|
|
- });
|
|
|
|
|
- }, $reject)
|
|
|
|
|
- ->then(function (AsyncClient $client) {
|
|
|
|
|
- $client->disconnect();
|
|
|
|
|
- }, $reject);
|
|
|
|
|
|
|
+ }, $reject)
|
|
|
|
|
+ ;
|
|
|
|
|
+ } catch (\Throwable $throwable) {
|
|
|
|
|
+ $reject($throwable);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public function publishSync(string $data, string $exchange = '', string $exchangeType = '', string $routingOrQueue = '',
|
|
public function publishSync(string $data, string $exchange = '', string $exchangeType = '', string $routingOrQueue = '',
|
|
|
- array $exchangeOrQueueDeclare = [], array $headers = [], bool $mandatory = false, bool $immediate = false
|
|
|
|
|
|
|
+ array $exchangeOrQueueDeclare = [], array $headers = [], bool $mandatory = false, bool $immediate = false,
|
|
|
) {
|
|
) {
|
|
|
$this->setDeclare($exchangeOrQueueDeclare, $exchange, $exchangeType);
|
|
$this->setDeclare($exchangeOrQueueDeclare, $exchange, $exchangeType);
|
|
|
|
|
|
|
|
$rabbitmqConfig = Arr::only($this->rabbitmqConfig, ['host', 'port', 'vhost', 'user', 'password']);
|
|
$rabbitmqConfig = Arr::only($this->rabbitmqConfig, ['host', 'port', 'vhost', 'user', 'password']);
|
|
|
|
|
|
|
|
try {
|
|
try {
|
|
|
- $client = (new Client($rabbitmqConfig))->connect();
|
|
|
|
|
|
|
+ $client = $this->getSyncConnect($rabbitmqConfig);
|
|
|
$channel = $client->channel();
|
|
$channel = $client->channel();
|
|
|
$this->declare($channel, $routingOrQueue, $exchange, $exchangeType);
|
|
$this->declare($channel, $routingOrQueue, $exchange, $exchangeType);
|
|
|
$published = $channel->publish($data, $headers, $exchange, $routingOrQueue, $mandatory, $immediate);
|
|
$published = $channel->publish($data, $headers, $exchange, $routingOrQueue, $mandatory, $immediate);
|
|
|
|
|
+ $client->removeChannel($channel->getChannelId());
|
|
|
} catch (\Throwable $throwable) {
|
|
} catch (\Throwable $throwable) {
|
|
|
- $this->logger?->error('['.getmypid().']:'.$throwable->getMessage().PHP_EOL.$throwable->getTraceAsString(), [__CLASS__]);
|
|
|
|
|
|
|
+ $this->logger?->error('['.getmypid().']PUBLIAH SYNC:'.$throwable->getMessage().PHP_EOL.$throwable->getTraceAsString(), [__CLASS__]);
|
|
|
} finally {
|
|
} finally {
|
|
|
isset($channel) && $channel->close();
|
|
isset($channel) && $channel->close();
|
|
|
- isset($client) && $client->disconnect();
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
return $published ?? false;
|
|
return $published ?? false;
|
|
|
}
|
|
}
|
|
|
-}
|
|
|
|
|
|
|
+}
|