diff --git a/composer.json b/composer.json index 7e7f9a9b..7aa754e6 100644 --- a/composer.json +++ b/composer.json @@ -18,7 +18,7 @@ "require": { "php": "^7.4 | ^8.0", "nette/di": "^3.0", - "nette/utils": "^3.0", + "nette/utils": "^3.0 || ^4.0", "php-amqplib/php-amqplib": "~3.1.0" }, "require-dev": { diff --git a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php index 0e710b65..04ee15ee 100644 --- a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php +++ b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php @@ -49,6 +49,17 @@ class RabbitMqExtension extends \Nette\DI\CompilerExtension 'user' => NULL, 'password' => NULL, 'vhost' => '/', + 'insist' => FALSE, + 'login_method' => 'AMQPLAIN', + 'login_response' => NULL, + 'locale' => 'en_US', + 'connection_timeout' => 3.0, + 'read_write_timeout' => 3.0, + 'context' => NULL, + 'keepalive' => FALSE, + 'heartbeat' => 0, + 'channel_rpc_timeout' => 0.0, + 'ssl_protocol' => NULL, ]; /** @@ -275,6 +286,17 @@ protected function loadConnections(array $connections): void $config['user'], $config['password'], $config['vhost'], + $config['insist'], + $config['login_method'], + $config['login_response'], + $config['locale'], + $config['connection_timeout'], + $config['read_write_timeout'], + $config['context'], + $config['keepalive'], + $config['heartbeat'], + $config['channel_rpc_timeout'], + $config['ssl_protocol'], ]); $this->connectionsMeta[$name] = [ diff --git a/src/Kdyby/RabbitMq/MultipleConsumer.php b/src/Kdyby/RabbitMq/MultipleConsumer.php index c81cdfba..32cb37d9 100644 --- a/src/Kdyby/RabbitMq/MultipleConsumer.php +++ b/src/Kdyby/RabbitMq/MultipleConsumer.php @@ -73,6 +73,14 @@ protected function queueDeclare(): void $this->queueDeclared = TRUE; } + public function stopConsuming(): void + { + foreach ($this->queues as $name => $options) { + $this->getChannel()->basic_cancel($this->getQueueConsumerTag($name)); + } + $this->onStop($this); + } + public function processQueueMessage(string $queueName, AMQPMessage $msg): void { if (!isset($this->queues[$queueName])) {