Переглянути джерело

fix(consumer): init client in WorkerStart

roiwk 11 місяців тому
батько
коміт
20bf5a5d7d
2 змінених файлів з 12 додано та 17 видалено
  1. 5 13
      src/AbstractConsumer.php
  2. 7 4
      src/GroupConsumers.php

+ 5 - 13
src/AbstractConsumer.php

@@ -57,25 +57,16 @@ abstract class AbstractConsumer implements Consumable
     protected array $consume = [];
     protected array $qos = [];
 
-    protected $client;
     protected bool $async = true;
 
     public function __construct(
         protected array $rabbitmqConfig,
         protected ?LoggerInterface $logger = null,
     ){
-        if ($this->async) {
-            $this->rabbitmqConfig = array_merge_recursive($this->rabbitmqConfig, [
-                'async_connect' => true,
-                'persistent' => true,
-                'path' => '/',
-            ]);
-        }
 
-        $this->init();
     }
 
-    public function init()
+    public function getClient()
     {
         $initProperty = [
             'exchangeDeclare' => 'exchangeDeclareDefault',
@@ -93,7 +84,7 @@ abstract class AbstractConsumer implements Consumable
             }
         });
 
-        $this->client = new Client(
+        return new Client(
             $this->rabbitmqConfig, $this->logger, $this->exchange, $this->exchangeType,
             $this->queue, $this->routingKeys, $this->exchangeDeclare, $this->queueDeclare,
             $this->queueBind, $this->consume, $this->qos
@@ -102,11 +93,12 @@ abstract class AbstractConsumer implements Consumable
 
     public function onWorkerStart($worker): void
     {
+        $client = $this->getClient();
         if (is_a(static::class, AbstractConsumer::class, true) || is_subclass_of(static::class, Consumable::class)) {
             if ($this->async) {
-                $this->client->asyncProcess([$this, 'consume']);
+                $client->asyncProcess([$this, 'consume']);
             } else {
-                $this->client->syncProcess([$this, 'consume']);
+                $client->syncProcess([$this, 'consume']);
             }
         } else {
             return;

+ 7 - 4
src/GroupConsumers.php

@@ -3,7 +3,6 @@
 namespace Roiwk\Rabbitmq;
 
 use Psr\Log\LoggerInterface;
-use support\Container;
 
 /**
  * 分组消费.(模仿webman-redis queue).
@@ -38,9 +37,13 @@ class GroupConsumers
                     continue;
                 }
 
-                $consumer = Container::make($class, [
-                    'rabbitmqConfig' => $this->rabbitmqConfig, 'logger' => $this->logger
-                ]);
+                if (class_exists('support\Container', false)) {
+                    $consumer = \support\Container::make($class, [
+                        'rabbitmqConfig' => $this->rabbitmqConfig, 'logger' => $this->logger
+                    ]);
+                } else {
+                    $consumer = new $class($this->rabbitmqConfig, $this->logger);
+                }
                 $consumer->onWorkerStart($worker);
             }
         }