diff --git a/src/Adapter/AdapterInterface.php b/src/Adapter/AdapterInterface.php index 856355bc..0d8ceeda 100644 --- a/src/Adapter/AdapterInterface.php +++ b/src/Adapter/AdapterInterface.php @@ -20,7 +20,7 @@ public function runExisting(callable $handlerCallback): void; /** * Returns status code of a message with the given id. * - * @param string $id ID of a job message. + * @param int|string $id ID of a job message. * * @throws InvalidArgumentException When there is no such id in the adapter. * @@ -41,4 +41,6 @@ public function push(MessageInterface $message): MessageInterface; public function subscribe(callable $handlerCallback): void; public function withChannel(string $channel): self; + + public function getChannelName(): string; } diff --git a/src/Adapter/SynchronousAdapter.php b/src/Adapter/SynchronousAdapter.php index d2ee489b..84148941 100644 --- a/src/Adapter/SynchronousAdapter.php +++ b/src/Adapter/SynchronousAdapter.php @@ -86,4 +86,9 @@ public function withChannel(string $channel): self return $new; } + + public function getChannelName(): string + { + return $this->channel; + } } diff --git a/src/Debug/QueueCollector.php b/src/Debug/QueueCollector.php index 1a8c9d76..055cbde0 100644 --- a/src/Debug/QueueCollector.php +++ b/src/Debug/QueueCollector.php @@ -51,13 +51,17 @@ public function collectStatus(string $id, JobStatus $status): void } public function collectPush( - string $channel, + ?string $channel, MessageInterface $message, string|array|callable|MiddlewarePushInterface ...$middlewareDefinitions, ): void { if (!$this->isActive()) { return; } + if ($channel === null) { + $channel = 'null'; + } + $this->pushes[$channel][] = [ 'message' => $message, 'middlewares' => $middlewareDefinitions, @@ -69,7 +73,7 @@ public function collectWorkerProcessing(MessageInterface $message, QueueInterfac if (!$this->isActive()) { return; } - $this->processingMessages[$queue->getChannelName()][] = $message; + $this->processingMessages[$queue->getChannelName() ?? 'null'][] = $message; } private function reset(): void diff --git a/src/Debug/QueueDecorator.php b/src/Debug/QueueDecorator.php index ed174393..e6b83f8e 100644 --- a/src/Debug/QueueDecorator.php +++ b/src/Debug/QueueDecorator.php @@ -21,7 +21,7 @@ public function __construct( public function status(string|int $id): JobStatus { $result = $this->queue->status($id); - $this->collector->collectStatus($id, $result); + $this->collector->collectStatus((string) $id, $result); return $result; } @@ -50,15 +50,8 @@ public function withAdapter(AdapterInterface $adapter): QueueInterface return new self($this->queue->withAdapter($adapter), $this->collector); } - public function getChannelName(): string + public function getChannelName(): ?string { return $this->queue->getChannelName(); } - - public function withChannelName(string $channel): QueueInterface - { - $new = clone $this; - $new->queue = $this->queue->withChannelName($channel); - return $new; - } } diff --git a/src/Middleware/FailureHandling/FailureMiddlewareDispatcher.php b/src/Middleware/FailureHandling/FailureMiddlewareDispatcher.php index af11386e..bbc4e38b 100644 --- a/src/Middleware/FailureHandling/FailureMiddlewareDispatcher.php +++ b/src/Middleware/FailureHandling/FailureMiddlewareDispatcher.php @@ -37,6 +37,7 @@ public function dispatch( FailureHandlingRequest $request, MessageFailureHandlerInterface $finishHandler ): FailureHandlingRequest { + /** @var string $channelName It is always string in this context */ $channelName = $request->getQueue()->getChannelName(); if (!isset($this->middlewareDefinitions[$channelName]) || $this->middlewareDefinitions[$channelName] === []) { $channelName = self::DEFAULT_PIPELINE; diff --git a/src/Provider/AdapterFactoryQueueProvider.php b/src/Provider/AdapterFactoryQueueProvider.php index ffe777f5..7f9cca30 100644 --- a/src/Provider/AdapterFactoryQueueProvider.php +++ b/src/Provider/AdapterFactoryQueueProvider.php @@ -85,7 +85,7 @@ private function getOrTryToCreate(string $channel): QueueInterface|null ), ); } - $this->queues[$channel] = $this->baseQueue->withAdapter($adapter)->withChannelName($channel); + $this->queues[$channel] = $this->baseQueue->withAdapter($adapter->withChannel($channel)); } else { $this->queues[$channel] = null; } diff --git a/src/Provider/PrototypeQueueProvider.php b/src/Provider/PrototypeQueueProvider.php index 53b0d6e2..4b7a2d4b 100644 --- a/src/Provider/PrototypeQueueProvider.php +++ b/src/Provider/PrototypeQueueProvider.php @@ -4,6 +4,7 @@ namespace Yiisoft\Queue\Provider; +use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\QueueInterface; /** @@ -17,12 +18,13 @@ final class PrototypeQueueProvider implements QueueProviderInterface */ public function __construct( private readonly QueueInterface $baseQueue, + private readonly AdapterInterface $baseAdapter, ) { } public function get(string $channel): QueueInterface { - return $this->baseQueue->withChannelName($channel); + return $this->baseQueue->withAdapter($this->baseAdapter->withChannel($channel)); } public function has(string $channel): bool diff --git a/src/Queue.php b/src/Queue.php index 6544c1b9..e8691f78 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -32,16 +32,15 @@ public function __construct( private LoggerInterface $logger, private PushMiddlewareDispatcher $pushMiddlewareDispatcher, private ?AdapterInterface $adapter = null, - private string $channelName = QueueInterface::DEFAULT_CHANNEL_NAME, MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions ) { $this->middlewareDefinitions = $middlewareDefinitions; $this->adapterPushHandler = new AdapterPushHandler(); } - public function getChannelName(): string + public function getChannelName(): ?string { - return $this->channelName; + return $this->adapter?->getChannelName(); } public function push( @@ -136,14 +135,6 @@ public function withMiddlewaresAdded(MiddlewarePushInterface|callable|array|stri return $instance; } - public function withChannelName(string $channel): self - { - $instance = clone $this; - $instance->channelName = $channel; - $instance->adapter = $this->adapter?->withChannel($channel); - return $instance; - } - private function handle(MessageInterface $message): bool { $this->worker->process($message, $this); diff --git a/src/QueueInterface.php b/src/QueueInterface.php index 2204a4b1..0f8b8172 100644 --- a/src/QueueInterface.php +++ b/src/QueueInterface.php @@ -36,7 +36,7 @@ public function run(int $max = 0): int; public function listen(): void; /** - * @param string $id A message id + * @param int|string $id A message id * * @throws InvalidArgumentException when there is no such id in the adapter * @@ -46,7 +46,5 @@ public function status(string|int $id): JobStatus; public function withAdapter(AdapterInterface $adapter): self; - public function getChannelName(): string; - - public function withChannelName(string $channel): self; + public function getChannelName(): ?string; } diff --git a/stubs/StubAdapter.php b/stubs/StubAdapter.php index b47c850b..0a8e06b2 100644 --- a/stubs/StubAdapter.php +++ b/stubs/StubAdapter.php @@ -7,12 +7,17 @@ use Yiisoft\Queue\Adapter\AdapterInterface; use Yiisoft\Queue\Enum\JobStatus; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\QueueInterface; /** * Stub adapter that does nothing. Job status is always "done". */ final class StubAdapter implements AdapterInterface { + public function __construct(private string $channelName = QueueInterface::DEFAULT_CHANNEL_NAME) + { + } + public function runExisting(callable $handlerCallback): void { } @@ -33,6 +38,14 @@ public function subscribe(callable $handlerCallback): void public function withChannel(string $channel): AdapterInterface { - return clone $this; + $new = clone $this; + $new->channelName = $channel; + + return $new; + } + + public function getChannelName(): string + { + return $this->channelName; } } diff --git a/stubs/StubQueue.php b/stubs/StubQueue.php index 8436b46a..e4b8708b 100644 --- a/stubs/StubQueue.php +++ b/stubs/StubQueue.php @@ -15,10 +15,8 @@ */ final class StubQueue implements QueueInterface { - public function __construct( - private string $channelName = QueueInterface::DEFAULT_CHANNEL_NAME, - private ?AdapterInterface $adapter = null, - ) { + public function __construct(private ?AdapterInterface $adapter = null) + { } public function push( @@ -51,21 +49,12 @@ public function withAdapter(AdapterInterface $adapter): QueueInterface { $new = clone $this; $new->adapter = $adapter; - return $new; - } - public function getChannelName(): string - { - return $this->channelName; + return $new; } - public function withChannelName(string $channel): QueueInterface + public function getChannelName(): ?string { - $new = clone $this; - $new->channelName = $channel; - if ($new->adapter !== null) { - $new->adapter = $new->adapter->withChannel($channel); - } - return $new; + return $this->adapter?->getChannelName(); } } diff --git a/tests/App/DummyQueue.php b/tests/App/DummyQueue.php index 42125909..05dd1801 100644 --- a/tests/App/DummyQueue.php +++ b/tests/App/DummyQueue.php @@ -47,9 +47,4 @@ public function getChannelName(): string { return $this->channelName; } - - public function withChannelName(string $channel): QueueInterface - { - throw new Exception('`withChannelName()` method is not implemented yet.'); - } } diff --git a/tests/App/FakeAdapter.php b/tests/App/FakeAdapter.php index 8caa67c9..aec34910 100644 --- a/tests/App/FakeAdapter.php +++ b/tests/App/FakeAdapter.php @@ -43,4 +43,9 @@ public function withChannel(string $channel): AdapterInterface return $instance; } + + public function getChannelName(): string + { + return $this->channel; + } } diff --git a/tests/Benchmark/Support/VoidAdapter.php b/tests/Benchmark/Support/VoidAdapter.php index edd927aa..b1a42966 100644 --- a/tests/Benchmark/Support/VoidAdapter.php +++ b/tests/Benchmark/Support/VoidAdapter.php @@ -49,4 +49,9 @@ public function withChannel(string $channel): AdapterInterface { throw new RuntimeException('Method is not implemented'); } + + public function getChannelName(): string + { + throw new RuntimeException('Method is not implemented'); + } } diff --git a/tests/Unit/Debug/QueueDecoratorTest.php b/tests/Unit/Debug/QueueDecoratorTest.php index 646787ac..37e47112 100644 --- a/tests/Unit/Debug/QueueDecoratorTest.php +++ b/tests/Unit/Debug/QueueDecoratorTest.php @@ -99,19 +99,6 @@ public function testGetChannelName(): void $this->assertEquals('getChannelName', $decorator->getChannelName()); } - public function testWithChannelName(): void - { - $queue = $this->createMock(QueueInterface::class); - $queue->expects($this->once())->method('withChannelName')->willReturn($queue); - $collector = new QueueCollector(); - $decorator = new QueueDecorator( - $queue, - $collector, - ); - - $this->assertInstanceOf(QueueInterface::class, $decorator->withChannelName('test')); - } - public function testImmutable(): void { $queueDecorator = new QueueDecorator( @@ -119,6 +106,5 @@ public function testImmutable(): void new QueueCollector() ); $this->assertNotSame($queueDecorator, $queueDecorator->withAdapter(new FakeAdapter())); - $this->assertNotSame($queueDecorator, $queueDecorator->withChannelName('test')); } } diff --git a/tests/Unit/Provider/CompositeQueueProviderTest.php b/tests/Unit/Provider/CompositeQueueProviderTest.php index 491017ee..7c33de2b 100644 --- a/tests/Unit/Provider/CompositeQueueProviderTest.php +++ b/tests/Unit/Provider/CompositeQueueProviderTest.php @@ -15,7 +15,7 @@ final class CompositeQueueProviderTest extends TestCase { public function testBase(): void { - $queue = new StubQueue('channel'); + $queue = new StubQueue(new StubAdapter()); $provider = new CompositeQueueProvider( new AdapterFactoryQueueProvider( $queue, @@ -38,7 +38,10 @@ public function testBase(): void public function testNotFound(): void { $provider = new CompositeQueueProvider( - new AdapterFactoryQueueProvider(new StubQueue('channel'), ['channel1' => new StubAdapter()]), + new AdapterFactoryQueueProvider( + new StubQueue(new StubAdapter()), + ['channel1' => new StubAdapter()] + ), ); $this->expectException(ChannelNotFoundException::class); diff --git a/tests/Unit/Provider/PrototypeQueueProviderTest.php b/tests/Unit/Provider/PrototypeQueueProviderTest.php index 601a0f24..f8fa4c42 100644 --- a/tests/Unit/Provider/PrototypeQueueProviderTest.php +++ b/tests/Unit/Provider/PrototypeQueueProviderTest.php @@ -6,6 +6,7 @@ use PHPUnit\Framework\TestCase; use Yiisoft\Queue\Provider\PrototypeQueueProvider; +use Yiisoft\Queue\Stubs\StubAdapter; use Yiisoft\Queue\Stubs\StubQueue; final class PrototypeQueueProviderTest extends TestCase @@ -14,6 +15,7 @@ public function testBase(): void { $provider = new PrototypeQueueProvider( new StubQueue(), + new StubAdapter(), ); $queue = $provider->get('test-channel'); diff --git a/tests/Unit/Stubs/StubQueueTest.php b/tests/Unit/Stubs/StubQueueTest.php index 29efe050..548705ab 100644 --- a/tests/Unit/Stubs/StubQueueTest.php +++ b/tests/Unit/Stubs/StubQueueTest.php @@ -6,7 +6,6 @@ use PHPUnit\Framework\TestCase; use Yiisoft\Queue\Message\Message; -use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Stubs\StubQueue; use Yiisoft\Queue\Stubs\StubAdapter; @@ -20,7 +19,7 @@ public function testBase(): void $this->assertSame($message, $queue->push($message)); $this->assertSame(0, $queue->run()); $this->assertTrue($queue->status('test')->isDone()); - $this->assertSame(QueueInterface::DEFAULT_CHANNEL_NAME, $queue->getChannelName()); + $this->assertNull($queue->getChannelName()); $this->assertNull($queue->getAdapter()); $queue->listen(); } @@ -34,15 +33,4 @@ public function testWithAdapter(): void $this->assertNotSame($queue, $sourceQueue); $this->assertInstanceOf(StubAdapter::class, $queue->getAdapter()); } - - public function testWithChannelName(): void - { - $sourceQueue = new StubQueue(); - - $queue = $sourceQueue->withChannelName('test'); - - $this->assertNotSame($queue, $sourceQueue); - $this->assertSame(QueueInterface::DEFAULT_CHANNEL_NAME, $sourceQueue->getChannelName()); - $this->assertSame('test', $queue->getChannelName()); - } }