| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- <?php
- namespace Roiwk\Rabbitmq;
- use Psr\Log\LoggerInterface;
- abstract class AbstractConsumer implements Consumable
- {
- protected string $exchange = '';
- protected string $exchangeType = '';
- protected string $queue = '';
- // topic exchange - routingKeys
- protected array $routingKeys = [];
- protected array $exchangeDeclareDefault = [
- 'passive' => false,
- 'durable' => true,
- 'auto_delete' => false,
- 'internal' => false,
- 'nowait' => false,
- 'arguments' => [],
- ];
- protected array $queueDeclareDefault = [
- 'passive' => false,
- 'durable' => true,
- 'auto_delete' => false,
- 'exclusive' => false,
- 'nowait' => false,
- 'arguments' => [],
- ];
- protected array $queueBindDefault = [
- 'nowait' => false,
- 'arguments' => [],
- ];
- protected array $consumeDefault = [
- 'consumerTag' => '',
- 'noLocal' => false,
- 'noAck' => false,
- 'exclusive' => false,
- 'nowait' => false,
- 'arguments' => [],
- ];
- protected array $qosDefault = [
- 'prefetch_size' => 0,
- 'prefetch_count' => 1,
- ];
- protected array $exchangeDeclare = [];
- protected array $queueDeclare = [];
- protected array $queueBind = [];
- protected array $consume = [];
- protected array $qos = [];
- protected $client;
- protected bool $async = true;
- public function __construct(
- protected array $rabbitmqConfig,
- protected ?LoggerInterface $logger = null,
- ){
- $this->init();
- }
- public function init()
- {
- $initProperty = [
- 'exchangeDeclare' => 'exchangeDeclareDefault',
- 'queueDeclare' => 'queueDeclareDefault',
- 'queueBind' => 'queueBindDefault',
- 'consume' => 'consumeDefault',
- 'qos' => 'qosDefault',
- ];
- array_walk($initProperty, function ($default, $current) {
- if (empty($this->{$current})) {
- $this->{$current} = $this->{$default};
- } else {
- $this->{$current} = array_replace_recursive($this->{$default}, $this->{$current});
- }
- });
- $this->client = new Client(
- $this->rabbitmqConfig, $this->logger, $this->exchange, $this->exchangeType,
- $this->queue, $this->routingKeys, $this->exchangeDeclare, $this->queueDeclare,
- $this->queueBind, $this->consume, $this->qos
- );
- }
- public function onWorkerStart($worker): void
- {
- if (is_a(static::class, AbstractConsumer::class, true) || is_subclass_of(static::class, Consumable::class)) {
- if ($this->async) {
- $this->client->asyncProcess([$this, 'consume']);
- } else {
- $this->client->syncProcess([$this, 'consume']);
- }
- } else {
- return;
- }
- }
- }
|