AbstractConsumer.php 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. <?php
  2. namespace Roiwk\Rabbitmq;
  3. use Psr\Log\LoggerInterface;
  /**
   * Base class for RabbitMQ consumers.
   *
   * A subclass describes its exchange/queue through the protected properties
   * below. init() overlays any subclass-provided option arrays on top of the
   * matching "*Default" arrays and builds a Client from the result;
   * onWorkerStart() then hands this object's `consume` callback to that client.
   *
   * NOTE(review): the `consume` callback method is not defined in this class;
   * presumably it is declared by the Consumable interface — confirm.
   */
  abstract class AbstractConsumer implements Consumable
  {
      /** Exchange name passed to the Client. */
      protected string $exchange = '';

      /** Exchange type passed to the Client. */
      protected string $exchangeType = '';

      /** Queue name passed to the Client. */
      protected string $queue = '';

      /** Routing keys passed to the Client (used with topic exchanges). */
      protected array $routingKeys = [];

      /** Baseline exchange-declare options; overridden key by key by $exchangeDeclare. */
      protected array $exchangeDeclareDefault = [
          'passive' => false,
          'durable' => true,
          'auto_delete' => false,
          'internal' => false,
          'nowait' => false,
          'arguments' => [],
      ];

      /** Baseline queue-declare options; overridden key by key by $queueDeclare. */
      protected array $queueDeclareDefault = [
          'passive' => false,
          'durable' => true,
          'auto_delete' => false,
          'exclusive' => false,
          'nowait' => false,
          'arguments' => [],
      ];

      /** Baseline queue-bind options; overridden key by key by $queueBind. */
      protected array $queueBindDefault = [
          'nowait' => false,
          'arguments' => [],
      ];

      /** Baseline consume options; overridden key by key by $consume. */
      protected array $consumeDefault = [
          'consumerTag' => '',
          'noLocal' => false,
          'noAck' => false,
          'exclusive' => false,
          'nowait' => false,
          'arguments' => [],
      ];

      /** Baseline QoS options; overridden key by key by $qos. */
      protected array $qosDefault = [
          'prefetch_size' => 0,
          'prefetch_count' => 1,
      ];

      /** Subclass overrides for the exchange-declare options (merged in init()). */
      protected array $exchangeDeclare = [];

      /** Subclass overrides for the queue-declare options (merged in init()). */
      protected array $queueDeclare = [];

      /** Subclass overrides for the queue-bind options (merged in init()). */
      protected array $queueBind = [];

      /** Subclass overrides for the consume options (merged in init()). */
      protected array $consume = [];

      /** Subclass overrides for the QoS options (merged in init()). */
      protected array $qos = [];

      /**
       * Client built by init().
       *
       * Left without a native type so subclasses that redeclare the property
       * keep working.
       *
       * @var Client
       */
      protected $client;

      /** When true onWorkerStart() uses asyncProcess(), otherwise syncProcess(). */
      protected bool $async = true;

      /**
       * Stores the connection config and optional logger, then runs init().
       *
       * @param array                $rabbitmqConfig Configuration forwarded to the Client.
       * @param LoggerInterface|null $logger         Optional logger forwarded to the Client.
       */
      public function __construct(
          protected array $rabbitmqConfig,
          protected ?LoggerInterface $logger = null,
      ) {
          $this->init();
      }

      /**
       * Resolves the effective option arrays and (re)creates the Client.
       *
       * Each option property is replaced by its "*Default" counterpart with the
       * subclass-supplied values laid over it recursively. An empty override
       * array therefore yields the defaults unchanged, so no special case is
       * needed for it.
       *
       * No return type is declared so that existing subclass overrides of
       * init() remain signature-compatible.
       *
       * @return void
       */
      public function init()
      {
          // option property => property holding its defaults
          $optionDefaults = [
              'exchangeDeclare' => 'exchangeDeclareDefault',
              'queueDeclare' => 'queueDeclareDefault',
              'queueBind' => 'queueBindDefault',
              'consume' => 'consumeDefault',
              'qos' => 'qosDefault',
          ];

          foreach ($optionDefaults as $current => $default) {
              $this->{$current} = array_replace_recursive($this->{$default}, $this->{$current});
          }

          $this->client = new Client(
              $this->rabbitmqConfig,
              $this->logger,
              $this->exchange,
              $this->exchangeType,
              $this->queue,
              $this->routingKeys,
              $this->exchangeDeclare,
              $this->queueDeclare,
              $this->queueBind,
              $this->consume,
              $this->qos
          );
      }

      /**
       * Starts consuming by passing the `consume` callback to the Client.
       *
       * Uses Client::asyncProcess() when $async is true and
       * Client::syncProcess() otherwise.
       *
       * The former is_a()/is_subclass_of() guard was removed: this method lives
       * on AbstractConsumer, so static::class is always AbstractConsumer or a
       * descendant and the guard could never be false.
       *
       * @param mixed $worker Not used by this method.
       *                      NOTE(review): presumably the worker instance
       *                      supplied by the process framework — confirm.
       */
      public function onWorkerStart($worker): void
      {
          $callback = [$this, 'consume'];

          if ($this->async) {
              $this->client->asyncProcess($callback);
          } else {
              $this->client->syncProcess($callback);
          }
      }
  }