Symfony bundle for convenient work with queues. Currently it supports RabbitMQ.
-
Install bundle
composer require lamoda/queue-bundle
-
Extend
Lamoda\QueueBundle\Entity\QueueEntityMappedSuperclass
use Doctrine\ORM\Mapping as ORM; use Lamoda\QueueBundle\Entity\QueueEntityMappedSuperclass; /** * @ORM\Entity(repositoryClass="Lamoda\QueueBundle\Entity\QueueRepository") */ class Queue extends QueueEntityMappedSuperclass { }
-
Configure bundle parameters
lamoda_queue: ## required entity_class: App\Entity\Queue max_attempts: 5 batch_size_per_requeue: 5 batch_size_per_republish: 5 ## optional (will use for default delay Geometric Progression Strategy) strategy_delay_geometric_progression_start_interval_sec: 60 strategy_delay_geometric_progression_multiplier: 2
-
Register bundle
class AppKernel extends Kernel { // ... public function registerBundles() { $bundles = [ // ... new Lamoda\QueueBundle\LamodaQueueBundle(), // ... ]; return $bundles; } // ... }
or add to
config/bundles.php
return [ // ... Lamoda\QueueBundle\LamodaQueueBundle::class => ['all' => true], // ... ];
-
Migrate schema
doctrine:migrations:diff
to create migration forqueue
tabledoctrine:migrations:migrate
- apply the migration
-
Define new exchange constant
namespace App\Constant; class Exchanges { public const DEFAULT = 'default'; }
-
Add new node to
old_sound_rabbit_mq.producers
with previous defined constant name, example:old_sound_rabbit_mq: producers: default: connection: default exchange_options: name: !php/const App\Constant\Exchanges::DEFAULT type: "direct"
-
Define new queue constant
namespace App\Constant; class Queues { public const NOTIFICATION = 'notification'; }
-
Register consumer for queue in
old_sound_rabbit_mq.consumers
with previous defined constant name, example:old_sound_rabbit_mq: consumers: notification: connection: default exchange_options: name: !php/const App\Constant\Exchanges::DEFAULT type: "direct" queue_options: name: !php/const App\Constant\Queues::NOTIFICATION routing_keys: - !php/const App\Constant\Queues::NOTIFICATION callback: "lamoda_queue.consumer"
-
Create job class, extend
AbstractJob
by example:namespace App\Job; use App\Constant\Exchanges; use App\Constant\Queues; use Lamoda\QueueBundle\Job\AbstractJob; use JMS\Serializer\Annotation as JMS; class SendNotificationJob extends AbstractJob { /** * @var string * * @JMS\Type("int") */ private $message; public function __construct(string $message) { $this->message = $message; } public function getDefaultQueue(): string { return Queues::NOTIFICATION; } public function getDefaultExchange(): string { return Exchanges::DEFAULT; } }
-
Create job handler, implement HandlerInterface by example:
namespace App\Handler; use Lamoda\QueueBundle\Handler\HandlerInterface; use Lamoda\QueueBundle\QueueInterface; class SendNotificationHandler implements HandlerInterface { public function handle(QueueInterface $job): void { // implement service logic here } }
-
Tag handler at service container
services: App\Handler\SendNotificationHandler: public: true tags: - { name: queue.handler, handle: App\Job\SendNotificationJob }
-
Configure delay strategy parameters (optional)
lamoda_queue: ... queues: queue_one: 'delay_arithmetic_progression' queue_two: 'delay_special_geometric_progression' # Settings of special behaviors for Delay strategies (optional) services: lamoda_queue.strategy.delay.arithmetic_progression: class: Lamoda\QueueBundle\Strategy\Delay\ArithmeticProgressionStrategy tags: - { name: 'lamoda_queue_strategy', key: 'delay_arithmetic_progression' } arguments: - 60 # start_interval_sec parameter - 1700 # multiplier parameter lamoda_queue.strategy.delay.geometric_progression: class: Lamoda\QueueBundle\Strategy\Delay\GeometricProgressionStrategy tags: - { name: 'lamoda_queue_strategy', key: 'delay_special_geometric_progression' } arguments: - 70 # start_interval_sec parameter - 4 # multiplier parameter
In this block, you can config special delay behaviors for each queue. For this, you have to register new services that use one of several base strategies (ArithmeticProgressionStrategy, GeometricProgressionStrategy) or yours (for this you have to make Service Class that implements DelayStrategyInterface).
Each strategy service has to have a tag with name
lamoda_queue_strategy
and uniquekey
. After this, you can use thesekeys
for matching with queues inlamoda_queue.queues
section.By default, use GeometricProgressionStrategy with params (their you can customize in
lamoda_queue
config section):strategy_delay_geometric_progression_start_interval_sec: 60 strategy_delay_geometric_progression_multiplier: 2
-
Add queue name in "codeception.yml" at
modules.config.AMQP.queues
-
Execute
./bin/console queue:init
command
./bin/console queue:init
$job = new SendNotificationJob($id);
$container->get(Lamoda\QueueBundle\Factory\PublisherFactory::class)->publish($job);
./bin/console queue:consume notification
./bin/console queue:requeue
You can queue any primitive class, just implement QueueInterface
:
namespace App\Process;
use Lamoda\QueueBundle\Entity\QueueInterface;
class MyProcess implements QueueInterface
{
// implement interface functions
}
services:
App\Handler\MyProcessHandler:
public: true
tags:
- { name: queue.handler, handle: App\Process\MyProcess }
$process = new MyProcess();
$container->get('queue.publisher')->publish($process);
If you want to rerun queue, throw Lamoda\QueueBundle\Exception\RuntimeException
.
If you want mark queue as failed, throw any another kind of exception.
namespace App\Handler;
use Lamoda\QueueBundle\Handler\HandlerInterface;
use Lamoda\QueueBundle\QueueInterface;
class SendNotificationHandler implements HandlerInterface
{
public function handle(QueueInterface $job): void
{
// implement service logic here
// Rerun queue
if ($rerun === true) {
throw new Lamoda\QueueBundle\Exception\RuntimeException('Error message');
}
// Mark queue as failed
if ($failed === true) {
throw new \Exception();
}
}
}
By default delay time is calculated exponentially. You can affect it through configuration.
lamoda_queue:
## required
## ...
max_attempts: 5
## optional
strategy_delay_geometric_progression_start_interval_sec: 60
strategy_delay_geometric_progression_multiplier: 2
When consumer wants to execute reached maximum attempts queue.
Properties:
- Queue Entity
QueueAttemptsReachedEvent::getQueue()
make php-cs-check
make php-cs-fix
Unit
make test-unit