diff --git a/Event/AMQPEvent.php b/Event/AMQPEvent.php index 1ae07c36..8cd889dc 100644 --- a/Event/AMQPEvent.php +++ b/Event/AMQPEvent.php @@ -3,8 +3,8 @@ namespace OldSound\RabbitMqBundle\Event; use OldSound\RabbitMqBundle\RabbitMq\Consumer; +use OldSound\RabbitMqBundle\RabbitMq\Producer; use PhpAmqpLib\Message\AMQPMessage; -use Symfony\Component\EventDispatcher\Event; /** * Class AMQPEvent @@ -18,6 +18,8 @@ class AMQPEvent extends AbstractAMQPEvent public const ON_IDLE = 'on_idle'; public const BEFORE_PROCESSING_MESSAGE = 'before_processing'; public const AFTER_PROCESSING_MESSAGE = 'after_processing'; + public const BEFORE_PUBLISH_MESSAGE = 'before_publishing'; + public const AFTER_PUBLISH_MESSAGE = 'after_publishing'; /** * @var AMQPMessage @@ -29,6 +31,11 @@ class AMQPEvent extends AbstractAMQPEvent */ protected $consumer; + /** + * @var Producer + */ + protected $producer; + /** * @return AMQPMessage */ @@ -68,4 +75,24 @@ public function setConsumer(Consumer $consumer) return $this; } + + /** + * @return Producer + */ + public function getProducer() + { + return $this->producer; + } + + /** + * @param Producer $producer + * + * @return AMQPEvent + */ + public function setProducer(Producer $producer) + { + $this->producer = $producer; + + return $this; + } } diff --git a/Event/AfterProducerPublishMessageEvent.php b/Event/AfterProducerPublishMessageEvent.php new file mode 100644 index 00000000..32be6fbf --- /dev/null +++ b/Event/AfterProducerPublishMessageEvent.php @@ -0,0 +1,41 @@ +setProducer($producer); + $this->setAMQPMessage($AMQPMessage); + $this->routingKey = $routingKey; + } + + /** + * @return string + */ + public function getRoutingKey() + { + return $this->routingKey; + } +} diff --git a/Event/BeforeProducerPublishMessageEvent.php b/Event/BeforeProducerPublishMessageEvent.php new file mode 100644 index 00000000..49b9a450 --- /dev/null +++ b/Event/BeforeProducerPublishMessageEvent.php @@ -0,0 +1,41 @@ +setProducer($producer); + $this->setAMQPMessage($AMQPMessage); + $this->routingKey = $routingKey; + } + + /** + * @return string + */ + public function getRoutingKey() + { + return $this->routingKey; + } +} diff --git a/README.md b/README.md index ccc0b487..b2b0642d 100644 --- a/README.md +++ b/README.md @@ -316,6 +316,49 @@ If you need to use a custom class for a producer (which should inherit from `Old The next piece of the puzzle is to have a consumer that will take the message out of the queue and process it accordingly. +#### Producer Events #### + +There are currently two events emitted by the producer. + +##### BeforeProducerPublishMessageEvent ##### +This event occurs immediately before publishing the message. This is a good hook to do any final logging, validation, etc. before actually sending the message. A sample implementation of a listener: + +```php +namespace App\EventListener; + +use OldSound\RabbitMqBundle\Event\BeforeProducerPublishMessageEvent; +use Symfony\Component\EventDispatcher\Attribute\AsEventListener; + +#[AsEventListener(event: BeforeProducerPublishMessageEvent::NAME)] +final class AMQPBeforePublishEventListener +{ + public function __invoke(BeforeProducerPublishMessageEvent $event): void + { + // Your code goes here + } +} +``` + +##### AfterProducerPublishMessageEvent ##### +This event occurs immediately after publishing the message. This is a good hook to do any confirmation logging, commits, etc. after actually sending the message. A sample implementation of a listener: + +```php +namespace App\EventListener; + +use OldSound\RabbitMqBundle\Event\AfterProducerPublishMessageEvent; +use Symfony\Component\EventDispatcher\Attribute\AsEventListener; + +#[AsEventListener(event: AfterProducerPublishMessageEvent::NAME)] +final class AMQPBeforePublishEventListener +{ + public function __invoke(AfterProducerPublishMessageEvent $event): void + { + // Your code goes here + } +} +``` + + ### Consumers ### A consumer will connect to the server and start a __loop__ waiting for incoming messages to process. Depending on the specified __callback__ for such consumer will be the behavior it will have. Let's review the consumer configuration from above: diff --git a/RabbitMq/Producer.php b/RabbitMq/Producer.php index 8fcb7349..571ce3b7 100644 --- a/RabbitMq/Producer.php +++ b/RabbitMq/Producer.php @@ -2,6 +2,8 @@ namespace OldSound\RabbitMqBundle\RabbitMq; +use OldSound\RabbitMqBundle\Event\AfterProducerPublishMessageEvent; +use OldSound\RabbitMqBundle\Event\BeforeProducerPublishMessageEvent; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -63,6 +65,12 @@ public function publish($msgBody, $routingKey = null, $additionalProperties = [] } $real_routingKey = $routingKey !== null ? $routingKey : $this->defaultRoutingKey; + + $this->dispatchEvent( + BeforeProducerPublishMessageEvent::NAME, + new BeforeProducerPublishMessageEvent($this, $msg, $real_routingKey) + ); + $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string)$real_routingKey); $this->logger->debug('AMQP message published', [ 'amqp' => [ @@ -72,5 +80,10 @@ public function publish($msgBody, $routingKey = null, $additionalProperties = [] 'headers' => $headers, ], ]); + + $this->dispatchEvent( + AfterProducerPublishMessageEvent::NAME, + new AfterProducerPublishMessageEvent($this, $msg, $real_routingKey) + ); } }