Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Adapter.php #110

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

declare(strict_types=1);

use Yiisoft\Queue\AMQP\MessageSerializer;
use Yiisoft\Queue\AMQP\MessageSerializerInterface;
use Yiisoft\Queue\AMQP\QueueProvider;
use Yiisoft\Queue\AMQP\QueueProviderInterface;
use Yiisoft\Queue\AMQP\Settings\Queue;
use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface;

return [
MessageSerializerInterface::class => MessageSerializer::class,
QueueProviderInterface::class => QueueProvider::class,
QueueSettingsInterface::class => Queue::class,
];
129 changes: 82 additions & 47 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,84 +10,114 @@
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

final class Adapter implements AdapterInterface
{
final class Adapter implements AdapterInterface {
/**
* @param QueueProviderInterface $queueProvider
* @param MessageSerializerInterface $serializer
* @param LoopInterface $loop
* @param string $channel
*/
public function __construct(
private QueueProviderInterface $queueProvider,
private MessageSerializerInterface $serializer,
private LoopInterface $loop,
private QueueProviderInterface $queueProvider,
private readonly MessageSerializerInterface $serializer,
private readonly LoopInterface $loop,
private string $channel
) {

}

public function withChannel(string $channel): self
{
$instance = clone $this;
$instance->queueProvider = $this->queueProvider->withChannelName($channel);
/**
* @return string
*/
public function getChannelName(): string {
return $this->channel;
}

return $instance;
/**
* @param string $channel
* @return $this
*/
public function withChannel(string $channel): self {
if ($channel === $this->channel) {
return $this;
}

$new = clone $this;
$new->channel = $channel;

return $new;
}

/**
* @param callable(MessageInterface): bool $handlerCallback
* @param callable(MessageInterface): bool $handlerCallback
*/
public function runExisting(callable $handlerCallback): void
{
public function runExisting(callable $handlerCallback): void {
$channel = $this->queueProvider->getChannel();
(new ExistingMessagesConsumer($channel, $this->queueProvider
->getQueueSettings()
->getName(), $this->serializer))
->consume($handlerCallback);
$queueName = $this->queueProvider->getQueueSettings()->getName();
$consumer = new ExistingMessagesConsumer(
$channel,
$queueName,
$this->serializer
);

$consumer->consume($handlerCallback);
}

/**
* @return never
* @param string|int $id
* @return JobStatus
*/
public function status(string $id): JobStatus
{
throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
public function status(string|int $id): JobStatus {
throw new NotImplementedException(sprintf('Status check is not supported by the adapter %s.', self::class));
}

public function push(MessageInterface $message): void
{
/**
* @param MessageInterface $message
* @return MessageInterface
*/
public function push(MessageInterface $message): MessageInterface {
$payload = $this->serializer->serialize($message);
$amqpMessage = new AMQPMessage(
$payload,
array_merge(['message_id' => uniqid(more_entropy: true)], $this->queueProvider->getMessageProperties())
array_merge(['message_id' => uniqid('', true)], $this->queueProvider->getMessageProperties())
);
$exchangeSettings = $this->queueProvider->getExchangeSettings();
$this->queueProvider
->getChannel()
->basic_publish(
$amqpMessage,
$exchangeSettings?->getName() ?? '',
$exchangeSettings ? '' : $this->queueProvider
->getQueueSettings()
->getName()
);
$channel = $this->queueProvider->getChannel();
$channel->basic_publish(
$amqpMessage,
$exchangeSettings?->getName() ?? '',
$exchangeSettings ? '' : $this->queueProvider
->getQueueSettings()
->getName()
);
/** @var string $messageId */
$messageId = $amqpMessage->get('message_id');
$message->setId($messageId);

return new IdEnvelope($message, $messageId);
}

public function subscribe(callable $handlerCallback): void
{
/**
* @param callable $handlerCallback
* @return void
*/
public function subscribe(callable $handlerCallback): void {
$channel = $this->queueProvider->getChannel();
$queueName = $this->queueProvider->getQueueSettings()->getName();

$channel->basic_consume(
$this->queueProvider
->getQueueSettings()
->getName(),
$this->queueProvider
->getQueueSettings()
->getName(),
$queueName,
$queueName,
false,
false,
false,
true,
function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void {
try {
$handlerCallback($this->serializer->unserialize($amqpMessage->body));
$handlerCallback($this->serializer->unserialize($amqpMessage->getBody()));
$channel->basic_ack($amqpMessage->getDeliveryTag());
} catch (Throwable $exception) {
$consumerTag = $amqpMessage->getConsumerTag();
Expand All @@ -105,13 +135,18 @@ function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void {
}
}

public function getQueueProvider(): QueueProviderInterface
{
/**
* @return QueueProviderInterface
*/
public function getQueueProvider(): QueueProviderInterface {
return $this->queueProvider;
}

public function withQueueProvider(QueueProviderInterface $queueProvider): self
{
/**
* @param QueueProviderInterface $queueProvider
* @return $this
*/
public function withQueueProvider(QueueProviderInterface $queueProvider): self {
$new = clone $this;
$new->queueProvider = $queueProvider;

Expand Down
2 changes: 1 addition & 1 deletion src/Exception/NoKeyInPayloadException.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use InvalidArgumentException;
use Throwable;
use Yiisoft\FriendlyException\FriendlyExceptionInterface;
use Yiisoft\Queue\AMQP\MessageSerializerInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

class NoKeyInPayloadException extends InvalidArgumentException implements FriendlyExceptionInterface
{
Expand Down
3 changes: 2 additions & 1 deletion src/ExistingMessagesConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

/**
* @internal
Expand Down Expand Up @@ -37,7 +38,7 @@ public function consume(callable $callback): void
false,
function (AMQPMessage $amqpMessage) use ($callback): void {
try {
$message = $this->serializer->unserialize($amqpMessage->body);
$message = $this->serializer->unserialize($amqpMessage->getBody());
if ($this->messageConsumed = $callback($message)) {
$this->channel->basic_ack($amqpMessage->getDeliveryTag());
}
Expand Down
64 changes: 0 additions & 64 deletions src/MessageSerializer.php

This file was deleted.

14 changes: 0 additions & 14 deletions src/MessageSerializerInterface.php

This file was deleted.

6 changes: 3 additions & 3 deletions tests/Integration/DelayMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use Psr\Log\LoggerInterface;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\Adapter;
use Yiisoft\Queue\AMQP\MessageSerializer;
use Yiisoft\Queue\AMQP\Middleware\DelayMiddleware;
use Yiisoft\Queue\AMQP\QueueProvider;
use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings;
Expand All @@ -18,6 +17,7 @@
use Yiisoft\Queue\AMQP\Tests\Support\SimpleMessageHandler;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Cli\SignalLoop;
use Yiisoft\Queue\Message\JsonMessageSerializer;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Middleware\CallableFactory;
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush;
Expand All @@ -44,7 +44,7 @@ public function testMainFlow(): void
$this->createConnection(),
new QueueSettings(),
),
new MessageSerializer(),
new JsonMessageSerializer(),
new SignalLoop(),
);
$queue = $this->makeQueue($adapter);
Expand Down Expand Up @@ -75,7 +75,7 @@ public function testMainFlowWithFakeAdapter(): void
$this->createConnection(),
new QueueSettings(),
),
new MessageSerializer(),
new JsonMessageSerializer(),
new SignalLoop(),
);
$queue = $this->makeQueue($adapter);
Expand Down
6 changes: 3 additions & 3 deletions tests/Support/FakeAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
namespace Yiisoft\Queue\AMQP\Tests\Support;

use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\MessageSerializerInterface;
use Yiisoft\Queue\AMQP\QueueProviderInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

final class FakeAdapter implements AdapterInterface
{
Expand All @@ -25,12 +25,12 @@ public function runExisting(callable $handlerCallback): void
// TODO: Implement runExisting() method.
}

public function status(string $id): JobStatus
public function status(string|int $id): JobStatus
{
// TODO: Implement status() method.
}

public function push(MessageInterface $message): void
public function push(MessageInterface $message): MessageInterface
{
// TODO: Implement push() method.
}
Expand Down
4 changes: 2 additions & 2 deletions tests/Unit/ExchangeSettingsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
use Yiisoft\Queue\AMQP\Adapter;
use Yiisoft\Queue\AMQP\MessageSerializer;
use Yiisoft\Queue\AMQP\QueueProvider;
use Yiisoft\Queue\AMQP\Settings\Exchange as ExchangeSettings;
use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings;
use Yiisoft\Queue\Message\JsonMessageSerializer;

final class ExchangeSettingsTest extends UnitTestCase
{
Expand Down Expand Up @@ -38,7 +38,7 @@ public function testCommonSettings(): void
])
)
),
new MessageSerializer(),
new JsonMessageSerializer(),
$this->getLoop(),
);
$exchangeSettings = $adapter->getQueueProvider()->getExchangeSettings();
Expand Down
Loading
Loading