From 1ca51715417d11595aac7cf5b6a62d585c18bd70 Mon Sep 17 00:00:00 2001 From: Diyorbek Ibragimov <167644693+DikoIbragimov@users.noreply.github.com> Date: Wed, 4 Dec 2024 13:09:31 +0500 Subject: [PATCH 1/8] Update Adapter.php --- src/Adapter.php | 68 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 23 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index 589f017..648978b 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -2,27 +2,37 @@ declare(strict_types=1); -namespace Yiisoft\Queue\AMQP; +namespace App; use PhpAmqpLib\Message\AMQPMessage; use Throwable; use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\AMQP\Exception\NotImplementedException; +use Yiisoft\Queue\AMQP\ExistingMessagesConsumer; +use Yiisoft\Queue\AMQP\MessageSerializerInterface; +use Yiisoft\Queue\AMQP\QueueProviderInterface; use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Enum\JobStatus; use Yiisoft\Queue\Message\MessageInterface; -final class Adapter implements AdapterInterface -{ +final class Adapter implements AdapterInterface { + /** + * @param QueueProviderInterface $queueProvider + * @param MessageSerializerInterface $serializer + * @param LoopInterface $loop + */ public function __construct( - private QueueProviderInterface $queueProvider, - private MessageSerializerInterface $serializer, - private LoopInterface $loop, + private QueueProviderInterface $queueProvider, + private readonly MessageSerializerInterface $serializer, + private readonly LoopInterface $loop, ) { } - public function withChannel(string $channel): self - { + /** + * @param string $channel + * @return $this + */ + public function withChannel(string $channel): self { $instance = clone $this; $instance->queueProvider = $this->queueProvider->withChannelName($channel); @@ -30,10 +40,9 @@ public function withChannel(string $channel): self } /** - * @param callable(MessageInterface): bool $handlerCallback + * @param callable(MessageInterface): bool $handlerCallback */ - public function runExisting(callable $handlerCallback): void - { + public function runExisting(callable $handlerCallback): void { $channel = $this->queueProvider->getChannel(); (new ExistingMessagesConsumer($channel, $this->queueProvider ->getQueueSettings() @@ -42,19 +51,22 @@ public function runExisting(callable $handlerCallback): void } /** - * @return never + * @param string|int $id + * @return JobStatus */ - public function status(string $id): JobStatus - { + public function status(string|int $id): JobStatus { throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.'); } - public function push(MessageInterface $message): void - { + /** + * @param MessageInterface $message + * @return MessageInterface + */ + public function push(MessageInterface $message): MessageInterface { $payload = $this->serializer->serialize($message); $amqpMessage = new AMQPMessage( $payload, - array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties()) + array_merge(['message_id' => uniqid('', true)], $this->queueProvider->getMessageProperties()) ); $exchangeSettings = $this->queueProvider->getExchangeSettings(); $this->queueProvider @@ -69,10 +81,15 @@ public function push(MessageInterface $message): void /** @var string $messageId */ $messageId = $amqpMessage->get('message_id'); $message->setId($messageId); + + return $message; } - public function subscribe(callable $handlerCallback): void - { + /** + * @param callable $handlerCallback + * @return void + */ + public function subscribe(callable $handlerCallback): void { $channel = $this->queueProvider->getChannel(); $channel->basic_consume( $this->queueProvider @@ -105,13 +122,18 @@ function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void { } } - public function getQueueProvider(): QueueProviderInterface - { + /** + * @return QueueProviderInterface + */ + public function getQueueProvider(): QueueProviderInterface { return $this->queueProvider; } - public function withQueueProvider(QueueProviderInterface $queueProvider): self - { + /** + * @param QueueProviderInterface $queueProvider + * @return $this + */ + public function withQueueProvider(QueueProviderInterface $queueProvider): self { $new = clone $this; $new->queueProvider = $queueProvider; From 7a51028951e86d5d6d7d863ef092184481141dc4 Mon Sep 17 00:00:00 2001 From: Diyorbek Ibragimov <167644693+DikoIbragimov@users.noreply.github.com> Date: Wed, 4 Dec 2024 13:22:17 +0500 Subject: [PATCH 2/8] Update FakeAdapter.php --- tests/Support/FakeAdapter.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/Support/FakeAdapter.php b/tests/Support/FakeAdapter.php index ddbbe15..3f0f7ee 100644 --- a/tests/Support/FakeAdapter.php +++ b/tests/Support/FakeAdapter.php @@ -25,12 +25,12 @@ public function runExisting(callable $handlerCallback): void // TODO: Implement runExisting() method. } - public function status(string $id): JobStatus + public function status(string|int $id): JobStatus { // TODO: Implement status() method. } - public function push(MessageInterface $message): void + public function push(MessageInterface $message): MessageInterface { // TODO: Implement push() method. } From 176759d7709bc91981bf7d2a09a2da224f15a887 Mon Sep 17 00:00:00 2001 From: Diyorbek Ibragimov <167644693+DikoIbragimov@users.noreply.github.com> Date: Wed, 4 Dec 2024 13:25:38 +0500 Subject: [PATCH 3/8] Fix namespace of Adapter.php --- src/Adapter.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Adapter.php b/src/Adapter.php index 648978b..a2ecac7 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace App; +namespace Yiisoft\Queue\AMQP; use PhpAmqpLib\Message\AMQPMessage; use Throwable; From 38b2048608377a90483e31454d5c192f776ef544 Mon Sep 17 00:00:00 2001 From: uzdevid Date: Sat, 7 Dec 2024 23:16:30 +0500 Subject: [PATCH 4/8] Update Adapter.php --- src/Adapter.php | 57 ++++++++++++++++++++++++------------------------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index a2ecac7..adcae0a 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -8,12 +8,11 @@ use Throwable; use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\AMQP\Exception\NotImplementedException; -use Yiisoft\Queue\AMQP\ExistingMessagesConsumer; -use Yiisoft\Queue\AMQP\MessageSerializerInterface; -use Yiisoft\Queue\AMQP\QueueProviderInterface; use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Enum\JobStatus; +use Yiisoft\Queue\Message\IdEnvelope; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\Message\MessageSerializerInterface; final class Adapter implements AdapterInterface { /** @@ -33,10 +32,10 @@ public function __construct( * @return $this */ public function withChannel(string $channel): self { - $instance = clone $this; - $instance->queueProvider = $this->queueProvider->withChannelName($channel); + $new = clone $this; + $new->queueProvider = $this->queueProvider->withChannelName($channel); - return $instance; + return $new; } /** @@ -44,10 +43,14 @@ public function withChannel(string $channel): self { */ public function runExisting(callable $handlerCallback): void { $channel = $this->queueProvider->getChannel(); - (new ExistingMessagesConsumer($channel, $this->queueProvider - ->getQueueSettings() - ->getName(), $this->serializer)) - ->consume($handlerCallback); + $queueName = $this->queueProvider->getQueueSettings()->getName(); + $consumer = new ExistingMessagesConsumer( + $channel, + $queueName, + $this->serializer + ); + + $consumer->consume($handlerCallback); } /** @@ -55,7 +58,7 @@ public function runExisting(callable $handlerCallback): void { * @return JobStatus */ public function status(string|int $id): JobStatus { - throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.'); + throw new NotImplementedException(sprintf('Status check is not supported by the adapter %s.', self::class)); } /** @@ -69,20 +72,18 @@ public function push(MessageInterface $message): MessageInterface { array_merge(['message_id' => uniqid('', true)], $this->queueProvider->getMessageProperties()) ); $exchangeSettings = $this->queueProvider->getExchangeSettings(); - $this->queueProvider - ->getChannel() - ->basic_publish( - $amqpMessage, - $exchangeSettings?->getName() ?? '', - $exchangeSettings ? '' : $this->queueProvider - ->getQueueSettings() - ->getName() - ); + $channel = $this->queueProvider->getChannel(); + $channel->basic_publish( + $amqpMessage, + $exchangeSettings?->getName() ?? '', + $exchangeSettings ? '' : $this->queueProvider + ->getQueueSettings() + ->getName() + ); /** @var string $messageId */ $messageId = $amqpMessage->get('message_id'); - $message->setId($messageId); - return $message; + return new IdEnvelope($message, $messageId); } /** @@ -91,20 +92,18 @@ public function push(MessageInterface $message): MessageInterface { */ public function subscribe(callable $handlerCallback): void { $channel = $this->queueProvider->getChannel(); + $queueName = $this->queueProvider->getQueueSettings()->getName(); + $channel->basic_consume( - $this->queueProvider - ->getQueueSettings() - ->getName(), - $this->queueProvider - ->getQueueSettings() - ->getName(), + $queueName, + $queueName, false, false, false, true, function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void { try { - $handlerCallback($this->serializer->unserialize($amqpMessage->body)); + $handlerCallback($this->serializer->unserialize($amqpMessage->getBody())); $channel->basic_ack($amqpMessage->getDeliveryTag()); } catch (Throwable $exception) { $consumerTag = $amqpMessage->getConsumerTag(); From 7069cbb6a5d2976b29ef5ac2b7becde214620d60 Mon Sep 17 00:00:00 2001 From: uzdevid Date: Sat, 7 Dec 2024 23:18:10 +0500 Subject: [PATCH 5/8] Change AMQP\MessageSerializerInterface to Queue\MessageSerializerInterface and MessageSerializer to JsonMessageSerializer --- config/di.php | 3 -- src/Exception/NoKeyInPayloadException.php | 2 +- src/ExistingMessagesConsumer.php | 1 + src/MessageSerializer.php | 64 ----------------------- src/MessageSerializerInterface.php | 14 ----- tests/Integration/DelayMiddlewareTest.php | 6 +-- tests/Support/FakeAdapter.php | 2 +- tests/Unit/ExchangeSettingsTest.php | 4 +- tests/Unit/MessageSerializerTest.php | 4 +- tests/Unit/QueueProviderTest.php | 6 +-- tests/Unit/QueueSettingsTest.php | 6 +-- tests/Unit/QueueTest.php | 8 +-- tests/Unit/UnitTestCase.php | 4 +- tests/yii | 8 +-- 14 files changed, 26 insertions(+), 106 deletions(-) delete mode 100644 src/MessageSerializer.php delete mode 100644 src/MessageSerializerInterface.php diff --git a/config/di.php b/config/di.php index d7a7eb3..9ab313d 100644 --- a/config/di.php +++ b/config/di.php @@ -2,15 +2,12 @@ declare(strict_types=1); -use Yiisoft\Queue\AMQP\MessageSerializer; -use Yiisoft\Queue\AMQP\MessageSerializerInterface; use Yiisoft\Queue\AMQP\QueueProvider; use Yiisoft\Queue\AMQP\QueueProviderInterface; use Yiisoft\Queue\AMQP\Settings\Queue; use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface; return [ - MessageSerializerInterface::class => MessageSerializer::class, QueueProviderInterface::class => QueueProvider::class, QueueSettingsInterface::class => Queue::class, ]; diff --git a/src/Exception/NoKeyInPayloadException.php b/src/Exception/NoKeyInPayloadException.php index 79ec038..192d961 100644 --- a/src/Exception/NoKeyInPayloadException.php +++ b/src/Exception/NoKeyInPayloadException.php @@ -7,7 +7,7 @@ use InvalidArgumentException; use Throwable; use Yiisoft\FriendlyException\FriendlyExceptionInterface; -use Yiisoft\Queue\AMQP\MessageSerializerInterface; +use Yiisoft\Queue\Message\MessageSerializerInterface; class NoKeyInPayloadException extends InvalidArgumentException implements FriendlyExceptionInterface { diff --git a/src/ExistingMessagesConsumer.php b/src/ExistingMessagesConsumer.php index 4e030eb..f01f97a 100644 --- a/src/ExistingMessagesConsumer.php +++ b/src/ExistingMessagesConsumer.php @@ -8,6 +8,7 @@ use PhpAmqpLib\Message\AMQPMessage; use Throwable; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\Message\MessageSerializerInterface; /** * @internal diff --git a/src/MessageSerializer.php b/src/MessageSerializer.php deleted file mode 100644 index 9afc07e..0000000 --- a/src/MessageSerializer.php +++ /dev/null @@ -1,64 +0,0 @@ - $message->getId(), - 'name' => $message->getHandlerName(), - 'data' => $message->getData(), - 'meta' => $message->getMetadata(), - ]; - - return json_encode($payload, JSON_THROW_ON_ERROR); - } - - /** - * @throws JsonException - * @throws NoKeyInPayloadException - * @throws InvalidArgumentException - */ - public function unserialize(string $value): Message - { - $payload = json_decode($value, true, 512, JSON_THROW_ON_ERROR); - if (!is_array($payload)) { - throw new InvalidArgumentException('Payload must be array. Got ' . get_debug_type($payload) . '.'); - } - - $name = $payload['name'] ?? null; - if (!is_string($name)) { - throw new NoKeyInPayloadException('name', $payload); - } - - $id = $payload['id'] ?? null; - if ($id !== null && !is_string($id)) { - throw new NoKeyInPayloadException('id', $payload); - } - - $meta = $payload['meta'] ?? []; - if (!is_array($meta)) { - throw new NoKeyInPayloadException('meta', $payload); - } - - return new Message( - $name, - $payload['data'] ?? null, - $meta, - $id, - ); - } -} diff --git a/src/MessageSerializerInterface.php b/src/MessageSerializerInterface.php deleted file mode 100644 index 4d84521..0000000 --- a/src/MessageSerializerInterface.php +++ /dev/null @@ -1,14 +0,0 @@ -createConnection(), new QueueSettings(), ), - new MessageSerializer(), + new JsonMessageSerializer(), new SignalLoop(), ); $queue = $this->makeQueue($adapter); @@ -75,7 +75,7 @@ public function testMainFlowWithFakeAdapter(): void $this->createConnection(), new QueueSettings(), ), - new MessageSerializer(), + new JsonMessageSerializer(), new SignalLoop(), ); $queue = $this->makeQueue($adapter); diff --git a/tests/Support/FakeAdapter.php b/tests/Support/FakeAdapter.php index 3f0f7ee..cf8a7c9 100644 --- a/tests/Support/FakeAdapter.php +++ b/tests/Support/FakeAdapter.php @@ -5,11 +5,11 @@ namespace Yiisoft\Queue\AMQP\Tests\Support; use Yiisoft\Queue\Adapter\AdapterInterface; -use Yiisoft\Queue\AMQP\MessageSerializerInterface; use Yiisoft\Queue\AMQP\QueueProviderInterface; use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Enum\JobStatus; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\Message\MessageSerializerInterface; final class FakeAdapter implements AdapterInterface { diff --git a/tests/Unit/ExchangeSettingsTest.php b/tests/Unit/ExchangeSettingsTest.php index 5d16634..2cb6f23 100644 --- a/tests/Unit/ExchangeSettingsTest.php +++ b/tests/Unit/ExchangeSettingsTest.php @@ -7,10 +7,10 @@ use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Wire\AMQPTable; use Yiisoft\Queue\AMQP\Adapter; -use Yiisoft\Queue\AMQP\MessageSerializer; use Yiisoft\Queue\AMQP\QueueProvider; use Yiisoft\Queue\AMQP\Settings\Exchange as ExchangeSettings; use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings; +use Yiisoft\Queue\Message\JsonMessageSerializer; final class ExchangeSettingsTest extends UnitTestCase { @@ -38,7 +38,7 @@ public function testCommonSettings(): void ]) ) ), - new MessageSerializer(), + new JsonMessageSerializer(), $this->getLoop(), ); $exchangeSettings = $adapter->getQueueProvider()->getExchangeSettings(); diff --git a/tests/Unit/MessageSerializerTest.php b/tests/Unit/MessageSerializerTest.php index 8a2c9a0..e9e2877 100644 --- a/tests/Unit/MessageSerializerTest.php +++ b/tests/Unit/MessageSerializerTest.php @@ -10,10 +10,10 @@ use PhpAmqpLib\Message\AMQPMessage; use Yiisoft\Queue\AMQP\Adapter; use Yiisoft\Queue\AMQP\Exception\NoKeyInPayloadException; -use Yiisoft\Queue\AMQP\MessageSerializer; use Yiisoft\Queue\AMQP\QueueProvider; use Yiisoft\Queue\AMQP\Settings\Exchange as ExchangeSettings; use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings; +use Yiisoft\Queue\Message\JsonMessageSerializer; /** * Testing message serialization options @@ -49,7 +49,7 @@ private function getCustomAdapter(string $queueExchangeName): Adapter $queueProvider ->withQueueSettings(new QueueSettings($queueExchangeName)) ->withExchangeSettings(new ExchangeSettings($queueExchangeName)), - new MessageSerializer(), + new JsonMessageSerializer(), $this->getLoop(), ); } diff --git a/tests/Unit/QueueProviderTest.php b/tests/Unit/QueueProviderTest.php index 897f257..6aefdd8 100644 --- a/tests/Unit/QueueProviderTest.php +++ b/tests/Unit/QueueProviderTest.php @@ -6,13 +6,13 @@ use Yiisoft\Queue\AMQP\Adapter; use Yiisoft\Queue\AMQP\Exception\ExchangeDeclaredException; -use Yiisoft\Queue\AMQP\MessageSerializer; use Yiisoft\Queue\AMQP\QueueProvider; use Yiisoft\Queue\AMQP\Settings\Exchange as ExchangeSettings; use Yiisoft\Queue\AMQP\Settings\ExchangeSettingsInterface; use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings; use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface; use Yiisoft\Queue\AMQP\Tests\Support\FileHelper; +use Yiisoft\Queue\Message\JsonMessageSerializer; use Yiisoft\Queue\Message\Message; final class QueueProviderTest extends UnitTestCase @@ -34,7 +34,7 @@ public function testWithQueueAndExchangeSettings(): void ->withExchangeSettings( new ExchangeSettings($this->exchangeName) ), - new MessageSerializer(), + new JsonMessageSerializer(), $this->getLoop(), ); @@ -81,7 +81,7 @@ public function testWithChannelNameExchangeDeclaredException(): void new ExchangeSettings('yii-queue-test-with-channel-name') ) ->withChannelName('yii-queue-test-channel-name'), - new MessageSerializer(), + new JsonMessageSerializer(), $this->getLoop(), ); } diff --git a/tests/Unit/QueueSettingsTest.php b/tests/Unit/QueueSettingsTest.php index 446e6db..c9acb93 100644 --- a/tests/Unit/QueueSettingsTest.php +++ b/tests/Unit/QueueSettingsTest.php @@ -7,10 +7,10 @@ use PhpAmqpLib\Exception\AMQPProtocolChannelException; use PhpAmqpLib\Wire\AMQPTable; use Yiisoft\Queue\AMQP\Adapter; -use Yiisoft\Queue\AMQP\MessageSerializer; use Yiisoft\Queue\AMQP\QueueProvider; use Yiisoft\Queue\AMQP\Settings\Exchange as ExchangeSettings; use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings; +use Yiisoft\Queue\Message\JsonMessageSerializer; use Yiisoft\Queue\Message\Message; final class QueueSettingsTest extends UnitTestCase @@ -40,7 +40,7 @@ public function testCommonSettings(): void ->withExchangeSettings( new ExchangeSettings('yii-queue-test-queue-common-settings') ), - new MessageSerializer(), + new JsonMessageSerializer(), $this->getLoop(), ); @@ -85,7 +85,7 @@ public function testArgumentsXExpires(): void ->withExchangeSettings( new ExchangeSettings('yii-queue-test-queue-settings-arg') ), - new MessageSerializer(), + new JsonMessageSerializer(), $this->getLoop(), ); diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index 84f1619..b333213 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -8,8 +8,6 @@ use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\AMQP\Adapter; use Yiisoft\Queue\AMQP\Exception\NotImplementedException; -use Yiisoft\Queue\AMQP\MessageSerializer; -use Yiisoft\Queue\AMQP\MessageSerializerInterface; use Yiisoft\Queue\AMQP\QueueProvider; use Yiisoft\Queue\AMQP\QueueProviderInterface; use Yiisoft\Queue\AMQP\Settings\Exchange as ExchangeSettings; @@ -17,7 +15,9 @@ use Yiisoft\Queue\AMQP\Tests\Support\FileHelper; use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Exception\JobFailureException; +use Yiisoft\Queue\Message\JsonMessageSerializer; use Yiisoft\Queue\Message\Message; +use Yiisoft\Queue\Message\MessageSerializerInterface; use Yiisoft\Queue\Queue; final class QueueTest extends UnitTestCase @@ -84,7 +84,7 @@ public function testListenWithException(): void $queueProvider ->withQueueSettings(new QueueSettings($this->queueName)) ->withExchangeSettings(new ExchangeSettings($this->exchangeName)), - new MessageSerializer(), + new JsonMessageSerializer(), $this->getLoop(), ); $queue = $this->getDefaultQueue($adapter); @@ -112,7 +112,7 @@ public function testListen(): void $adapter = new Adapter( $queueProvider ->withChannelName('yii-queue'), - new MessageSerializer(), + new JsonMessageSerializer(), $mockLoop, ); $queue = $this->getDefaultQueue($adapter); diff --git a/tests/Unit/UnitTestCase.php b/tests/Unit/UnitTestCase.php index 2f119c5..9bf6cd4 100644 --- a/tests/Unit/UnitTestCase.php +++ b/tests/Unit/UnitTestCase.php @@ -10,7 +10,6 @@ use Yiisoft\Injector\Injector; use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\AMQP\Adapter; -use Yiisoft\Queue\AMQP\MessageSerializer; use Yiisoft\Queue\AMQP\QueueProvider; use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings; use Yiisoft\Queue\AMQP\Tests\Support\ExtendedSimpleMessageHandler; @@ -18,6 +17,7 @@ use Yiisoft\Queue\AMQP\Tests\Support\MainTestCase; use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Cli\SignalLoop; +use Yiisoft\Queue\Message\JsonMessageSerializer; use Yiisoft\Queue\Message\MessageInterface; use Yiisoft\Queue\Middleware\CallableFactory; use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher; @@ -134,7 +134,7 @@ protected function getAdapter(): AdapterInterface { return $this->adapter ??= new Adapter( $this->getQueueProvider(), - new MessageSerializer(), + new JsonMessageSerializer(), $this->getLoop(), ); } diff --git a/tests/yii b/tests/yii index 7ae7c3e..15c3764 100755 --- a/tests/yii +++ b/tests/yii @@ -4,10 +4,7 @@ use PhpAmqpLib\Connection\AMQPStreamConnection; use Psr\Log\NullLogger; use Yiisoft\Injector\Injector; -use Yiisoft\Test\Support\Container\SimpleContainer; -use Yiisoft\Yii\Console\Application; use Yiisoft\Queue\AMQP\Adapter; -use Yiisoft\Queue\AMQP\MessageSerializer; use Yiisoft\Queue\AMQP\QueueProvider; use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings; use Yiisoft\Queue\AMQP\Tests\Support\FileHelper; @@ -15,6 +12,7 @@ use Yiisoft\Queue\AMQP\Tests\Support\SimpleMessageHandler; use Yiisoft\Queue\Cli\SignalLoop; use Yiisoft\Queue\Command\ListenCommand; use Yiisoft\Queue\Command\RunCommand; +use Yiisoft\Queue\Message\JsonMessageSerializer; use Yiisoft\Queue\Middleware\CallableFactory; use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher; use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsume; @@ -24,6 +22,8 @@ use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush; use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; use Yiisoft\Queue\Queue; use Yiisoft\Queue\QueueFactory; +use Yiisoft\Test\Support\Container\SimpleContainer; +use Yiisoft\Yii\Console\Application; require_once dirname(__DIR__) . '/vendor/autoload.php'; @@ -53,7 +53,7 @@ $adapter = new Adapter( ), new QueueSettings(), ), - new MessageSerializer(), + new JsonMessageSerializer(), $loop, ); $queue = new Queue( From e70aba52574ef7e5a8fa2ecd99ce9282d096bf3c Mon Sep 17 00:00:00 2001 From: uzdevid Date: Sat, 7 Dec 2024 23:27:12 +0500 Subject: [PATCH 6/8] Change the method for getting the message ID --- tests/Unit/QueueTest.php | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index b333213..827a484 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -15,6 +15,7 @@ use Yiisoft\Queue\AMQP\Tests\Support\FileHelper; use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Exception\JobFailureException; +use Yiisoft\Queue\Message\IdEnvelope; use Yiisoft\Queue\Message\JsonMessageSerializer; use Yiisoft\Queue\Message\Message; use Yiisoft\Queue\Message\MessageSerializerInterface; @@ -41,7 +42,9 @@ public function testStatus(): void $this->expectException(NotImplementedException::class); $this->expectExceptionMessage("Status check is not supported by the adapter $adapterClass."); - $adapter->status($message->getId()); + + $messageId = (string)($message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null'); + $adapter->status($messageId); } /** From 4c4e44bd6324823c36059f18ec519b2420e8ba6d Mon Sep 17 00:00:00 2001 From: uzdevid Date: Sat, 7 Dec 2024 23:29:28 +0500 Subject: [PATCH 7/8] Change the deprecated method to a getter method. --- src/ExistingMessagesConsumer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ExistingMessagesConsumer.php b/src/ExistingMessagesConsumer.php index f01f97a..37d361b 100644 --- a/src/ExistingMessagesConsumer.php +++ b/src/ExistingMessagesConsumer.php @@ -38,7 +38,7 @@ public function consume(callable $callback): void false, function (AMQPMessage $amqpMessage) use ($callback): void { try { - $message = $this->serializer->unserialize($amqpMessage->body); + $message = $this->serializer->unserialize($amqpMessage->getBody()); if ($this->messageConsumed = $callback($message)) { $this->channel->basic_ack($amqpMessage->getDeliveryTag()); } From ae7516ac47577531f8748b191b68bb767d7980b6 Mon Sep 17 00:00:00 2001 From: uzdevid Date: Tue, 10 Dec 2024 12:06:36 +0500 Subject: [PATCH 8/8] Update implementation of AdapterInterface --- src/Adapter.php | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/Adapter.php b/src/Adapter.php index adcae0a..b992a7b 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -19,12 +19,22 @@ final class Adapter implements AdapterInterface { * @param QueueProviderInterface $queueProvider * @param MessageSerializerInterface $serializer * @param LoopInterface $loop + * @param string $channel */ public function __construct( private QueueProviderInterface $queueProvider, private readonly MessageSerializerInterface $serializer, private readonly LoopInterface $loop, + private string $channel ) { + + } + + /** + * @return string + */ + public function getChannelName(): string { + return $this->channel; } /** @@ -32,8 +42,12 @@ public function __construct( * @return $this */ public function withChannel(string $channel): self { + if ($channel === $this->channel) { + return $this; + } + $new = clone $this; - $new->queueProvider = $this->queueProvider->withChannelName($channel); + $new->channel = $channel; return $new; }