From d2a17abef060e22dfd7b25e289727f73e696db01 Mon Sep 17 00:00:00 2001 From: Andrey Date: Tue, 7 Nov 2023 17:36:18 +0300 Subject: [PATCH] Add "no_ack" consumer option (issue-717) (#718) * Add no_ack consumer option * Replace consumer_options to options * Add options for anon consumer * Add Unit tests for DI * Update README.md * Update CHANGELOG --- CHANGELOG | 3 ++ DependencyInjection/Configuration.php | 30 +++++++++++++++++ .../OldSoundRabbitMqExtension.php | 33 +++++++++++++++++++ README.md | 2 ++ RabbitMq/BaseAmqp.php | 13 ++++++++ RabbitMq/BaseConsumer.php | 2 +- RabbitMq/BatchConsumer.php | 2 +- RabbitMq/MultipleConsumer.php | 2 +- Tests/DependencyInjection/Fixtures/test.yml | 11 +++++++ .../OldSoundRabbitMqExtensionTest.php | 32 ++++++++++++++++++ 10 files changed, 127 insertions(+), 3 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index 202b4c9e..cc572205 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,6 @@ +- 2023-11-07 + * Add consumer option `no_ack` + - 2021-05-15 * Add possibility to use multiple RabbitMQ hosts diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index 503ecd9e..811cc601 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -185,6 +185,12 @@ protected function addConsumers(ArrayNodeDefinition $node) ->end() ->end() ->scalarNode('auto_setup_fabric')->defaultTrue()->end() + ->arrayNode('options') + ->canBeUnset() + ->children() + ->booleanNode('no_ack')->defaultFalse()->end() + ->end() + ->end() ->arrayNode('qos_options') ->canBeUnset() ->children() @@ -217,6 +223,12 @@ protected function addMultipleConsumers(ArrayNodeDefinition $node) ->scalarNode('idle_timeout_exit_code')->end() ->scalarNode('timeout_wait')->end() ->scalarNode('auto_setup_fabric')->defaultTrue()->end() + ->arrayNode('options') + ->canBeUnset() + ->children() + ->booleanNode('no_ack')->defaultFalse()->end() + ->end() + ->end() ->arrayNode('graceful_max_execution') ->canBeUnset() ->children() @@ -265,6 +277,12 @@ protected function addDynamicConsumers(ArrayNodeDefinition $node) ->end() ->end() ->scalarNode('auto_setup_fabric')->defaultTrue()->end() + ->arrayNode('options') + ->canBeUnset() + ->children() + ->booleanNode('no_ack')->defaultFalse()->end() + ->end() + ->end() ->arrayNode('qos_options') ->canBeUnset() ->children() @@ -311,6 +329,12 @@ protected function addBatchConsumers(ArrayNodeDefinition $node) ->end() ->end() ->scalarNode('auto_setup_fabric')->defaultTrue()->end() + ->arrayNode('options') + ->canBeUnset() + ->children() + ->booleanNode('no_ack')->defaultFalse()->end() + ->end() + ->end() ->arrayNode('qos_options') ->children() ->scalarNode('prefetch_size')->defaultValue(0)->end() @@ -339,6 +363,12 @@ protected function addAnonConsumers(ArrayNodeDefinition $node) ->children() ->scalarNode('connection')->defaultValue('default')->end() ->scalarNode('callback')->isRequired()->end() + ->arrayNode('options') + ->canBeUnset() + ->children() + ->booleanNode('no_ack')->defaultFalse()->end() + ->end() + ->end() ->end() ->end() ->end() diff --git a/DependencyInjection/OldSoundRabbitMqExtension.php b/DependencyInjection/OldSoundRabbitMqExtension.php index e1ac6945..ddb98ec0 100644 --- a/DependencyInjection/OldSoundRabbitMqExtension.php +++ b/DependencyInjection/OldSoundRabbitMqExtension.php @@ -257,6 +257,12 @@ protected function loadConsumers() if (!$consumer['auto_setup_fabric']) { $definition->addMethodCall('disableAutoSetupFabric'); } + if (isset($consumer['options'])) { + $definition->addMethodCall( + 'setConsumerOptions', + [$this->normalizeArgumentKeys($consumer['options'])] + ); + } $this->injectConnection($definition, $consumer['connection']); if ($this->collectorEnabled) { @@ -349,6 +355,12 @@ protected function loadMultipleConsumers() if (!$consumer['auto_setup_fabric']) { $definition->addMethodCall('disableAutoSetupFabric'); } + if (isset($consumer['options'])) { + $definition->addMethodCall( + 'setConsumerOptions', + [$this->normalizeArgumentKeys($consumer['options'])] + ); + } $this->injectConnection($definition, $consumer['connection']); if ($this->collectorEnabled) { @@ -424,6 +436,12 @@ protected function loadDynamicConsumers() if (!$consumer['auto_setup_fabric']) { $definition->addMethodCall('disableAutoSetupFabric'); } + if (isset($consumer['options'])) { + $definition->addMethodCall( + 'setConsumerOptions', + [$this->normalizeArgumentKeys($consumer['options'])] + ); + } $this->injectConnection($definition, $consumer['connection']); if ($this->collectorEnabled) { @@ -485,6 +503,13 @@ protected function loadBatchConsumers() $definition->addMethodCall('disableAutoSetupFabric'); } + if (isset($consumer['options'])) { + $definition->addMethodCall( + 'setConsumerOptions', + [$this->normalizeArgumentKeys($consumer['options'])] + ); + } + if ($consumer['keep_alive']) { $definition->addMethodCall('keepAlive'); } @@ -512,6 +537,14 @@ protected function loadAnonConsumers() ->addTag('old_sound_rabbit_mq.anon_consumer') ->addMethodCall('setExchangeOptions', [$this->normalizeArgumentKeys($anon['exchange_options'])]) ->addMethodCall('setCallback', [[new Reference($anon['callback']), 'execute']]); + + if (isset($anon['options'])) { + $definition->addMethodCall( + 'setConsumerOptions', + [$this->normalizeArgumentKeys($anon['options'])] + ); + } + $this->injectConnection($definition, $anon['connection']); if ($this->collectorEnabled) { $this->injectLoggedChannel($definition, $key, $anon['connection']); diff --git a/README.md b/README.md index 4b48d0da..d5fefc05 100644 --- a/README.md +++ b/README.md @@ -145,6 +145,8 @@ old_sound_rabbit_mq: exchange_options: {name: 'upload-picture', type: direct} queue_options: {name: 'upload-picture'} callback: upload_picture_service + options: + no_ack: false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details. ``` Here we configure the connection service and the message endpoints that our application will have. In this example your service container will contain the service `old_sound_rabbit_mq.upload_picture_producer` and `old_sound_rabbit_mq.upload_picture_consumer`. The later expects that there's a service called `upload_picture_service`. diff --git a/RabbitMq/BaseAmqp.php b/RabbitMq/BaseAmqp.php index a05432cd..fd65438b 100644 --- a/RabbitMq/BaseAmqp.php +++ b/RabbitMq/BaseAmqp.php @@ -49,6 +49,10 @@ abstract class BaseAmqp 'declare' => true, ]; + protected $consumerOptions = [ + 'no_ack' => false, + ]; + /** * @var EventDispatcherInterface|null */ @@ -155,6 +159,15 @@ public function setQueueOptions(array $options = []) $this->queueOptions = array_merge($this->queueOptions, $options); } + /** + * @param array $options + * @return void + */ + public function setConsumerOptions(array $options = []) + { + $this->consumerOptions = array_merge($this->consumerOptions, $options); + } + /** * @param string $routingKey * @return void diff --git a/RabbitMq/BaseConsumer.php b/RabbitMq/BaseConsumer.php index 5a28b803..c3935ee8 100644 --- a/RabbitMq/BaseConsumer.php +++ b/RabbitMq/BaseConsumer.php @@ -68,7 +68,7 @@ protected function setupConsumer() if ($this->autoSetupFabric) { $this->setupFabric(); } - $this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, false, false, false, [$this, 'processMessage']); + $this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, $this->consumerOptions['no_ack'], false, false, [$this, 'processMessage']); } public function processMessage(AMQPMessage $msg) diff --git a/RabbitMq/BatchConsumer.php b/RabbitMq/BatchConsumer.php index ac10ed38..88756389 100644 --- a/RabbitMq/BatchConsumer.php +++ b/RabbitMq/BatchConsumer.php @@ -355,7 +355,7 @@ protected function setupConsumer() $this->setupFabric(); } - $this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, false, false, false, [$this, 'processMessage']); + $this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, $this->consumerOptions['no_ack'], false, false, [$this, 'processMessage']); } /** diff --git a/RabbitMq/MultipleConsumer.php b/RabbitMq/MultipleConsumer.php index fbb53b57..d8e6f65b 100644 --- a/RabbitMq/MultipleConsumer.php +++ b/RabbitMq/MultipleConsumer.php @@ -64,7 +64,7 @@ protected function setupConsumer() //PHP 5.3 Compliant $currentObject = $this; - $this->getChannel()->basic_consume($name, $this->getQueueConsumerTag($name), false, false, false, false, function (AMQPMessage $msg) use ($currentObject, $name) { + $this->getChannel()->basic_consume($name, $this->getQueueConsumerTag($name), false, $this->consumerOptions['no_ack'], false, false, function (AMQPMessage $msg) use ($currentObject, $name) { $currentObject->processQueueMessage($name, $msg); }); } diff --git a/Tests/DependencyInjection/Fixtures/test.yml b/Tests/DependencyInjection/Fixtures/test.yml index 0c0e6119..4ec9d171 100644 --- a/Tests/DependencyInjection/Fixtures/test.yml +++ b/Tests/DependencyInjection/Fixtures/test.yml @@ -122,6 +122,8 @@ old_sound_rabbit_mq: - 'android.#.upload' - 'iphone.upload' callback: foo.callback + options: + no_ack: true default_consumer: exchange_options: @@ -169,6 +171,8 @@ old_sound_rabbit_mq: - 'iphone.upload' callback: foo.multiple_test2.callback queues_provider: foo.queues_provider + options: + no_ack: true dynamic_consumers: foo_dyn_consumer: @@ -178,6 +182,9 @@ old_sound_rabbit_mq: type: direct callback: foo.dynamic.callback queue_options_provider: foo.dynamic.provider + options: + no_ack: true + bar_dyn_consumer: connection: bar_default exchange_options: @@ -185,9 +192,11 @@ old_sound_rabbit_mq: type: direct callback: bar.dynamic.callback queue_options_provider: bar.dynamic.provider + bindings: - {exchange: foo, destination: bar, routing_key: baz} - {exchange: moo, connection: default2, destination: cow, nowait: true, destination_is_exchange: true, arguments: {moo: cow}} + anon_consumers: foo_anon_consumer: connection: foo_connection @@ -202,6 +211,8 @@ old_sound_rabbit_mq: arguments: null ticket: null callback: foo_anon.callback + options: + no_ack: true default_anon_consumer: exchange_options: diff --git a/Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php b/Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php index c3c09164..bf7b2130 100644 --- a/Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php +++ b/Tests/DependencyInjection/OldSoundRabbitMqExtensionTest.php @@ -486,6 +486,14 @@ public function testFooConsumerDefinition() 'setTimeoutWait', [3], ], + [ + 'setConsumerOptions', + [ + [ + 'no_ack' => true, + ], + ], + ], ], $definition->getMethodCalls() ); @@ -670,6 +678,14 @@ public function testMultipleConsumerDefinition() 'setTimeoutWait', [3], ], + [ + 'setConsumerOptions', + [ + [ + 'no_ack' => true, + ], + ], + ], ], $definition->getMethodCalls() ); @@ -714,6 +730,14 @@ public function testDynamicConsumerDefinition() new Reference('foo.dynamic.provider'), ], ], + [ + 'setConsumerOptions', + [ + [ + 'no_ack' => true, + ], + ], + ], ], $definition->getMethodCalls() ); @@ -750,6 +774,14 @@ public function testFooAnonConsumerDefinition() 'setCallback', [[new Reference('foo_anon.callback'), 'execute']], ], + [ + 'setConsumerOptions', + [ + [ + 'no_ack' => true, + ], + ], + ], ], $definition->getMethodCalls() );