From e11e2d7be3af46885248e14c8d614d21f504eb41 Mon Sep 17 00:00:00 2001 From: Oleg Namaka Date: Fri, 25 Oct 2024 19:37:00 -0500 Subject: [PATCH 1/5] Port the solution in 5.4 --- composer.json | 7 +- .../Amqp/Tests/Transport/AmqpReceiverTest.php | 26 +++ .../Tests/Transport/AmqpTransportTest.php | 30 +++ .../Bridge/Amqp/Transport/AmqpReceiver.php | 56 ++++- .../Bridge/Amqp/Transport/AmqpTransport.php | 19 +- .../Bridge/Amqp/Transport/Connection.php | 23 +- .../Command/ConsumeMessagesCommand.php | 8 + .../Command/ConsumeMessagesCommandTest.php | 37 +++ .../Component/Messenger/Tests/WorkerTest.php | 218 +++++++++++++++++- .../Receiver/BlockingReceiverInterface.php | 28 +++ .../QueueBlockingReceiverInterface.php | 29 +++ src/Symfony/Component/Messenger/Worker.php | 66 +++++- 12 files changed, 521 insertions(+), 26 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Transport/Receiver/BlockingReceiverInterface.php create mode 100644 src/Symfony/Component/Messenger/Transport/Receiver/QueueBlockingReceiverInterface.php diff --git a/composer.json b/composer.json index 1c53f27932fc1..af7619fb4a7df 100644 --- a/composer.json +++ b/composer.json @@ -167,7 +167,8 @@ "allow-plugins": { "php-http/discovery": false, "symfony/runtime": true - } + }, + "use-parent-dir": true }, "autoload": { "psr-4": { @@ -204,10 +205,6 @@ "symfony/contracts": "2.5.x-dev" } } - }, - { - "type": "path", - "url": "src/Symfony/Component/Runtime" } ], "minimum-stability": "dev" diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php index 9dd86dcd07b42..167987116de14 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpReceiverTest.php @@ -46,6 +46,32 @@ public function testItReturnsTheDecodedMessageToTheHandler() $this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage()); } + public function testItReturnsTheDecodedMessageToTheHandlerInBlockingMode() + { + $connection = $this->getMockBuilder(Connection::class) + ->disableOriginalConstructor() + ->onlyMethods(['getQueueNames', 'pull']) + ->getMock(); + $serializer = new Serializer( + new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()]) + ); + + $amqpEnvelope = $this->createAMQPEnvelope(); + + $amqpQueue = $this->createMock(\AMQPQueue::class); + $amqpQueue->method('getName')->willReturn('queueName'); + + $connection->method('getQueueNames')->willReturn(['queueName']); + $connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) { + \call_user_func($callback, $amqpEnvelope, $amqpQueue); + }); + + $receiver = new AmqpReceiver($connection, $serializer); + $receiver->pull(function (Envelope $envelope) { + $this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage()); + }); + } + public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage() { $this->expectException(TransportException::class); diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php index 743bd51bac1f3..c2cc766b43bef 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php @@ -52,6 +52,36 @@ public function testReceivesMessages() $this->assertSame($decodedMessage, $envelopes[0]->getMessage()); } + public function testReceivesMessagesInBlockingMode() + { + $transport = $this->getTransport( + $serializer = $this->createMock(SerializerInterface::class), + $connection = $this->getMockBuilder(Connection::class) + ->disableOriginalConstructor() + ->onlyMethods(['getQueueNames', 'pull']) + ->getMock(), + ); + + $decodedMessage = new DummyMessage('Decoded.'); + + $amqpEnvelope = $this->createMock(\AMQPEnvelope::class); + $amqpEnvelope->method('getBody')->willReturn('body'); + $amqpEnvelope->method('getHeaders')->willReturn(['my' => 'header']); + + $amqpQueue = $this->createMock(\AMQPQueue::class); + $amqpQueue->method('getName')->willReturn('queueName'); + + $serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage)); + $connection->method('getQueueNames')->willReturn(['queueName']); + $connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) { + \call_user_func($callback, $amqpEnvelope, $amqpQueue); + }); + + $transport->pull(function (Envelope $envelope) use ($decodedMessage) { + $this->assertSame($decodedMessage, $envelope->getMessage()); + }); + } + private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): AmqpTransport { $serializer = $serializer ?? $this->createMock(SerializerInterface::class); diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php index 3cadcc10f7994..b9a45fb91182f 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php @@ -16,6 +16,7 @@ use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; +use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -25,7 +26,7 @@ * * @author Samuel Roze */ -class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface +class AmqpReceiver implements QueueReceiverInterface, QueueBlockingReceiverInterface, MessageCountAwareInterface { private $serializer; private $connection; @@ -44,9 +45,46 @@ public function get(): iterable yield from $this->getFromQueues($this->connection->getQueueNames()); } - /** - * {@inheritdoc} - */ + public function pull(callable $callback): void + { + $this->pullFromQueues($this->connection->getQueueNames(), $callback); + } + + public function pullFromQueues(array $queueNames, callable $callback): void + { + if (0 === \count($queueNames)) { + return; + } + + // Pop last queue to send callback + $firstQueue = array_pop($queueNames); + + foreach ($queueNames as $queueName) { + $this->pullEnvelope($queueName, null); + } + + $this->pullEnvelope($firstQueue, $callback); + } + + private function pullEnvelope(string $queueName, ?callable $callback): void + { + if (null !== $callback) { + $callback = function (\AMQPEnvelope $amqpEnvelope, \AMQPQueue $queue) use ($callback) { + $queueName = $queue->getName(); + $body = $amqpEnvelope->getBody(); + $envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName); + + return $callback($envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName))); + }; + } + + try { + $this->connection->pull($queueName, $callback); + } catch (\AMQPException $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + } + public function getFromQueues(array $queueNames): iterable { foreach ($queueNames as $queueName) { @@ -67,9 +105,15 @@ private function getEnvelope(string $queueName): iterable } $body = $amqpEnvelope->getBody(); + $envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName); + + yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName)); + } + private function decodeAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, $body, string $queueName): Envelope + { try { - $envelope = $this->serializer->decode([ + return $this->serializer->decode([ 'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351 'headers' => $amqpEnvelope->getHeaders(), ]); @@ -79,8 +123,6 @@ private function getEnvelope(string $queueName): iterable throw $exception; } - - yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName)); } /** diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php index 52b529959b2ea..75e40d4c9597f 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpTransport.php @@ -13,6 +13,7 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; +use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -22,7 +23,7 @@ /** * @author Nicolas Grekas */ -class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface +class AmqpTransport implements QueueReceiverInterface, QueueBlockingReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface { private $serializer; private $connection; @@ -43,6 +44,22 @@ public function get(): iterable return ($this->receiver ?? $this->getReceiver())->get(); } + /** + * {@inheritdoc} + */ + public function pull(callable $callback): void + { + $this->getReceiver()->pull($callback); + } + + /** + * {@inheritdoc} + */ + public function pullFromQueues(array $queueNames, callable $callback): void + { + $this->getReceiver()->pullFromQueues($queueNames, $callback); + } + /** * {@inheritdoc} */ diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 8689b8ee306cc..853b815d9ff43 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -281,7 +281,7 @@ private static function normalizeQueueArguments(array $arguments): array } if (!is_numeric($arguments[$key])) { - throw new InvalidArgumentException(sprintf('Integer expected for queue argument "%s", "%s" given.', $key, get_debug_type($arguments[$key]))); + throw new InvalidArgumentException(sprintf('Integer expected for queue argument "%s", "%s" given.', $key, gettype($arguments[$key]))); } $arguments[$key] = (int) $arguments[$key]; @@ -457,6 +457,27 @@ public function get(string $queueName): ?\AMQPEnvelope return null; } + /** + * Consume a message from the specified queue in blocking mode. + * + * @param ?callable(\AMQPEnvelope,\AMQPQueue):?false $callback If callback return false, then processing thread will be + * returned from AMQPQueue::consume() to PHP script. If null is passed, then the messages delivered to this client + * will be made available to the first real callback registered. That allows one to have a single callback + * consuming from multiple queues. + * + * @throws \AMQPException + */ + public function pull(string $queueName, ?callable $callback): void + { + $this->clearWhenDisconnected(); + + if ($this->autoSetupExchange) { + $this->setupExchangeAndQueues(); + } + + $this->queue($queueName)->consume($callback); + } + public function ack(\AMQPEnvelope $message, string $queueName): bool { return $this->queue($queueName)->ack($message->getDeliveryTag()) ?? true; diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index 5b60942ffdf06..58f0d615e68e1 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -81,6 +81,7 @@ protected function configure(): void new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'), new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'), new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'), + new InputOption('blocking-mode', null, InputOption::VALUE_NONE, 'Consume messages in blocking mode. If option is specified only one receiver is supported'), ]) ->setDescription(self::$defaultDescription) ->setHelp(<<<'EOF' @@ -122,6 +123,12 @@ protected function configure(): void Use the --no-reset option to prevent services resetting after each message (may lead to leaking services' state between messages): php %command.full_name% --no-reset + +Use the --blocking-mode option to force receiver to work in blocking mode +("consume" method will be used instead of "get" in RabbitMQ for example). +Only supported by some receivers, and you should pass only one receiver: + + php %command.full_name% --blocking-mode EOF ) ; @@ -227,6 +234,7 @@ protected function execute(InputInterface $input, OutputInterface $output) $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger); $options = [ 'sleep' => $input->getOption('sleep') * 1000000, + 'blocking-mode' => (bool) $input->getOption('blocking-mode'), ]; if ($queues = $input->getOption('queues')) { $options['queues'] = $queues; diff --git a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php index 4ff6b66d11f35..fe27225e711a8 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php @@ -28,6 +28,7 @@ use Symfony\Component\Messenger\RoutableMessageBus; use Symfony\Component\Messenger\Stamp\BusNameStamp; use Symfony\Component\Messenger\Tests\Fixtures\ResettableDummyReceiver; +use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; class ConsumeMessagesCommandTest extends TestCase @@ -72,6 +73,42 @@ public function testBasicRun() $this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay()); } + public function testRunWithBlockingModeOption() + { + $envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]); + + $receiver = $this->createMock(QueueBlockingReceiverInterface::class); + $receiver->expects($this->once())->method('pullFromQueues')->willReturnCallback(function (array $queueNames, callable $callback) use ($envelope) { + \call_user_func($callback, $envelope); + }); + + $receiverLocator = $this->createMock(ContainerInterface::class); + $receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true); + $receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver); + + $bus = $this->createMock(MessageBusInterface::class); + $bus->expects($this->once())->method('dispatch'); + + $busLocator = $this->createMock(ContainerInterface::class); + $busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true); + $busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus); + + $command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher()); + + $application = new Application(); + $application->add($command); + $tester = new CommandTester($application->get('messenger:consume')); + $tester->execute([ + 'receivers' => ['dummy-receiver'], + '--limit' => 1, + '--blocking-mode' => true, + '--queues' => ['foo'], + ]); + + $tester->assertCommandIsSuccessful(); + $this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay()); + } + public function testRunWithBusOption() { $envelope = new Envelope(new \stdClass()); diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 0b7c447f11285..6f134808f0f05 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -38,8 +38,9 @@ use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Stamp\StampInterface; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; -use Symfony\Component\Messenger\Tests\Fixtures\DummyReceiver; use Symfony\Component\Messenger\Tests\Fixtures\ResettableDummyReceiver; +use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface; +use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Worker; @@ -512,6 +513,212 @@ public function testFlushBatchOnStop() $this->assertSame($expectedMessages, $handler->processedMessages); } + + public function testBlockingMode() + { + $apiMessage = new DummyMessage('API'); + $ipaMessage = new DummyMessage('IPA'); + + $receiver = new BlockingDummyReceiver([ + [new Envelope($apiMessage), new Envelope($ipaMessage)], + ]); + + $bus = $this->createMock(MessageBusInterface::class); + $envelopes = []; + + $bus->expects($this->exactly(2)) + ->method('dispatch') + ->willReturnCallback(function ($envelope) use (&$envelopes) { + return $envelopes[] = $envelope; + }); + + $dispatcher = new class() implements EventDispatcherInterface { + private $listener; + + public function __construct() + { + $this->listener = new StopWorkerOnMessageLimitListener(2); + } + + public function dispatch(object $event, ?string $eventName = null): object + { + if ($event instanceof WorkerRunningEvent) { + $this->listener->onWorkerRunning($event); + } + + return $event; + } + }; + + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher); + $worker->run(['blocking-mode' => true]); + + $this->assertSame($apiMessage, $envelopes[0]->getMessage()); + $this->assertSame($ipaMessage, $envelopes[1]->getMessage()); + $this->assertCount(1, $envelopes[0]->all(ReceivedStamp::class)); + $this->assertCount(1, $envelopes[0]->all(ConsumedByWorkerStamp::class)); + $this->assertSame('transport', $envelopes[0]->last(ReceivedStamp::class)->getTransportName()); + + $this->assertSame(2, $receiver->getAcknowledgeCount()); + } + + public function testReceiverDoesNotSupportBlockingMode() + { + $receiver = $this->createMock(QueueReceiverInterface::class); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + + $worker = new Worker(['transport' => $receiver], $bus); + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage(sprintf('Receiver for "transport" does not implement "%s".', BlockingReceiverInterface::class)); + $worker->run(['blocking-mode' => true]); + } + + public function testMoreThanOneReceiverInBlockingMode() + { + $receiver1 = $this->createMock(QueueReceiverInterface::class); + $receiver2 = $this->createMock(QueueReceiverInterface::class); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + + $worker = new Worker(['transport1' => $receiver1, 'transport2' => $receiver2], $bus); + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage('In blocking mode only one receiver is supported'); + $worker->run(['blocking-mode' => true]); + } + + public function testWorkerLimitQueuesInBlockingMode() + { + $apiMessage = new DummyMessage('API'); + $ipaMessage = new DummyMessage('IPA'); + + $receiver = new QueueBlockingDummyReceiver([ + [new Envelope($apiMessage), new Envelope($ipaMessage)], + ]); + + $bus = $this->createMock(MessageBusInterface::class); + $envelopes = []; + + $bus->expects($this->exactly(2)) + ->method('dispatch') + ->willReturnCallback(function ($envelope) use (&$envelopes) { + return $envelopes[] = $envelope; + }); + + $dispatcher = new class() implements EventDispatcherInterface { + private $listener; + + public function __construct() + { + $this->listener = new StopWorkerOnMessageLimitListener(2); + } + + public function dispatch(object $event, ?string $eventName = null): object + { + if ($event instanceof WorkerRunningEvent) { + $this->listener->onWorkerRunning($event); + } + + return $event; + } + }; + + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher); + $worker->run([ + 'blocking-mode' => true, + 'queues' => ['foo'], + ]); + + $this->assertSame($apiMessage, $envelopes[0]->getMessage()); + $this->assertSame($ipaMessage, $envelopes[1]->getMessage()); + $this->assertCount(1, $envelopes[0]->all(ReceivedStamp::class)); + $this->assertCount(1, $envelopes[0]->all(ConsumedByWorkerStamp::class)); + $this->assertSame('transport', $envelopes[0]->last(ReceivedStamp::class)->getTransportName()); + + $this->assertSame(2, $receiver->getAcknowledgeCount()); + } + + public function testWorkerLimitQueuesUnsupportedInBlockingMode() + { + $receiver = $this->createMock(BlockingReceiverInterface::class); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + + $worker = new Worker(['transport' => $receiver], $bus); + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage(sprintf('Receiver for "transport" does not implement "%s".', QueueBlockingReceiverInterface::class)); + $worker->run([ + 'queues' => ['foo'], + 'blocking-mode' => true, + ]); + } +} + +class DummyReceiver implements ReceiverInterface +{ + private $deliveriesOfEnvelopes; + private $acknowledgedEnvelopes; + private $rejectedEnvelopes; + private $acknowledgeCount = 0; + private $rejectCount = 0; + + /** + * @param Envelope[][] $deliveriesOfEnvelopes + */ + public function __construct(array $deliveriesOfEnvelopes) + { + $this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes; + } + + public function get(): iterable + { + $val = array_shift($this->deliveriesOfEnvelopes); + + return $val ?? []; + } + + public function ack(Envelope $envelope): void + { + ++$this->acknowledgeCount; + $this->acknowledgedEnvelopes[] = $envelope; + } + + public function reject(Envelope $envelope): void + { + ++$this->rejectCount; + $this->rejectedEnvelopes[] = $envelope; + } + + public function getAcknowledgeCount(): int + { + return $this->acknowledgeCount; + } + + public function getRejectCount(): int + { + return $this->rejectCount; + } + + public function getAcknowledgedEnvelopes(): array + { + return $this->acknowledgedEnvelopes; + } +} + +class BlockingDummyReceiver extends DummyReceiver implements BlockingReceiverInterface +{ + public function pull(callable $callback): void + { + $envelopes = $this->get(); + + foreach ($envelopes as $envelope) { + $shouldContinue = $callback($envelope); + + if ($shouldContinue === false) { + return; + } + } + } } class DummyQueueReceiver extends DummyReceiver implements QueueReceiverInterface @@ -547,3 +754,12 @@ private function process(array $jobs): void } } } + +class QueueBlockingDummyReceiver extends BlockingDummyReceiver implements QueueBlockingReceiverInterface +{ + public function pullFromQueues(array $queueNames, callable $callback): void + { + $this->pull($callback); + } +} + diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/BlockingReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/BlockingReceiverInterface.php new file mode 100644 index 0000000000000..2f783d97302a2 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Receiver/BlockingReceiverInterface.php @@ -0,0 +1,28 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Receiver; + +use Symfony\Component\Messenger\Exception\TransportException; + +/** + * @author Alexander Melikhov + */ +interface BlockingReceiverInterface extends ReceiverInterface +{ + /** + * @param callable(\AMQPEnvelope):?false $callback If callback return false, then processing thread will be + * returned to PHP script. + * + * @throws TransportException If there is an issue communicating with the transport + */ + public function pull(callable $callback): void; +} diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/QueueBlockingReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/QueueBlockingReceiverInterface.php new file mode 100644 index 0000000000000..3c0414602950a --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Receiver/QueueBlockingReceiverInterface.php @@ -0,0 +1,29 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Receiver; + +/** + * Some transports may have multiple queues. This interface is used to read from only some queues in blocking mode. + * + * @author Alexander Melikhov + */ +interface QueueBlockingReceiverInterface extends BlockingReceiverInterface +{ + /** + * Pull messages from the specified queue names instead of consuming from all queues. + * + * @param string[] $queueNames + * @param callable(\AMQPEnvelope):?false $callback If callback return false, then processing thread will be + * returned to PHP script. + */ + public function pullFromQueues(array $queueNames, callable $callback): void; +} diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index bba19aa4d9b63..14cb1c4ba8d36 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -28,6 +28,8 @@ use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp; use Symfony\Component\Messenger\Stamp\NoAutoAckStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Transport\Receiver\BlockingReceiverInterface; +use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; @@ -77,15 +79,32 @@ public function run(array $options = []): void 'sleep' => 1000000, ], $options); $queueNames = $options['queues'] ?? null; + $blockingMode = $options['blocking-mode'] ?? false; $this->metadata->set(['queueNames' => $queueNames]); + if ($blockingMode) { + if (\count($this->receivers) > 1) { + throw new RuntimeException('In blocking mode only one receiver is supported.'); + } + + foreach ($this->receivers as $transportName => $receiver) { + if (!$receiver instanceof BlockingReceiverInterface) { + throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, BlockingReceiverInterface::class)); + } + } + } + $this->dispatchEvent(new WorkerStartedEvent($this)); if ($queueNames) { // if queue names are specified, all receivers must implement the QueueReceiverInterface foreach ($this->receivers as $transportName => $receiver) { - if (!$receiver instanceof QueueReceiverInterface) { + if ($blockingMode) { + if (!$receiver instanceof QueueBlockingReceiverInterface) { + throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueBlockingReceiverInterface::class)); + } + } elseif (!$receiver instanceof QueueReceiverInterface) { throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class)); } } @@ -95,20 +114,45 @@ public function run(array $options = []): void $envelopeHandled = false; $envelopeHandledStart = microtime(true); foreach ($this->receivers as $transportName => $receiver) { - if ($queueNames) { - $envelopes = $receiver->getFromQueues($queueNames); + if ($blockingMode) { + $callback = function (Envelope $envelope) use ($transportName, &$envelopeHandled) { + $envelopeHandled = true; + $this->handleMessage($envelope, $transportName); + + if ($this->eventDispatcher !== null) { + $this->eventDispatcher->dispatch(new WorkerRunningEvent($this, false)); + } + + if ($this->shouldStop) { + return false; + } + + return true; + }; + + if ($queueNames) { + \assert($receiver instanceof QueueBlockingReceiverInterface); + $receiver->pullFromQueues($queueNames, $callback); + } else { + \assert($receiver instanceof BlockingReceiverInterface); + $receiver->pull($callback); + } } else { - $envelopes = $receiver->get(); - } + if ($queueNames && $receiver instanceof QueueReceiverInterface) { + $envelopes = $receiver->getFromQueues($queueNames); + } else { + $envelopes = $receiver->get(); + } - foreach ($envelopes as $envelope) { - $envelopeHandled = true; + foreach ($envelopes as $envelope) { + $envelopeHandled = true; - $this->handleMessage($envelope, $transportName); - $this->dispatchEvent(new WorkerRunningEvent($this, false)); + $this->handleMessage($envelope, $transportName); + $this->dispatchEvent(new WorkerRunningEvent($this, false)); - if ($this->shouldStop) { - break 2; + if ($this->shouldStop) { + break 2; + } } } From 10a813fce70f6786fbff58c60767cca1a2cbf334 Mon Sep 17 00:00:00 2001 From: Oleg Namaka Date: Fri, 25 Oct 2024 19:49:36 -0500 Subject: [PATCH 2/5] Fix the syntax to satisfy PHP 7.3 --- .../Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php index c2cc766b43bef..ffafa0ab6141c 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpTransportTest.php @@ -59,7 +59,7 @@ public function testReceivesMessagesInBlockingMode() $connection = $this->getMockBuilder(Connection::class) ->disableOriginalConstructor() ->onlyMethods(['getQueueNames', 'pull']) - ->getMock(), + ->getMock() ); $decodedMessage = new DummyMessage('Decoded.'); From 906b9de504a0a7ad7ab092aa8cab446b0b42f6f5 Mon Sep 17 00:00:00 2001 From: Oleg Namaka Date: Wed, 30 Oct 2024 14:36:55 -0500 Subject: [PATCH 3/5] Add multiple bindings support --- .../Bridge/Amqp/Transport/Connection.php | 48 ++++++++++++++++--- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index 853b815d9ff43..c816f77dc26be 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -66,9 +66,21 @@ class Connection ]; private const AVAILABLE_QUEUE_OPTIONS = [ + 'flags', + 'arguments', + ]; + + private const NEW_QUEUE_OPTIONS = [ + 'bindings', + ]; + + private const DEPRECATED_BINDING_KEYS = [ 'binding_keys', 'binding_arguments', - 'flags', + ]; + + private const AVAILABLE_BINDINGS_OPTIONS = [ + 'key', 'arguments', ]; @@ -145,8 +157,11 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar * * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional. * * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional. * * queues[name]: An array of queues, keyed by the name - * * binding_keys: The binding keys (if any) to bind to this queue - * * binding_arguments: Arguments to be used while binding the queue. + * * binding_keys: The binding keys (if any) to bind to this queue (Usage is deprecated. See 'bindings') + * * binding_arguments: Arguments to be used while binding the queue. (Usage is deprecated. See 'bindings') + * * bindings[name]: An array of bindings for this queue, keyed by the name + * * key: The binding key (if any) to bind to this queue + * * arguments: An array of arguments to be used while binding the queue. * * flags: Queue flags (Default: AMQP_DURABLE) * * arguments: Extra arguments * * exchange: @@ -261,9 +276,24 @@ private static function validateOptions(array $options): void continue; } - if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) { + if (0 < \count($deprecatedQueueOptions = array_intersect(array_keys($queue), self::DEPRECATED_BINDING_KEYS))) { + trigger_deprecation('symfony/messenger', '6.3', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "%s" option(s) should be used rather than "%s".', implode('", "', $deprecatedQueueOptions), implode('", ', self::NEW_QUEUE_OPTIONS), implode('", ', self::DEPRECATED_BINDING_KEYS)); + if (0 < \count($newQueueOptions = array_intersect(array_keys($queue), self::NEW_QUEUE_OPTIONS))) { + throw new LogicException(sprintf('New "%s" and deprecated "%s" option(s) passed to the AMQP Messenger transport', implode('", "', $newQueueOptions), implode('", "', $deprecatedQueueOptions))); + } + } + + if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS, self::NEW_QUEUE_OPTIONS, self::DEPRECATED_BINDING_KEYS))) { trigger_deprecation('symfony/messenger', '5.1', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions)); } + + if (\is_array($queue['bindings'] ?? false)) { + foreach ($queue['bindings'] as $individualBinding) { + if (0 < \count(array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) { + throw new LogicException(sprintf("Valid options for each 'bindings' are: %s", implode(', ', self::AVAILABLE_BINDINGS_OPTIONS))); + } + } + } } } @@ -478,9 +508,9 @@ public function pull(string $queueName, ?callable $callback): void $this->queue($queueName)->consume($callback); } - public function ack(\AMQPEnvelope $message, string $queueName): bool + public function ack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool { - return $this->queue($queueName)->ack($message->getDeliveryTag()) ?? true; + return $this->queue($queueName)->ack($message->getDeliveryTag(), $flags); } public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool @@ -500,6 +530,12 @@ private function setupExchangeAndQueues(): void foreach ($this->queuesOptions as $queueName => $queueConfig) { $this->queue($queueName)->declareQueue(); + foreach ($queueConfig['bindings'] ?? [] as $binding) { + $this->queue($queueName)->bind($this->exchangeOptions['name'], $binding['key'] ?? null, $binding['arguments'] ?? []); + } + if (isset($queueConfig['bindings']) && empty($queueConfig['binding_keys'])) { + continue; + } foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) { $this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []); } From 9e82776b5d9e05f7a4489f7e30c56fba58f8349c Mon Sep 17 00:00:00 2001 From: Oleg Namaka Date: Mon, 4 Nov 2024 16:20:19 -0600 Subject: [PATCH 4/5] Add the hard requirement on amqp-messenger version, add changelog notes --- src/Symfony/Component/Messenger/CHANGELOG.md | 12 +++++++++++- src/Symfony/Component/Messenger/composer.json | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 6e219137d90d6..db7094d938ea8 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -1,6 +1,16 @@ CHANGELOG ========= +5.10 +--- + +* Add `BlockingReceiverInterface` to allow blocking receive operations (uses more efficient `consume` method instead of `get` method in amqp transport) +* Add `QueueBlockingReceiverInterface` to allow blocking receive operations on a specific queue (uses more efficient `consume` method instead of `get` method in amqp transport) +* Add `--blocking-mode` option to `messenger:consume` (will use more efficient `consume` method instead of `get` method in amqp transport) +* Add `MultipleBindings` support for AMQP transport by adding queue options `binding_keys` and `binding_arguments` to AMQP transport to allow bindings based on multiple arguments + +The minor version 10 is used to avoid any conflicts with the official Symfony post 5.4 releases even though they are not expected + 5.4 --- @@ -8,7 +18,7 @@ CHANGELOG * Add support for handling messages in batches with `BatchHandlerInterface` and corresponding trait * Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker. * Add support for resetting container services after each messenger message. - * Added `WorkerMetadata` class which allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from. + * Added `WorkerMetadata` class wUpdathich allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from. * New method `getMetadata()` was added to `Worker` class which returns the `WorkerMetadata` object. * Deprecate not setting the `reset_on_message` config option, its default value will change to `true` in 6.0 * Add log when worker should stop. diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json index a62537cc63a97..36967ce24f03e 100644 --- a/src/Symfony/Component/Messenger/composer.json +++ b/src/Symfony/Component/Messenger/composer.json @@ -18,7 +18,7 @@ "require": { "php": ">=7.2.5", "psr/log": "^1|^2|^3", - "symfony/amqp-messenger": "^5.1|^6.0", + "symfony/amqp-messenger": "^5.10|^6.0", "symfony/deprecation-contracts": "^2.1|^3", "symfony/doctrine-messenger": "^5.1|^6.0", "symfony/polyfill-php80": "^1.16", From ab26a0254c4474f1c6b4a1c1aec309500d70019f Mon Sep 17 00:00:00 2001 From: Oleg Namaka Date: Mon, 4 Nov 2024 17:04:11 -0600 Subject: [PATCH 5/5] Connection::ack cannot return a bool, it breaks the tests "TypeError: Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection::ack(): Return value must be of type bool, null returned" --- .../Messenger/Bridge/Amqp/Transport/Connection.php | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php index c816f77dc26be..c2b96de9df0d8 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php @@ -277,14 +277,14 @@ private static function validateOptions(array $options): void } if (0 < \count($deprecatedQueueOptions = array_intersect(array_keys($queue), self::DEPRECATED_BINDING_KEYS))) { - trigger_deprecation('symfony/messenger', '6.3', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "%s" option(s) should be used rather than "%s".', implode('", "', $deprecatedQueueOptions), implode('", ', self::NEW_QUEUE_OPTIONS), implode('", ', self::DEPRECATED_BINDING_KEYS)); + trigger_deprecation('symfony/messenger', '5.10', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "%s" option(s) should be used rather than "%s".', implode('", "', $deprecatedQueueOptions), implode('", ', self::NEW_QUEUE_OPTIONS), implode('", ', self::DEPRECATED_BINDING_KEYS)); if (0 < \count($newQueueOptions = array_intersect(array_keys($queue), self::NEW_QUEUE_OPTIONS))) { throw new LogicException(sprintf('New "%s" and deprecated "%s" option(s) passed to the AMQP Messenger transport', implode('", "', $newQueueOptions), implode('", "', $deprecatedQueueOptions))); } } if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS, self::NEW_QUEUE_OPTIONS, self::DEPRECATED_BINDING_KEYS))) { - trigger_deprecation('symfony/messenger', '5.1', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions)); + trigger_deprecation('symfony/messenger', '5.10', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions)); } if (\is_array($queue['bindings'] ?? false)) { @@ -508,9 +508,9 @@ public function pull(string $queueName, ?callable $callback): void $this->queue($queueName)->consume($callback); } - public function ack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool + public function ack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM) { - return $this->queue($queueName)->ack($message->getDeliveryTag(), $flags); + $this->queue($queueName)->ack($message->getDeliveryTag(), $flags); } public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool