Procházet zdrojové kódy

fix(client & package): depences & client updates
1. copy workerman async client.
2. update Producer , use Singleton mode.
3. some try...catch for async client.

roiwk před 1 rokem
rodič
revize
02aad2797c

+ 7 - 4
composer.json

@@ -17,16 +17,19 @@
       "email": "vip@roiwk.cn"
     }
   ],
-  "minimum-stability": "stable",
   "require": {
     "php": "^8.0",
-    "workerman/rabbitmq": "^1.0",
     "psr/log": "^1.0||^2.0||^3.0",
-    "illuminate/collections": "^8.0|^9.0|^10.0"
+    "illuminate/collections": "^8.0|^9.0|^10.0",
+    "bunny/bunny": "^0.5.5"
   },
   "autoload": {
     "psr-4": {
       "Roiwk\\Rabbitmq\\": "src"
     }
+  },
+  "require-dev": {
+    "monolog/monolog": "^2.0|^3.0",
+    "workerman/workerman": "^4.0|^5.0@beta"
   }
-}
+}

+ 9 - 8
examples/1-hello-world/send-async.php

@@ -1,10 +1,8 @@
 <?php
-use Monolog\Logger;
+
 use Roiwk\Rabbitmq\Producer;
-use Bunny\Channel;
-use Bunny\Message;
+use Workerman\Timer;
 use Workerman\Worker;
