Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Omit custom handlers #96

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 29 additions & 29 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\MessageInterface;

final class Adapter implements AdapterInterface
Expand All @@ -23,64 +24,63 @@ public function __construct(

public function withChannel(string $channel): self
{
$instance = clone $this;
$instance->queueProvider = $this->queueProvider->withChannelName($channel);
$new = clone $this;
$new->queueProvider = $this->queueProvider->withChannelName($channel);

return $instance;
return $new;
}

/**
* @param callable(MessageInterface): bool $handlerCallback
* @param callable(MessageInterface): bool $handlerCallback
*/
public function runExisting(callable $handlerCallback): void
{
$channel = $this->queueProvider->getChannel();
(new ExistingMessagesConsumer($channel, $this->queueProvider
->getQueueSettings()
->getName(), $this->serializer))
->consume($handlerCallback);
$queueName = $this->queueProvider->getQueueSettings()->getName();
$consumer = new ExistingMessagesConsumer(
$channel,
$queueName,
$this->serializer
);

$consumer->consume($handlerCallback);
}

/**
* @return never
*/
public function status(string $id): JobStatus
public function status(string|int $id): JobStatus
{
throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
}

public function push(MessageInterface $message): void
public function push(MessageInterface $message): MessageInterface
{
$payload = $this->serializer->serialize($message);
$amqpMessage = new AMQPMessage(
$payload,
array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties())
);
$exchangeSettings = $this->queueProvider->getExchangeSettings();
$this->queueProvider
->getChannel()
->basic_publish(
$amqpMessage,
$exchangeSettings?->getName() ?? '',
$exchangeSettings ? '' : $this->queueProvider
->getQueueSettings()
->getName()
);
$channel = $this->queueProvider->getChannel();
$channel->basic_publish(
$amqpMessage,
$exchangeSettings?->getName() ?? '',
$exchangeSettings ? '' : $this->queueProvider
->getQueueSettings()
->getName()
);
/** @var string $messageId */
$messageId = $amqpMessage->get('message_id');
$message->setId($messageId);

return new IdEnvelope($message, $messageId);
}

