Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Branch with required changes based on 5.4.x #2

Open
wants to merge 5 commits into
base: 5.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@
"allow-plugins": {
"php-http/discovery": false,
"symfony/runtime": true
}
},
"use-parent-dir": true
},
"autoload": {
"psr-4": {
Expand Down Expand Up @@ -204,10 +205,6 @@
"symfony/contracts": "2.5.x-dev"
}
}
},
{
"type": "path",
"url": "src/Symfony/Component/Runtime"
}
],
"minimum-stability": "dev"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,32 @@ public function testItReturnsTheDecodedMessageToTheHandler()
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
}

public function testItReturnsTheDecodedMessageToTheHandlerInBlockingMode()
{
$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->onlyMethods(['getQueueNames', 'pull'])
->getMock();
$serializer = new Serializer(
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
);

$amqpEnvelope = $this->createAMQPEnvelope();

$amqpQueue = $this->createMock(\AMQPQueue::class);
$amqpQueue->method('getName')->willReturn('queueName');

$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
\call_user_func($callback, $amqpEnvelope, $amqpQueue);
});

$receiver = new AmqpReceiver($connection, $serializer);
$receiver->pull(function (Envelope $envelope) {
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
});
}

public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
{
$this->expectException(TransportException::class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,36 @@ public function testReceivesMessages()
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
}

public function testReceivesMessagesInBlockingMode()
{
$transport = $this->getTransport(
$serializer = $this->createMock(SerializerInterface::class),
$connection = $this->getMockBuilder(Connection::class)
->disableOriginalConstructor()
->onlyMethods(['getQueueNames', 'pull'])
->getMock()
);

$decodedMessage = new DummyMessage('Decoded.');

$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$amqpEnvelope->method('getBody')->willReturn('body');
$amqpEnvelope->method('getHeaders')->willReturn(['my' => 'header']);

$amqpQueue = $this->createMock(\AMQPQueue::class);
$amqpQueue->method('getName')->willReturn('queueName');

$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
$connection->method('getQueueNames')->willReturn(['queueName']);
$connection->method('pull')->willReturnCallback(function (string $queueName, callable $callback) use ($amqpQueue, $amqpEnvelope) {
\call_user_func($callback, $amqpEnvelope, $amqpQueue);
});

$transport->pull(function (Envelope $envelope) use ($decodedMessage) {
$this->assertSame($decodedMessage, $envelope->getMessage());
});
}

private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): AmqpTransport
{
$serializer = $serializer ?? $this->createMock(SerializerInterface::class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -25,7 +26,7 @@
*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface
class AmqpReceiver implements QueueReceiverInterface, QueueBlockingReceiverInterface, MessageCountAwareInterface
{
private $serializer;
private $connection;
Expand All @@ -44,9 +45,46 @@ public function get(): iterable
yield from $this->getFromQueues($this->connection->getQueueNames());
}

/**
* {@inheritdoc}
*/
public function pull(callable $callback): void
{
$this->pullFromQueues($this->connection->getQueueNames(), $callback);
}

public function pullFromQueues(array $queueNames, callable $callback): void
{
if (0 === \count($queueNames)) {
return;
}

// Pop last queue to send callback
$firstQueue = array_pop($queueNames);

foreach ($queueNames as $queueName) {
$this->pullEnvelope($queueName, null);
}

$this->pullEnvelope($firstQueue, $callback);
}

private function pullEnvelope(string $queueName, ?callable $callback): void
{
if (null !== $callback) {
$callback = function (\AMQPEnvelope $amqpEnvelope, \AMQPQueue $queue) use ($callback) {
$queueName = $queue->getName();
$body = $amqpEnvelope->getBody();
$envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName);

return $callback($envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName)));
};
}

try {
$this->connection->pull($queueName, $callback);
} catch (\AMQPException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}

public function getFromQueues(array $queueNames): iterable
{
foreach ($queueNames as $queueName) {
Expand All @@ -67,9 +105,15 @@ private function getEnvelope(string $queueName): iterable
}

$body = $amqpEnvelope->getBody();
$envelope = $this->decodeAmqpEnvelope($amqpEnvelope, $body, $queueName);

yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
}

