From 69bef4cee6cfd5152135948c10c52bb22f6c7910 Mon Sep 17 00:00:00 2001 From: aegorov Date: Tue, 10 Dec 2024 00:17:57 +0300 Subject: [PATCH] move MessageTest.php to Unit --- src/Adapter.php | 23 ++++++++++++++++------- src/QueueProvider.php | 8 +++++++- src/QueueProviderInterface.php | 2 ++ 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index c503559..2d05b9e 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -14,11 +14,12 @@ final class Adapter implements AdapterInterface { public function __construct( - private QueueProviderInterface $provider, + private QueueProviderInterface $provider, private MessageSerializerInterface $serializer, - private LoopInterface $loop, - private int $timeout = 3 - ) { + private LoopInterface $loop, + private int $timeout = 3 + ) + { } public function runExisting(callable $handlerCallback): void @@ -66,15 +67,18 @@ public function push(MessageInterface $message): MessageInterface public function subscribe(callable $handlerCallback): void { - while ($this->loop->canContinue()) { + $continue = true; + while ($continue) { $message = $this->reserve(); if (null === $message) { + $continue = $this->loop->canContinue(); continue; } $result = $handlerCallback($message); - if ($result) { - $this->provider->delete((string) $message->getId()); + $this->provider->delete((string) $message->getId()); + if (!$result) { + $continue = false; } } } @@ -99,4 +103,9 @@ private function reserve(): ?IdEnvelope return $envelope; } + + public function getChannelName(): string + { + return $this->provider->getChannelName(); + } } diff --git a/src/QueueProvider.php b/src/QueueProvider.php index d065f4c..b684e2a 100644 --- a/src/QueueProvider.php +++ b/src/QueueProvider.php @@ -17,7 +17,8 @@ class QueueProvider implements QueueProviderInterface public function __construct( private \Redis $redis, //redis connection, private string $channelName = self::DEFAULT_CHANNEL_NAME - ) { + ) + { } /** @@ -145,4 +146,9 @@ private function checkConnection(): void throw new NotConnectedRedisException('Redis is not connected.'); } } + + public function getChannelName(): string + { + return $this->channelName; + } } diff --git a/src/QueueProviderInterface.php b/src/QueueProviderInterface.php index 1bce6fc..f59b523 100644 --- a/src/QueueProviderInterface.php +++ b/src/QueueProviderInterface.php @@ -20,4 +20,6 @@ public function existInWaiting(int $id): bool; public function existInReserved(int $id): bool; public function withChannelName(string $channelName): self; + + public function getChannelName(): string; }