public function subscribe(callable $handlerCallback): void
{
$channel = $this->queueProvider->getChannel();
$queueName = $this->queueProvider->getQueueSettings()->getName();

$channel->basic_consume(
$this->queueProvider
->getQueueSettings()
->getName(),
$this->queueProvider
->getQueueSettings()
->getName(),
$queueName,
$queueName,
false,
false,
false,
Expand Down
3 changes: 2 additions & 1 deletion src/MessageSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use InvalidArgumentException;
use JsonException;
use Yiisoft\Queue\AMQP\Exception\NoKeyInPayloadException;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Message\MessageInterface;

Expand All @@ -18,7 +19,7 @@
public function serialize(MessageInterface $message): string
{
$payload = [
'id' => $message->getId(),
'id' => $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? null,
'name' => $message->getHandlerName(),
'data' => $message->getData(),
'meta' => $message->getMetadata(),
Expand Down Expand Up @@ -54,7 +55,7 @@
throw new NoKeyInPayloadException('meta', $payload);
}

return new Message(

Check failure on line 58 in src/MessageSerializer.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.0-ubuntu-latest

TooManyArguments

src/MessageSerializer.php:58:16: TooManyArguments: Too many arguments for Yiisoft\Queue\Message\Message::__construct - expecting 3 but saw 4 (see https://psalm.dev/026)

Check failure on line 58 in src/MessageSerializer.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.1-ubuntu-latest

TooManyArguments

src/MessageSerializer.php:58:16: TooManyArguments: Too many arguments for Yiisoft\Queue\Message\Message::__construct - expecting 3 but saw 4 (see https://psalm.dev/026)

Check failure on line 58 in src/MessageSerializer.php

View workflow job for this annotation

GitHub Actions / psalm / PHP 8.2-ubuntu-latest

TooManyArguments

src/MessageSerializer.php:58:16: TooManyArguments: Too many arguments for Yiisoft\Queue\Message\Message::__construct - expecting 3 but saw 4 (see https://psalm.dev/026)
$name,
$payload['data'] ?? null,
$meta,
Expand Down
41 changes: 22 additions & 19 deletions src/Middleware/DelayMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,27 @@ public function processPush(PushRequest $request, MessageHandlerPushInterface $h
{
$adapter = $request->getAdapter();
if (!$adapter instanceof Adapter) {
$type = get_debug_type($adapter);
$class = Adapter::class;
throw new InvalidArgumentException(
"This middleware works only with the $class. $type given."
sprintf(
'This middleware works only with the %s. %s given.',
Adapter::class,
get_debug_type($adapter)
)
);
}

$queueProvider = $adapter->getQueueProvider();
$exchangeSettings = $this->getExchangeSettings($queueProvider->getExchangeSettings());
$queueSettings = $this->getQueueSettings($queueProvider->getQueueSettings(), $queueProvider->getExchangeSettings());
$originalExchangeSettings = $queueProvider->getExchangeSettings();
$delayedExchangeSettings = $this->getExchangeSettings($originalExchangeSettings);
$queueSettings = $this->getQueueSettings(
$queueProvider->getQueueSettings(),
$originalExchangeSettings
);

$adapter = $adapter->withQueueProvider(
$queueProvider
->withMessageProperties($this->getMessageProperties($queueProvider))
->withExchangeSettings($exchangeSettings)
->withExchangeSettings($delayedExchangeSettings)
->withQueueSettings($queueSettings)
);

Expand All @@ -78,20 +85,17 @@ private function getMessageProperties(QueueProviderInterface $queueProvider): ar

private function getQueueSettings(
QueueSettingsInterface $queueSettings,
?ExchangeSettingsInterface $exchangeSettings
?ExchangeSettingsInterface $originalExchangeSettings
): QueueSettingsInterface {
$deliveryTime = time() + $this->delayInSeconds;

$arguments = [
'x-dead-letter-exchange' => ['S', $originalExchangeSettings?->getName() ?? ''],
'x-dead-routing-key' => ['S', $queueSettings->getName()],
'x-expires' => ['I', $this->delayInSeconds * 1000 + 30_000],
'x-message-ttl' => ['I', $this->delayInSeconds * 1000],
];
return $queueSettings
->withName("{$queueSettings->getName()}.dlx.$deliveryTime")
->withAutoDeletable(true)
->withArguments(
[
'x-dead-letter-exchange' => ['S', $exchangeSettings?->getName() ?? ''],
'x-expires' => ['I', $this->delayInSeconds * 1000 + 30000],
'x-message-ttl' => ['I', $this->delayInSeconds * 1000],
]
);
->withName("{$queueSettings->getName()}.dlx")
->withArguments($arguments);
}

/**
Expand All @@ -104,7 +108,6 @@ private function getExchangeSettings(?ExchangeSettingsInterface $exchangeSetting
/** @noinspection NullPointerExceptionInspection */
return $exchangeSettings
?->withName("{$exchangeSettings->getName()}.dlx")
->withAutoDelete(true)
->withType(AMQPExchangeType::TOPIC);
}
}
1 change: 1 addition & 0 deletions src/QueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public function __construct(
public function __destruct()
{
$this->channel?->close();
unset($this->channel);
}

public function getChannel(): AMQPChannel
Expand Down
2 changes: 1 addition & 1 deletion src/Settings/Exchange.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public function __construct(
private string $type = AMQPExchangeType::DIRECT,
private bool $passive = false,
private bool $durable = false,
private bool $autoDelete = true,
private bool $autoDelete = false,
private bool $internal = false,
private bool $nowait = false,
private AMQPTable|array $arguments = [],
Expand Down
2 changes: 1 addition & 1 deletion src/Settings/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public function __construct(
private bool $passive = false,
private bool $durable = false,
private bool $exclusive = false,
private bool $autoDelete = true,
private bool $autoDelete = false,
private bool $nowait = false,
private AMQPTable|array $arguments = [],
private ?int $ticket = null
Expand Down
6 changes: 2 additions & 4 deletions tests/Integration/DelayMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public function testMainFlow(): void

$time = time();
$queue->push(
new Message('simple', 'test-delay-middleware-main'),
new Message(SimpleMessageHandler::class, 'test-delay-middleware-main'),
new DelayMiddleware(3),
);

Expand Down Expand Up @@ -96,9 +96,7 @@ private function makeQueue(AdapterInterface $adapter): Queue
$this->createMock(LoggerInterface::class),
new PushMiddlewareDispatcher(
new MiddlewareFactoryPush(
new SimpleContainer([
'simple' => new SimpleMessageHandler(new FileHelper()),
]),
new SimpleContainer([]),
new CallableFactory($this->createMock(ContainerInterface::class)),
),
),
Expand Down
10 changes: 9 additions & 1 deletion tests/Integration/TestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,21 @@ protected function tearDown(): void

protected function queueListen(?string $queue = null): void
{
// TODO Fail test on subprocess error exit code
$command = [PHP_BINARY, dirname(__DIR__) . '/yii', 'queue:listen'];
if ($queue !== null) {
$command[] = "--channel=$queue";
}
$process = new Process($command);
$this->processes[] = $process;
$process->start();

if ($process->isTerminated()) {
throw new \RuntimeException(
sprintf(
"Failed to start queue:listen process: \n%s",
!empty($process->getErrorOutput()) ? $process->getErrorOutput() : $process->getOutput()
)
);
}
}
}
19 changes: 19 additions & 0 deletions tests/Support/ExceptionListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\AMQP\Tests\Support;

use PHPUnit\Util\Exception as PHPUnitException;
use Yiisoft\Queue\Message\MessageInterface;

final class ExceptionListener
{
public function __invoke(MessageInterface $message): void
{
$data = $message->getData();
if (null !== $data) {
throw new PHPUnitException((string) $data['payload']['time']);
}
}
}
2 changes: 1 addition & 1 deletion tests/Support/ExtendedSimpleMessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public function __construct(private FileHelper $fileHelper)
{
}

public function handle(MessageInterface $message): void
public function __invoke(MessageInterface $message): void
{
$data = $message->getData();
if (null !== $data) {
Expand Down
4 changes: 2 additions & 2 deletions tests/Support/FakeAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ public function runExisting(callable $handlerCallback): void
// TODO: Implement runExisting() method.
}

public function status(string $id): JobStatus
public function status(string|int $id): JobStatus
{
// TODO: Implement status() method.
}

public function push(MessageInterface $message): void
public function push(MessageInterface $message): MessageInterface
{
// TODO: Implement push() method.
}
Expand Down
18 changes: 15 additions & 3 deletions tests/Support/FileHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@ final class FileHelper
*/
public function put(string $fileName, int|string $data): void
{
if (file_put_contents("{$this->getRuntimeDir()}/$fileName", $data) === false) {
throw new RuntimeException("Runtime dir {$this->getRuntimeDir()} or file $fileName are not writable.");
$path = $this->getRuntimeDir() . DIRECTORY_SEPARATOR . $fileName;
if (file_put_contents($path, $data) === false) {
throw new RuntimeException(
sprintf(
'Runtime dir %"s" or file "%s" are not writable.',
$this->getRuntimeDir(),
$fileName
)
);
}
}

Expand All @@ -28,7 +35,12 @@ public function get(string $filename): ?string

$result = file_get_contents($path);
if ($result === false) {
throw new RuntimeException("File '$path' exists but is not readable.");
throw new RuntimeException(
sprintf(
'File "%s" exists but is not readable.',
$path
)
);
}

return $result;
Expand Down
16 changes: 8 additions & 8 deletions tests/Unit/MessageSerializerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Yiisoft\Queue\AMQP\QueueProvider;
use Yiisoft\Queue\AMQP\Settings\Exchange as ExchangeSettings;
use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings;
use Yiisoft\Queue\AMQP\Tests\Support\ExtendedSimpleMessageHandler;

/**
* Testing message serialization options
Expand All @@ -27,11 +28,10 @@ final class MessageSerializerTest extends UnitTestCase
*/
private function publishWithAMQPLib(string $queue, string $exchange, AMQPMessage $message): void
{
$channel = $this
->createConnection()
->channel();
$channel = $this->createConnection()->channel();

$channel->queue_declare($queue);
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT);
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, auto_delete: true);
$channel->queue_bind($queue, $exchange);
$channel->basic_publish($message, $exchange);
}
Expand All @@ -47,8 +47,8 @@ private function getCustomAdapter(string $queueExchangeName): Adapter
);
return new Adapter(
$queueProvider
->withQueueSettings(new QueueSettings($queueExchangeName))
->withExchangeSettings(new ExchangeSettings($queueExchangeName)),
->withQueueSettings(new QueueSettings($queueExchangeName, autoDelete: true))
->withExchangeSettings(new ExchangeSettings($queueExchangeName, autoDelete: true)),
new MessageSerializer(),
$this->getLoop(),
);
Expand Down Expand Up @@ -80,7 +80,7 @@ public function testNoKeyInPayloadExceptionId(): void
$queueExchangeName,
$queueExchangeName,
new AMQPMessage(
json_encode(['name' => 'ext-simple', 'id' => 1], JSON_THROW_ON_ERROR),
json_encode(['name' => ExtendedSimpleMessageHandler::class, 'id' => 1], JSON_THROW_ON_ERROR),
['content_type' => 'text/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
)
);
Expand All @@ -100,7 +100,7 @@ public function testNoKeyInPayloadExceptionMeta(): void
$queueExchangeName,
$queueExchangeName,
new AMQPMessage(
json_encode(['name' => 'ext-simple', 'meta' => ''], JSON_THROW_ON_ERROR),
json_encode(['name' => ExtendedSimpleMessageHandler::class, 'meta' => ''], JSON_THROW_ON_ERROR),
['content_type' => 'text/json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
)
);
Expand Down
Loading
Loading