-use Roiwk\Rabbitmq\AbstractConsumer;
 
 if (file_exists(__DIR__ . '/../../../../../vendor/autoload.php')) {
     require __DIR__ . '/../../../../../vendor/autoload.php';
@@ -14,12 +12,15 @@ if (file_exists(__DIR__ . '/../../../../../vendor/autoload.php')) {
 
 $worker = new Worker();
 
-$worker->onWorkerStart = function()  {
+// $worker->count = 4;
 
-    $config = require __DIR__ . '/../config.php';
-    $log = require __DIR__ . '/../log.php';
+$worker->onWorkerStart = function()  {
 
-    Producer::connect($config, $log)->publishAsync('Hello World!', '', '', 'hello');
+        $config = require __DIR__ . '/../config.php';
+        $log = require __DIR__ . '/../log.php';
 
+        // Timer::add(1, function() use ($config, $log) {
+            Producer::connect($config, $log)->publishAsync('Hello World!', '', '', 'hello');
+        // });
 };
 Worker::runAll();

+ 1 - 1
examples/log.php

@@ -2,7 +2,7 @@
 
 if (class_exists(Monolog\Logger::class)) {
     $log = new Monolog\Logger('test');
-    $log->pushHandler(new Monolog\Handler\StreamHandler('php://stdout'));
+    $log->pushHandler(new Monolog\Handler\StreamHandler('php://stdout', Monolog\Logger::DEBUG));
     return $log;
 } else {
     return null;

+ 8 - 0
src/AbstractConsumer.php

@@ -64,6 +64,14 @@ abstract class AbstractConsumer implements Consumable
         protected array $rabbitmqConfig,
         protected ?LoggerInterface $logger = null,
     ){
+        if ($this->async) {
+            $this->rabbitmqConfig = array_merge_recursive($this->rabbitmqConfig, [
+                'async_connect' => true,
+                'persistent' => true,
+                'path' => '/',
+            ]);
+        }
+
         $this->init();
     }
 

+ 356 - 0
src/AsyncClient.php

@@ -0,0 +1,356 @@
+<?php
+namespace Roiwk\RabbitMQ;
+
+use Bunny\AbstractClient;
+use Bunny\ClientStateEnum;
+use Bunny\Exception\ClientException;
+use Bunny\Protocol\Buffer;
+use Bunny\Protocol\HeartbeatFrame;
+use Bunny\Protocol\MethodConnectionStartFrame;
+use Bunny\Protocol\MethodConnectionTuneFrame;
+use Psr\Log\LoggerInterface;
+use React\Promise;
+use Workerman\Events\EventInterface;
+use Workerman\Worker;
+use Workerman\Timer;
+
+class AsyncClient extends \Bunny\Async\Client
+{
+    /** @var LoggerInterface */
+    protected $logger;
+
+    /** @var null|callable 重启事件回调 */
+    protected $restartCallback = null;
+
+    /**
+     * Version 5.x uses a new event interface
+     * @return bool
+     */
+    public static function isNewEventInterface(): bool
+    {
+        return version_compare(Worker::VERSION, '5.0.0', '>=');
+    }
+
+    /**
+     * Client constructor.
+     * @param array $options = [
+     *  "host" => "127.0.0.1",
+     *  "port" => 5672,
+     *  "vhost" => "/",
+     *  "mechanism" => "AMQPLAIN"
+     *  "user" => "guest",
+     *  "password" => "guest",
+     *  "timeout" => 10,
+     *  "restart_interval" => 0,
+     *  "heartbeat" => 60,
+     *  "heartbeat_callback" => function(){}
+     * ] {@see AbstractClient::__construct()} and {@see \Workerman\RabbitMQ\Client::authResponse()}
+     * @param LoggerInterface|null $logger
+     */
+    public function __construct(array $options = [], LoggerInterface $logger = null)
+    {
+        $options['async'] = true;
+        $this->logger = $logger;
+        AbstractClient::__construct($options);
+        $this->eventLoop = Worker::$globalEvent;
+    }
+
+    /**
+     * 注册重启回调
+     *  - 回调函数的返回值应为 Promise\PromiseInterface|null
+     *  - 入参为当前client实例、replyCode、replyText
+     *
+     * @param callable $callback = function (Client $client, $replyCode, $replyText): Promise\PromiseInterface|null {}
+     * @return $this
+     */
+    public function registerRestartCallback(callable $callback): Client
+    {
+        $this->restartCallback = $callback;
+        return $this;
+    }
+
+    /**
+     * 移除重启回调
+     *
+     * @return $this
+     */
+    public function unregisterRestartCallback(): Client
+    {
+        $this->restartCallback = null;
+        return $this;
+    }
+
+    /**
+     * Asynchronously sends buffered data over the wire.
+     *
+     * - Calls {@link eventLoops}'s addWriteStream() with client's stream.
+     * - Consecutive calls will return the same instance of promise.
+     *
+     * @return Promise\PromiseInterface
+     */
+    protected function flushWriteBuffer()
+    {
+        if ($this->flushWriteBufferPromise) {
+            return $this->flushWriteBufferPromise;
+
+        } else {
+            $deferred = new Promise\Deferred();
+
+            $streamFunction = function ($stream) use ($deferred) {
+                try {
+                    $this->write();
+
+                    if ($this->writeBuffer->isEmpty()) {
+                        // support workerman 5.x
+                        if (method_exists($this->eventLoop, 'offWritable')) {
+                            $this->eventLoop->offWritable($stream);
+                        }
+                        // ver earlier than 5.x
+                        else {
+                            $this->eventLoop->del($stream, EventInterface::EV_WRITE);
+                        }
+                        $this->flushWriteBufferPromise = null;
+                        $deferred->resolve(true);
+                    }
+
+                } catch (\Exception $e) {
+                    // support workerman 5.x
+                    if (method_exists($this->eventLoop, 'offWritable')) {
+                        $this->eventLoop->offWritable($stream);
+                    }
+                    // ver earlier than 5.x
+                    else {
+                        $this->eventLoop->del($stream, EventInterface::EV_WRITE);
+                    }
+                    $this->flushWriteBufferPromise = null;
+                    $deferred->reject($e);
+                }
+            };
+            // support workerman 5.x
+            if (method_exists($this->eventLoop, 'onWritable')) {
+                $this->eventLoop->onWritable($this->getStream(), $streamFunction);
+            }
+            // ver earlier than 5.x
+            else {
+                $this->eventLoop->add($this->getStream(), EventInterface::EV_WRITE, $streamFunction);
+            }
+            return $this->flushWriteBufferPromise = $deferred->promise();
+        }
+    }
+
+    /**
+     * Override to support PLAIN mechanism
+     * @param MethodConnectionStartFrame $start
+     * @return bool|Promise\PromiseInterface
+     */
+    protected function authResponse(MethodConnectionStartFrame $start)
+    {
+        if (strpos($start->mechanisms, ($mechanism = $this->options['mechanism'] ?? 'AMQPLAIN')) === false) {
+            throw new ClientException("Server does not support {$this->options['mechanism']} mechanism (supported: {$start->mechanisms}).");
+        }
+
+        if($mechanism === 'PLAIN'){
+            return $this->connectionStartOk([], $mechanism, sprintf("\0%s\0%s", $this->options["user"], $this->options["password"]), "en_US");
+        }elseif($mechanism === 'AMQPLAIN'){
+
+            $responseBuffer = new Buffer();
+            $this->writer->appendTable([
+                "LOGIN" => $this->options["user"],
+                "PASSWORD" => $this->options["password"],
+            ], $responseBuffer);
+
+            $responseBuffer->discard(4);
+
+            return $this->connectionStartOk([], $mechanism, $responseBuffer->read($responseBuffer->getLength()), "en_US");
+        }else{
+
+            throw new ClientException("Client does not support {$mechanism} mechanism. ");
+        }
+    }
+
+    /**
+     * Connects to AMQP server.
+     *
+     * Calling connect() multiple times will result in error.
+     *
+     * @return Promise\PromiseInterface
+     */
+    public function connect()
+    {
+        if ($this->state !== ClientStateEnum::NOT_CONNECTED) {
+            return Promise\reject(new ClientException("Client already connected/connecting."));
+        }
+
+        $this->state = ClientStateEnum::CONNECTING;
+        $this->writer->appendProtocolHeader($this->writeBuffer);
+
+        try {
+            // support workerman 5.x
+            if (method_exists($this->eventLoop, 'onReadable')) {
+                $this->eventLoop->onReadable($this->getStream(), [$this, "onDataAvailable"]);
+            }
+            // ver earlier than 5.x
+            else {
+                $this->eventLoop->add($this->getStream(), EventInterface::EV_READ, [$this, "onDataAvailable"]);
+            }
+        } catch (\Exception $e) {
+            return Promise\reject($e);
+        }
+
+        return $this->flushWriteBuffer()->then(function () {
+            return $this->awaitConnectionStart();
+
+        })->then(function (MethodConnectionStartFrame $start) {
+            return $this->authResponse($start);
+
+        })->then(function () {
+            return $this->awaitConnectionTune();
+
+        })->then(function (MethodConnectionTuneFrame $tune) {
+            $this->frameMax = $tune->frameMax;
+            if ($tune->channelMax > 0) {
+                $this->channelMax = $tune->channelMax;
+            }
+            return $this->connectionTuneOk($tune->channelMax, $tune->frameMax, $this->options["heartbeat"]);
+
+        })->then(function () {
+            return $this->connectionOpen($this->options["vhost"]);
+
+        })->then(function () {
+            if (isset($this->options["heartbeat"]) && $this->options["heartbeat"] > 0) {
+                $this->heartbeatTimer = Timer::add($this->options["heartbeat"], [$this, "onHeartbeat"]);
+            }
+
+            $this->state = ClientStateEnum::CONNECTED;
+            return $this;
+
+        });
+    }
+
+    /**
+     * Disconnects client from server.
+     *
+     * - Calling disconnect() if client is not connected will result in error.
+     * - Calling disconnect() multiple times will result in the same promise.
+     *
+     * @param int $replyCode
+     * @param string $replyText
+     * @return Promise\PromiseInterface|null
+     */
+    public function disconnect($replyCode = 0, $replyText = "")
+    {
+        if ($this->state === ClientStateEnum::DISCONNECTING) {
+            return $this->disconnectPromise;
+        }
+
+        if ($this->state !== ClientStateEnum::CONNECTED) {
+            return Promise\reject(new ClientException("Client is not connected."));
+        }
+
+        $this->state = ClientStateEnum::DISCONNECTING;
+
+        $promises = [];
+
+        if ($replyCode === 0) {
+            foreach ($this->channels as $channel) {
+                $promises[] = $channel->close($replyCode, $replyText);
+            }
+        }
+        else{
+            foreach($this->channels as $channel){
+                $this->removeChannel($channel->getChannelId());
+            }
+        }
+
+        if ($this->heartbeatTimer) {
+            Timer::del($this->heartbeatTimer);
+            $this->heartbeatTimer = null;
+        }
+
+        return $this->disconnectPromise = Promise\all($promises)->then(function () use ($replyCode, $replyText) {
+            if (!empty($this->channels)) {
+                throw new \LogicException("All channels have to be closed by now.");
+            }
+            if($replyCode !== 0){
+                return null;
+            }
+            return $this->connectionClose($replyCode, $replyText, 0, 0);
+        })->then(function () use ($replyCode, $replyText){
+            // support workerman 5.x
+            if (method_exists($this->eventLoop, 'offReadable')) {
+                $this->eventLoop->offReadable($this->getStream());
+            }
+            // ver earlier than 5.x
+            else {
+                $this->eventLoop->del($this->getStream(), EventInterface::EV_READ);
+            }
+            $this->closeStream();
+            $this->init();
+            if ($replyCode !== 0) {
+                // 触发重启事件回调
+                if ($this->restartCallback) {
+                    return call_user_func($this->restartCallback, $this, $replyCode, $replyText);
+                }
+                // 默认重启流程
+                else {
+                    // 延迟重启
+                    if (($restartInterval = $this->options['restart_interval'] ?? 0) > 0) {
+                        Worker::log("RabbitMQ client will restart in $restartInterval seconds. ");
+
+                        $timerFunction = function () use ($replyCode, $replyText, $restartInterval) {
+                            Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
+                        };
+                        // support workerman 5.x
+                        if (method_exists($this->eventLoop, 'delay')) {
+                            $this->eventLoop->delay($restartInterval, $timerFunction);
+                        }
+                        // ver earlier than 5.x
+                        else {
+                            $this->eventLoop->add($restartInterval, EventInterface::EV_TIMER_ONCE, $timerFunction);
+                        }
+                        return null;
+                    }
+                    // 立即重启
+                    else {
+                        Worker::stopAll(0,"RabbitMQ client disconnected: [{$replyCode}] {$replyText}");
+                    }
+                }
+            }
+            return $this;
+        });
+    }
+
+    /**
+     * Callback when heartbeat timer timed out.
+     */
+    public function onHeartbeat()
+    {
+        $this->writer->appendFrame(new HeartbeatFrame(), $this->writeBuffer);
+        $this->flushWriteBuffer()->then(
+            function () {
+                if (is_callable(
+                    isset($this->options['heartbeat_callback'])
+                        ? $this->options['heartbeat_callback']
+                        : null
+                )) {
+//                    ($this->options['heartbeat_callback'])($this);
+                    $this->options['heartbeat_callback']->call($this);
+                }
+            },
+            function (\Throwable $throwable){
+                if($this->logger){
+                    $this->logger->debug(
+                        'OnHeartbeatFailed',
+                        [
+                            $throwable->getMessage(),
+                            $throwable->getCode(),
+                            $throwable->getFile(),
+                            $throwable->getLine()
+                        ]
+                    );
+                }
+                Worker::stopAll(0,"RabbitMQ client heartbeat failed: [{$throwable->getCode()}] {$throwable->getMessage()}");
+            });
+    }
+
+}

+ 39 - 36
src/Client.php

@@ -6,7 +6,6 @@ use Bunny\Channel;
 use Bunny\Message;
 use Illuminate\Support\Arr;
 use Psr\Log\LoggerInterface;
-use Workerman\RabbitMQ\Client as AsyncClient;
 
 class Client
 {
@@ -31,7 +30,7 @@ class Client
             return $this->config['error_callback'];
         } else {
             return function (\Throwable $throwable) {
-                $this->logger?->error('['.getmypid().']:'.$throwable->getMessage().PHP_EOL, [$throwable->getTraceAsString()]);
+                $this->logger?->error('['.getmypid().']Consumer:'.$throwable->getMessage().PHP_EOL, [$throwable->getTraceAsString()]);
             };
         }
     }
@@ -81,44 +80,48 @@ class Client
     {
         $reject = $this->getErrorCallback();
 
-        (new AsyncClient($this->config, $this->logger))->connect()
-            ->then(function (AsyncClient $client) {
-                return $client->channel();
-            }, $reject)
-            ->then(function (Channel $channel) {
-                return $channel->qos($this->qos['prefetch_size'] ?? 0, $this->qos['prefetch_count'] ?? 0)
-                    ->then(function () use ($channel) {
-                        return $this->asyncDeclare($channel)->then(function () use ($channel) {
-                            return $channel;
+        try {
+            (new AsyncClient($this->config, $this->logger))->connect()
+                ->then(function (AsyncClient $client) {
+                    return $client->channel();
+                }, $reject)
+                ->then(function (Channel $channel) {
+                    return $channel->qos($this->qos['prefetch_size'] ?? 0, $this->qos['prefetch_count'] ?? 0)
+                        ->then(function () use ($channel) {
+                            return $this->asyncDeclare($channel)->then(function () use ($channel) {
+                                return $channel;
+                            });
                         });
-                    });
-            }, $reject)
-            ->then(function (Channel $channel) use ($reject, $consumer) {
-                $this->logger?->debug('Waiting:['.getmypid().'] Waiting for messages.', []);
-                $channel->consume(
-                    function (Message $message, Channel $channel, AsyncClient $client) use ($reject, $consumer) {
-                        $this->logger?->info('Received:['.getmypid().']: '.$message->content, [$message]);
+                }, $reject)
+                ->then(function (Channel $channel) use ($reject, $consumer) {
+                    $this->logger?->debug('Waiting:['.getmypid().'] Waiting for messages.', []);
+                    $channel->run(
+                        function (Message $message, Channel $channel, AsyncClient $client) use ($reject, $consumer) {
+                            $this->logger?->info('Received:['.getmypid().']: '.$message->content, [$message]);
 
-                        try {
-                            call_user_func_array($consumer, [$message, $channel, $client]);
-                        } catch (\Throwable $throw) {
-                            if (!$this->consume['noAck']) {
-                                $channel->nack($message);
+                            try {
+                                call_user_func_array($consumer, [$message, $channel, $client]);
+                            } catch (\Throwable $throw) {
+                                if (!$this->consume['noAck']) {
+                                    $channel->nack($message);
+                                }
+                                $reject($throw);
                             }
-                            $reject($throw);
-                        }
 
-                        return;
-                    },
-                    $this->queue,
-                    $this->consume['consumerTag'],
-                    $this->consume['noLocal'],
-                    $this->consume['noAck'],
-                    $this->consume['exclusive'],
-                    $this->consume['nowait'],
-                    $this->consume['arguments'],
-                );
-            }, $reject);
+                            return;
+                        },
+                        $this->queue,
+                        $this->consume['consumerTag'],
+                        $this->consume['noLocal'],
+                        $this->consume['noAck'],
+                        $this->consume['exclusive'],
+                        $this->consume['nowait'],
+                        $this->consume['arguments'],
+                    );
+                }, $reject);
+        } catch (\Throwable $throwable) {
+            $reject($throwable);
+        }
     }
 
     public function syncDeclare(Channel $channel)

+ 115 - 47
src/Producer.php

@@ -6,7 +6,6 @@ use Bunny\Channel;
 use Bunny\Client;
 use Illuminate\Support\Arr;
 use Psr\Log\LoggerInterface;
-use Workerman\RabbitMQ\Client as AsyncClient;
 
 class Producer
 {
@@ -32,15 +31,81 @@ class Producer
 
     protected array $queueDeclare = [];
 
-    public function __construct(
+    private static ?array $instances = null;
+
+    private function __construct(
         protected array $rabbitmqConfig,
         protected ?LoggerInterface $logger = null)
     {
     }
 
-    public static function connect(array $rabbitmqConfig, LoggerInterface $logger = null): self
+    private function __clone()
+    {
+    }
+
+    public function setLogger(?LoggerInterface $logger)
+    {
+        $this->logger = $logger;
+    }
+
+    public static function getInstance(array $rabbitmqConfig, ?LoggerInterface $logger = null): self
+    {
+        ksort($rabbitmqConfig);
+        $key = md5(json_encode($rabbitmqConfig));
+        if (isset(self::$instances[$key])) {
+            $obj = self::$instances[$key];
+            $obj->setLogger($logger);
+
+            return self::$instances[$key];
+        }
+
+        if (empty(self::$instances)) {
+            $obj = new self($rabbitmqConfig, $logger);
+            self::$instances[$key] = $obj;
+        }
+
+        return self::$instances[$key];
+    }
+
+    public static function connect(array $rabbitmqConfig, ?LoggerInterface $logger = null): self
+    {
+        return self::getInstance($rabbitmqConfig, $logger);
+    }
+
+    protected function getAsyncConnect(array $rabbitmqConfig, ?LoggerInterface $logger = null)
+    {
+        static $connect = null;
+        static $client = null;
+
+        if (null === $connect) {
+            $client = new AsyncClient($rabbitmqConfig, $logger);
+            $connect = $client->connect();
+        }
+
+        if ($client->isConnected()) {
+            return $connect;
+        }
+        $connect = $client->connect();
+
+        return $connect;
+    }
+
+    protected function getSyncConnect(array $rabbitmqConfig)
     {
-        return new self($rabbitmqConfig, $logger);
+        static $synConnect = null;
+        static $synClient = null;
+
+        if (null === $synConnect) {
+            $synClient = (new Client($rabbitmqConfig));
+            $synConnect = $synClient->connect();
+        }
+
+        if ($synClient->isConnected()) {
+            return $synConnect;
+        }
+        $synConnect = $synClient->connect();
+
+        return $synConnect;
     }
 
     protected function setDeclare(array $exchangeOrQueueDeclare, string $exchange = '', string $exchangeType = '')
@@ -59,73 +124,76 @@ class Producer
                 $this->exchangeDeclare['passive'], $this->exchangeDeclare['durable'],
                 $this->exchangeDeclare['auto_delete'], $this->exchangeDeclare['internal'],
                 $this->exchangeDeclare['nowait'], $this->exchangeDeclare['arguments']);
-        } else {
-            return $channel->queueDeclare($routingOrQueue,
-                $this->queueDeclare['passive'], $this->queueDeclare['durable'],
-                $this->queueDeclare['exclusive'], $this->queueDeclare['auto_delete'],
-                $this->queueDeclare['nowait'], $this->queueDeclare['arguments']);
         }
+
+        return $channel->queueDeclare($routingOrQueue,
+            $this->queueDeclare['passive'], $this->queueDeclare['durable'],
+            $this->queueDeclare['exclusive'], $this->queueDeclare['auto_delete'],
+            $this->queueDeclare['nowait'], $this->queueDeclare['arguments']);
     }
 
     public function publishAsync(string $data, string $exchange = '', string $exchangeType = '', string $routingOrQueue = '',
-        array $exchangeOrQueueDeclare = [], array $headers = [], bool $mandatory = false, bool $immediate = false
+        array $exchangeOrQueueDeclare = [], array $headers = [], bool $mandatory = false, bool $immediate = false,
     ) {
-        $this->setDeclare($exchangeOrQueueDeclare, $exchange, $exchangeType);
-
         $reject = function (\Throwable $throwable) {
-            $this->logger?->error('['.getmypid().']:'.$throwable->getMessage().PHP_EOL.$throwable->getTraceAsString(), [__CLASS__]);
+            $this->logger?->error('['.getmypid().']PUBLIAH ASYNC:'.$throwable->getMessage().PHP_EOL.$throwable->getTraceAsString(), [__CLASS__]);
         };
-        (new AsyncClient($this->rabbitmqConfig, $this->logger))->connect()
-            ->then(function (AsyncClient $client) {
-                return $client->channel();
-            }, $reject)
-            ->then(function (Channel $channel) use ($exchange, $exchangeType, $routingOrQueue) {
-                return $this->declare($channel, $routingOrQueue, $exchange, $exchangeType)
-                    ->then(function () use ($channel) {
-                        return $channel;
-                    });
-            }, $reject)
-            ->then(function (Channel $channel) use ($exchange, $routingOrQueue, $data, $headers, $mandatory, $immediate) {
-                $this->logger?->info('('.getmygid().') Sending :'.$data, [__CLASS__]);
-
-                return $channel->publish($data, $headers, $exchange, $routingOrQueue, $mandatory, $immediate)
-                    ->then(function () use ($channel) {
-                        return $channel;
+        try {
+            $this->setDeclare($exchangeOrQueueDeclare, $exchange, $exchangeType);
+            $this->getAsyncConnect($this->rabbitmqConfig, $this->logger)
+                ->then(function (AsyncClient $client) {
+                    return $client->channel();
+                }, $reject)
+                ->then(function (Channel $channel) use ($exchange, $exchangeType, $routingOrQueue) {
+                    return $this->declare($channel, $routingOrQueue, $exchange, $exchangeType)
+                        ->then(function () use ($channel) {
+                            return $channel;
+                        })
+                    ;
+                }, $reject)
+                ->then(function (Channel $channel) use ($exchange, $routingOrQueue, $data, $headers, $mandatory, $immediate) {
+                    $this->logger?->info('('.getmygid().') Sending :'.$data, [__CLASS__]);
+
+                    return $channel->publish($data, $headers, $exchange, $routingOrQueue, $mandatory, $immediate)
+                        ->then(function () use ($channel) {
+                            return $channel;
+                        })
+                    ;
+                }, $reject)
+                ->then(function (Channel $channel) use ($data) {
+                    $this->logger?->info('('.getmygid().') Sent :'.$data, [__CLASS__]);
+
+                    $client = $channel->getClient();
+
+                    return $channel->close()->then(function () use ($client) {
+                        return $client;
                     });
-            }, $reject)
-            ->then(function (Channel $channel) use ($data) {
-                $this->logger?->info('('.getmygid().') Sent :'.$data, [__CLASS__]);
-
-                $client = $channel->getClient();
-
-                return $channel->close()->then(function () use ($client) {
-                    return $client;
-                });
-            }, $reject)
-            ->then(function (AsyncClient $client) {
-                $client->disconnect();
-            }, $reject);
+                }, $reject)
+            ;
+        } catch (\Throwable $throwable) {
+            $reject($throwable);
+        }
     }
 
     public function publishSync(string $data, string $exchange = '', string $exchangeType = '', string $routingOrQueue = '',
-        array $exchangeOrQueueDeclare = [], array $headers = [], bool $mandatory = false, bool $immediate = false
+        array $exchangeOrQueueDeclare = [], array $headers = [], bool $mandatory = false, bool $immediate = false,
     ) {
         $this->setDeclare($exchangeOrQueueDeclare, $exchange, $exchangeType);
 
         $rabbitmqConfig = Arr::only($this->rabbitmqConfig, ['host', 'port', 'vhost', 'user', 'password']);
 
         try {
-            $client = (new Client($rabbitmqConfig))->connect();
+            $client = $this->getSyncConnect($rabbitmqConfig);
             $channel = $client->channel();
             $this->declare($channel, $routingOrQueue, $exchange, $exchangeType);
             $published = $channel->publish($data, $headers, $exchange, $routingOrQueue, $mandatory, $immediate);
+            $client->removeChannel($channel->getChannelId());
         } catch (\Throwable $throwable) {
-            $this->logger?->error('['.getmypid().']:'.$throwable->getMessage().PHP_EOL.$throwable->getTraceAsString(), [__CLASS__]);
+            $this->logger?->error('['.getmypid().']PUBLIAH SYNC:'.$throwable->getMessage().PHP_EOL.$throwable->getTraceAsString(), [__CLASS__]);
         } finally {
             isset($channel) && $channel->close();
-            isset($client) && $client->disconnect();
         }
 
         return $published ?? false;
     }
-}
+}