Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating the yiisoft/queue library #6

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 16 additions & 7 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
final class Adapter implements AdapterInterface
{
public function __construct(
private QueueProviderInterface $provider,
private QueueProviderInterface $provider,
private MessageSerializerInterface $serializer,
private LoopInterface $loop,
private int $timeout = 3
) {
private LoopInterface $loop,
private int $timeout = 3
)
{
}

public function runExisting(callable $handlerCallback): void
Expand Down Expand Up @@ -66,15 +67,18 @@ public function push(MessageInterface $message): MessageInterface

public function subscribe(callable $handlerCallback): void
{
while ($this->loop->canContinue()) {
$continue = true;
while ($continue) {
$message = $this->reserve();
if (null === $message) {
$continue = $this->loop->canContinue();
continue;
}

$result = $handlerCallback($message);
if ($result) {
$this->provider->delete((string) $message->getId());
$this->provider->delete((string) $message->getId());
if (!$result) {
$continue = false;
}
}
}
Expand All @@ -99,4 +103,9 @@ private function reserve(): ?IdEnvelope

return $envelope;
}

public function getChannelName(): string
{
return $this->provider->getChannelName();
}
}
14 changes: 10 additions & 4 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ final class Message implements MessageInterface
{
public function __construct(
private string $handlerName,
private mixed $data,
private array $metadata,
private int $delay = 0 //delay in seconds
) {
private mixed $data,
private array $metadata,
private int $delay = 0 //delay in seconds
)
{
if ($this->delay > 0) {
$this->metadata['delay'] = $delay;
}
Expand All @@ -40,4 +41,9 @@ public function getMetadata(): array
{
return $this->metadata;
}

public static function fromData(string $handlerName, mixed $data, array $metadata = []): MessageInterface
{
return new self($handlerName, $data, $metadata, $metadata['delay'] ? (int)$metadata['delay'] : 0);
}
}
8 changes: 7 additions & 1 deletion src/QueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class QueueProvider implements QueueProviderInterface
{
private const DEFAULT_CHANNEL_NAME = 'yii-queue';

Check failure on line 12 in src/QueueProvider.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.3-ubuntu-latest

MissingClassConstType

src/QueueProvider.php:12:19: MissingClassConstType: Class constant "Yiisoft\Queue\Redis\QueueProvider::DEFAULT_CHANNEL_NAME" should have a declared type. (see https://psalm.dev/359)

/**
* @throws RedisException
Expand All @@ -17,7 +17,8 @@
public function __construct(
private \Redis $redis, //redis connection,
private string $channelName = self::DEFAULT_CHANNEL_NAME
) {
)
{
}

/**
Expand Down Expand Up @@ -152,4 +153,9 @@
throw new NotConnectedRedisException('Redis is not connected.');
}
}

public function getChannelName(): string
{
return $this->channelName;
}
}
2 changes: 2 additions & 0 deletions src/QueueProviderInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@ public function existInWaiting(int $id): bool;
public function existInReserved(int $id): bool;

public function withChannelName(string $channelName): self;

public function getChannelName(): string;
}
12 changes: 12 additions & 0 deletions tests/Integration/QueueProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,16 @@ public function testDelay(QueueProvider $provider): void
$reserv = $provider->reserve($id);
$this->assertNotNull($reserv);
}

/**
* @depends test__construct
*/
public function testWithChannelName(QueueProvider $provider): void
{
self::assertEquals('test', $provider->getChannelName());
$providerOther = $provider->withChannelName('test');
self::assertEquals($providerOther->getChannelName(), $provider->getChannelName());
$providerOther = $provider->withChannelName('test1');
self::assertEquals('test1', $providerOther->getChannelName());
}
}
13 changes: 12 additions & 1 deletion tests/Integration/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Yiisoft\Queue\Redis\Adapter;
use Yiisoft\Queue\Redis\QueueProvider;
use Yiisoft\Queue\Redis\QueueProviderInterface;
use Yiisoft\Queue\Redis\Reserve;
use Yiisoft\Queue\Redis\Tests\Support\FileHelper;
use Yiisoft\Queue\Redis\Tests\Support\IntegrationTestCase;

Expand Down Expand Up @@ -85,6 +86,7 @@ public function testListen(): void
$mockLoop,
);
$queue = $this->getDefaultQueue($adapter);
self::assertEquals('yii-queue', $adapter->getChannelName());

$queue->push(
new Message('ext-simple', ['file_name' => 'test-listen' . $time, 'payload' => ['time' => $time]])
Expand Down Expand Up @@ -124,7 +126,10 @@ public function testAdapterStatusException()
public function testAdapterNullMessage()
{
$provider = $this->createMock(QueueProviderInterface::class);
$provider->method('reserve')->willReturn(null);
$provider->method('reserve')->willReturnOnConsecutiveCalls(
null, null, null, new Reserve(1, '{"name":"handler"}')
);
$provider->method('delete');

$mockLoop = $this->createMock(LoopInterface::class);
$mockLoop->expects($this->exactly(2))->method('canContinue')->willReturn(true, false);
Expand All @@ -145,5 +150,11 @@ public function testAdapterNullMessage()
$notUseHandler = false;
});
$this->assertTrue($notUseHandler);

$adapter->subscribe(function (MessageInterface $message) use (&$notUseHandler): mixed {
$notUseHandler = false;
return null;
});
$this->assertFalse($notUseHandler);
}
}
11 changes: 11 additions & 0 deletions tests/Unit/Message/MessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,15 @@ public function testWithDelay(): void
$this->assertNotSame($message, $delayedMessage);
$this->assertEquals(5, $delayedMessage->getMetadata()['delay']);
}

public function testFromData(): void
{
$message = Message::fromData('test-handler', ['data' => 'test-data'], ['delay' => 2]);
self::assertEquals('test-handler', $message->getHandlerName());
self::assertEquals(['data' => 'test-data'], $message->getData());
self::assertEquals(['delay' => 2], $message->getMetadata());

$message = Message::fromData('test-handler', ['data' => 'test-data'], ['delay' => '3']);
self::assertEquals(['delay' => 3], $message->getMetadata());
}
}
8 changes: 8 additions & 0 deletions tests/Unit/QueueProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ public function test__construct()
return $provider;
}

/**
* @depends test__construct
*/
public function testGetChannelName(QueueProvider $provider)
{
self::assertEquals('test', $provider->getChannelName());
}

/**
* @depends test__construct
* @throws \PHPUnit\Framework\MockObject\Exception
Expand Down
Loading