Producer.php 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. <?php
  2. namespace Roiwk\Rabbitmq;
  3. use Bunny\Channel;
  4. use Bunny\Client;
  5. use Illuminate\Support\Arr;
  6. use Psr\Log\LoggerInterface;
  7. class Producer
  8. {
  9. protected array $exchangeDeclareDefault = [
  10. 'passive' => false,
  11. 'durable' => true,
  12. 'auto_delete' => false,
  13. 'internal' => false,
  14. 'nowait' => false,
  15. 'arguments' => [],
  16. ];
  17. protected array $queueDeclareDefault = [
  18. 'passive' => false,
  19. 'durable' => true,
  20. 'exclusive' => false,
  21. 'auto_delete' => false,
  22. 'nowait' => false,
  23. 'arguments' => [],
  24. ];
  25. protected array $exchangeDeclare = [];
  26. protected array $queueDeclare = [];
  27. private static ?array $instances = null;
  28. private function __construct(
  29. protected array $rabbitmqConfig,
  30. protected ?LoggerInterface $logger = null)
  31. {
  32. }
  33. private function __clone()
  34. {
  35. }
  36. public function setLogger(?LoggerInterface $logger)
  37. {
  38. $this->logger = $logger;
  39. }
  40. public static function getInstance(array $rabbitmqConfig, ?LoggerInterface $logger = null): self
  41. {
  42. ksort($rabbitmqConfig);
  43. $key = md5(json_encode($rabbitmqConfig));
  44. if (isset(self::$instances[$key])) {
  45. $obj = self::$instances[$key];
  46. $obj->setLogger($logger);
  47. return self::$instances[$key];
  48. }
  49. if (empty(self::$instances)) {
  50. $obj = new self($rabbitmqConfig, $logger);
  51. self::$instances[$key] = $obj;
  52. }
  53. return self::$instances[$key];
  54. }
  55. public static function connect(array $rabbitmqConfig, ?LoggerInterface $logger = null): self
  56. {
  57. return self::getInstance($rabbitmqConfig, $logger);
  58. }
  59. protected function getAsyncConnect(array $rabbitmqConfig, ?LoggerInterface $logger = null)
  60. {
  61. static $connect = null;
  62. static $client = null;
  63. if (null === $connect) {
  64. $client = new AsyncClient($rabbitmqConfig, $logger);
  65. $connect = $client->connect();
  66. }
  67. if ($client->isConnected()) {
  68. return $connect;
  69. }
  70. $connect = $client->connect();
  71. return $connect;
  72. }
  73. protected function getSyncConnect(array $rabbitmqConfig)
  74. {
  75. static $synConnect = null;
  76. static $synClient = null;
  77. if (null === $synConnect) {
  78. $synClient = (new Client($rabbitmqConfig));
  79. $synConnect = $synClient->connect();
  80. }
  81. if ($synClient->isConnected()) {
  82. return $synConnect;
  83. }
  84. $synConnect = $synClient->connect();
  85. return $synConnect;
  86. }
  87. protected function setDeclare(array $exchangeOrQueueDeclare, string $exchange = '', string $exchangeType = '')
  88. {
  89. if (!empty($exchange) && !empty($exchangeType)) {
  90. $this->exchangeDeclare = array_replace_recursive($this->exchangeDeclareDefault, $exchangeOrQueueDeclare);
  91. } else {
  92. $this->queueDeclare = array_replace_recursive($this->queueDeclareDefault, $exchangeOrQueueDeclare);
  93. }
  94. }
  95. protected function declare(Channel $channel, string $routingOrQueue, string $exchange = '', string $exchangeType = '')
  96. {
  97. if (!empty($exchange) && !empty($exchangeType)) {
  98. return $channel->exchangeDeclare($exchange, $exchangeType,
  99. $this->exchangeDeclare['passive'], $this->exchangeDeclare['durable'],
  100. $this->exchangeDeclare['auto_delete'], $this->exchangeDeclare['internal'],
  101. $this->exchangeDeclare['nowait'], $this->exchangeDeclare['arguments']);
  102. }
  103. return $channel->queueDeclare($routingOrQueue,
  104. $this->queueDeclare['passive'], $this->queueDeclare['durable'],
  105. $this->queueDeclare['exclusive'], $this->queueDeclare['auto_delete'],
  106. $this->queueDeclare['nowait'], $this->queueDeclare['arguments']);
  107. }
  108. public function publishAsync(string $data, string $exchange = '', string $exchangeType = '', string $routingOrQueue = '',
  109. array $exchangeOrQueueDeclare = [], array $headers = [], bool $mandatory = false, bool $immediate = false,
  110. ) {
  111. $reject = function (\Throwable $throwable) {
  112. $this->logger?->error('['.getmypid().']PUBLIAH ASYNC:'.$throwable->getMessage().PHP_EOL.$throwable->getTraceAsString(), [__CLASS__]);
  113. };
  114. try {
  115. $this->setDeclare($exchangeOrQueueDeclare, $exchange, $exchangeType);
  116. $this->getAsyncConnect($this->rabbitmqConfig, $this->logger)
  117. ->then(function (AsyncClient $client) {
  118. return $client->channel();
  119. }, $reject)
  120. ->then(function (Channel $channel) use ($exchange, $exchangeType, $routingOrQueue) {
  121. return $this->declare($channel, $routingOrQueue, $exchange, $exchangeType)
  122. ->then(function () use ($channel) {
  123. return $channel;
  124. })
  125. ;
  126. }, $reject)
  127. ->then(function (Channel $channel) use ($exchange, $routingOrQueue, $data, $headers, $mandatory, $immediate) {
  128. $this->logger?->info('('.getmygid().') Sending :'.$data, [__CLASS__]);
  129. return $channel->publish($data, $headers, $exchange, $routingOrQueue, $mandatory, $immediate)
  130. ->then(function () use ($channel) {
  131. return $channel;
  132. })
  133. ;
  134. }, $reject)
  135. ->then(function (Channel $channel) use ($data) {
  136. $this->logger?->info('('.getmygid().') Sent :'.$data, [__CLASS__]);
  137. $client = $channel->getClient();
  138. return $channel->close()->then(function () use ($client) {
  139. return $client;
  140. });
  141. }, $reject)
  142. ;
  143. } catch (\Throwable $throwable) {
  144. $reject($throwable);
  145. }
  146. }
  147. public function publishSync(string $data, string $exchange = '', string $exchangeType = '', string $routingOrQueue = '',
  148. array $exchangeOrQueueDeclare = [], array $headers = [], bool $mandatory = false, bool $immediate = false,
  149. ) {
  150. $this->setDeclare($exchangeOrQueueDeclare, $exchange, $exchangeType);
  151. $rabbitmqConfig = Arr::only($this->rabbitmqConfig, ['host', 'port', 'vhost', 'user', 'password']);
  152. try {
  153. $client = $this->getSyncConnect($rabbitmqConfig);
  154. $channel = $client->channel();
  155. $this->declare($channel, $routingOrQueue, $exchange, $exchangeType);
  156. $published = $channel->publish($data, $headers, $exchange, $routingOrQueue, $mandatory, $immediate);
  157. $client->removeChannel($channel->getChannelId());
  158. } catch (\Throwable $throwable) {
  159. $this->logger?->error('['.getmypid().']PUBLIAH SYNC:'.$throwable->getMessage().PHP_EOL.$throwable->getTraceAsString(), [__CLASS__]);
  160. } finally {
  161. isset($channel) && $channel->close();
  162. }
  163. return $published ?? false;
  164. }
  165. }