private function decodeAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, $body, string $queueName): Envelope
{
try {
$envelope = $this->serializer->decode([
return $this->serializer->decode([
'body' => false === $body ? '' : $body, // workaround https://github.com/pdezwart/php-amqp/issues/351
'headers' => $amqpEnvelope->getHeaders(),
]);
Expand All @@ -79,8 +123,6 @@ private function getEnvelope(string $queueName): iterable

throw $exception;
}

yield $envelope->with(new AmqpReceivedStamp($amqpEnvelope, $queueName));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueBlockingReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand All @@ -22,7 +23,7 @@
/**
* @author Nicolas Grekas <p@tchwork.com>
*/
class AmqpTransport implements QueueReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
class AmqpTransport implements QueueReceiverInterface, QueueBlockingReceiverInterface, TransportInterface, SetupableTransportInterface, MessageCountAwareInterface
{
private $serializer;
private $connection;
Expand All @@ -43,6 +44,22 @@ public function get(): iterable
return ($this->receiver ?? $this->getReceiver())->get();
}

/**
* {@inheritdoc}
*/
public function pull(callable $callback): void
{
$this->getReceiver()->pull($callback);
}

/**
* {@inheritdoc}
*/
public function pullFromQueues(array $queueNames, callable $callback): void
{
$this->getReceiver()->pullFromQueues($queueNames, $callback);
}

/**
* {@inheritdoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,21 @@ class Connection
];

private const AVAILABLE_QUEUE_OPTIONS = [
'flags',
'arguments',
];

private const NEW_QUEUE_OPTIONS = [
'bindings',
];

private const DEPRECATED_BINDING_KEYS = [
'binding_keys',
'binding_arguments',
'flags',
];

private const AVAILABLE_BINDINGS_OPTIONS = [
'key',
'arguments',
];

Expand Down Expand Up @@ -145,8 +157,11 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
* * connect_timeout: Connection timeout. Note: 0 or greater seconds. May be fractional.
* * confirm_timeout: Timeout in seconds for confirmation, if none specified transport will not wait for message confirmation. Note: 0 or greater seconds. May be fractional.
* * queues[name]: An array of queues, keyed by the name
* * binding_keys: The binding keys (if any) to bind to this queue
* * binding_arguments: Arguments to be used while binding the queue.
* * binding_keys: The binding keys (if any) to bind to this queue (Usage is deprecated. See 'bindings')
* * binding_arguments: Arguments to be used while binding the queue. (Usage is deprecated. See 'bindings')
* * bindings[name]: An array of bindings for this queue, keyed by the name
* * key: The binding key (if any) to bind to this queue
* * arguments: An array of arguments to be used while binding the queue.
* * flags: Queue flags (Default: AMQP_DURABLE)
* * arguments: Extra arguments
* * exchange:
Expand Down Expand Up @@ -261,8 +276,23 @@ private static function validateOptions(array $options): void
continue;
}

if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS))) {
trigger_deprecation('symfony/messenger', '5.1', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions));
if (0 < \count($deprecatedQueueOptions = array_intersect(array_keys($queue), self::DEPRECATED_BINDING_KEYS))) {
trigger_deprecation('symfony/messenger', '5.10', 'Deprecated queue option(s) "%s" passed to the AMQP Messenger transport. The "%s" option(s) should be used rather than "%s".', implode('", "', $deprecatedQueueOptions), implode('", ', self::NEW_QUEUE_OPTIONS), implode('", ', self::DEPRECATED_BINDING_KEYS));
if (0 < \count($newQueueOptions = array_intersect(array_keys($queue), self::NEW_QUEUE_OPTIONS))) {
throw new LogicException(sprintf('New "%s" and deprecated "%s" option(s) passed to the AMQP Messenger transport', implode('", "', $newQueueOptions), implode('", "', $deprecatedQueueOptions)));
}
}

if (0 < \count($invalidQueueOptions = array_diff(array_keys($queue), self::AVAILABLE_QUEUE_OPTIONS, self::NEW_QUEUE_OPTIONS, self::DEPRECATED_BINDING_KEYS))) {
trigger_deprecation('symfony/messenger', '5.10', 'Invalid queue option(s) "%s" passed to the AMQP Messenger transport. Passing invalid queue options is deprecated.', implode('", "', $invalidQueueOptions));
}

if (\is_array($queue['bindings'] ?? false)) {
foreach ($queue['bindings'] as $individualBinding) {
if (0 < \count(array_diff(array_keys($individualBinding), self::AVAILABLE_BINDINGS_OPTIONS))) {
throw new LogicException(sprintf("Valid options for each 'bindings' are: %s", implode(', ', self::AVAILABLE_BINDINGS_OPTIONS)));
}
}
}
}
}
Expand All @@ -281,7 +311,7 @@ private static function normalizeQueueArguments(array $arguments): array
}

if (!is_numeric($arguments[$key])) {
throw new InvalidArgumentException(sprintf('Integer expected for queue argument "%s", "%s" given.', $key, get_debug_type($arguments[$key])));
throw new InvalidArgumentException(sprintf('Integer expected for queue argument "%s", "%s" given.', $key, gettype($arguments[$key])));
}

$arguments[$key] = (int) $arguments[$key];
Expand Down Expand Up @@ -457,9 +487,30 @@ public function get(string $queueName): ?\AMQPEnvelope
return null;
}

public function ack(\AMQPEnvelope $message, string $queueName): bool
/**
* Consume a message from the specified queue in blocking mode.
*
* @param ?callable(\AMQPEnvelope,\AMQPQueue):?false $callback If callback return false, then processing thread will be
* returned from AMQPQueue::consume() to PHP script. If null is passed, then the messages delivered to this client
* will be made available to the first real callback registered. That allows one to have a single callback
* consuming from multiple queues.
*
* @throws \AMQPException
*/
public function pull(string $queueName, ?callable $callback): void
{
$this->clearWhenDisconnected();

if ($this->autoSetupExchange) {
$this->setupExchangeAndQueues();
}

$this->queue($queueName)->consume($callback);
}

public function ack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM)
{
return $this->queue($queueName)->ack($message->getDeliveryTag()) ?? true;
$this->queue($queueName)->ack($message->getDeliveryTag(), $flags);
}

public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AMQP_NOPARAM): bool
Expand All @@ -479,6 +530,12 @@ private function setupExchangeAndQueues(): void

foreach ($this->queuesOptions as $queueName => $queueConfig) {
$this->queue($queueName)->declareQueue();
foreach ($queueConfig['bindings'] ?? [] as $binding) {
$this->queue($queueName)->bind($this->exchangeOptions['name'], $binding['key'] ?? null, $binding['arguments'] ?? []);
}
if (isset($queueConfig['bindings']) && empty($queueConfig['binding_keys'])) {
continue;
}
foreach ($queueConfig['binding_keys'] ?? [null] as $bindingKey) {
$this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
}
Expand Down
12 changes: 11 additions & 1 deletion src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
CHANGELOG
=========

5.10
---

* Add `BlockingReceiverInterface` to allow blocking receive operations (uses more efficient `consume` method instead of `get` method in amqp transport)
* Add `QueueBlockingReceiverInterface` to allow blocking receive operations on a specific queue (uses more efficient `consume` method instead of `get` method in amqp transport)
* Add `--blocking-mode` option to `messenger:consume` (will use more efficient `consume` method instead of `get` method in amqp transport)
* Add `MultipleBindings` support for AMQP transport by adding queue options `binding_keys` and `binding_arguments` to AMQP transport to allow bindings based on multiple arguments

The minor version 10 is used to avoid any conflicts with the official Symfony post 5.4 releases even though they are not expected

5.4
---

* Add `AsMessageHandler` attribute for declaring message handlers on PHP 8.
* Add support for handling messages in batches with `BatchHandlerInterface` and corresponding trait
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.
* Add support for resetting container services after each messenger message.
* Added `WorkerMetadata` class which allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from.
* Added `WorkerMetadata` class wUpdathich allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from.
* New method `getMetadata()` was added to `Worker` class which returns the `WorkerMetadata` object.
* Deprecate not setting the `reset_on_message` config option, its default value will change to `true` in 6.0
* Add log when worker should stop.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ protected function configure(): void
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
new InputOption('blocking-mode', null, InputOption::VALUE_NONE, 'Consume messages in blocking mode. If option is specified only one receiver is supported'),
])
->setDescription(self::$defaultDescription)
->setHelp(<<<'EOF'
Expand Down Expand Up @@ -122,6 +123,12 @@ protected function configure(): void
Use the --no-reset option to prevent services resetting after each message (may lead to leaking services' state between messages):

<info>php %command.full_name% <receiver-name> --no-reset</info>

Use the --blocking-mode option to force receiver to work in blocking mode
("consume" method will be used instead of "get" in RabbitMQ for example).
Only supported by some receivers, and you should pass only one receiver:

<info>php %command.full_name% <receiver-name> --blocking-mode</info>
EOF
)
;
Expand Down Expand Up @@ -227,6 +234,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
$options = [
'sleep' => $input->getOption('sleep') * 1000000,
'blocking-mode' => (bool) $input->getOption('blocking-mode'),
];
if ($queues = $input->getOption('queues')) {
$options['queues'] = $queues;
Expand Down
Loading
Loading