Skip to content

Commit

Permalink
APPS-7128: MessageBroker for Message Bus v 2.0 implementation (#10302)
Browse files Browse the repository at this point in the history
MessageBroker for Message Bus v 2.0 implementation
  • Loading branch information
oleksander-kiiashko authored Aug 31, 2023
1 parent 628fa33 commit 39bca2e
Show file tree
Hide file tree
Showing 23 changed files with 649 additions and 81 deletions.
17 changes: 17 additions & 0 deletions src/Spryker/Shared/MessageBroker/MessageBrokerConstants.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ interface MessageBrokerConstants
public const MESSAGE_TO_CHANNEL_MAP = 'MESSAGE_BROKER:MESSAGE_TO_CHANNEL_MAP';

/**
* @deprecated Use {@link \Spryker\Shared\MessageBroker\MessageBrokerConstants::CHANNEL_TO_SENDER_TRANSPORT_MAP} instead.
*
* @var string
*/
public const CHANNEL_TO_TRANSPORT_MAP = 'MESSAGE_BROKER:SENDER_CHANNEL_TO_CLIENT_MAP';
Expand All @@ -31,4 +33,19 @@ interface MessageBrokerConstants
* @var string
*/
public const IS_ENABLED = 'MESSAGE_BROKER:IS_ENABLED';

/**
* @var string
*/
public const TENANT_IDENTIFIER = 'MESSAGE_BROKER:TENANT_IDENTIFIER';

/**
* @var string
*/
public const CHANNEL_TO_RECEIVER_TRANSPORT_MAP = 'MESSAGE_BROKER:CHANNEL_TO_RECEIVER_TRANSPORT_MAP';

/**
* @var string
*/
public const CHANNEL_TO_SENDER_TRANSPORT_MAP = 'MESSAGE_BROKER:CHANNEL_TO_SENDER_TRANSPORT_MAP';
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@
<transfers xmlns="spryker:transfer-01" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="spryker:transfer-01 http://static.spryker.com/transfer-01.xsd">

<transfer name="MessageAttributes">
<property name="transferName" type="string"/>
<property name="event" type="string"/>
<property name="transferName" type="string" deprecated="Use {@link \Generated\Shared\Transfer\MessageAttributesTransfer::$name} instead."/>
<property name="event" type="string" deprecated="Use {@link \Generated\Shared\Transfer\MessageAttributesTransfer::$name} instead."/>
<property name="storeReference" type="string"/>
<property name="emitter" type="string"/>
<property name="publisher" type="Publisher" deprecated="Use emitter property instead."/>
<property name="emitter" type="string" deprecated="Use {@link \Generated\Shared\Transfer\MessageAttributesTransfer::$actorId} instead."/>
<property name="publisher" type="Publisher" deprecated="Use {@link \Generated\Shared\Transfer\MessageAttributesTransfer::$actorId} instead."/>
<property name="timestamp" type="string"/>
<property name="correlationId" type="string"/>
<property name="tenantIdentifier" type="string"/>
<property name="transactionId" type="string"/>
<property name="actorId" type="string"/>
<property name="name" type="string"/>
</transfer>

<transfer name="Publisher" deprecated="Will be removed in next major.">
Expand Down
18 changes: 15 additions & 3 deletions src/Spryker/Zed/MessageBroker/Business/Debug/DebugPrinter.php
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ public function printDebug(OutputInterface $output, ?string $pathToAsyncApiFile
protected function printDebugForConfiguration(OutputInterface $output): void
{
$messageToChannelMap = $this->getMessageToChannelMap();
$channelToTransportMap = $this->getChannelToTransportMap();
$messagesToHandlerMap = $this->getMessagesToHandlerMap();

foreach ($messageToChannelMap as $messageClassName => $channelName) {
Expand All @@ -108,7 +107,7 @@ protected function printDebugForConfiguration(OutputInterface $output): void
$table->addRow([
$channelName,
$messageClassName,
$channelToTransportMap[$channelName] ?? 'Not configured',
$this->getTransportForChannel($channelName) ?: 'Not configured',
$handlersForMessage,
]);

Expand Down Expand Up @@ -242,11 +241,16 @@ protected function getTransportForChannel(string $channelName): ?string
{
$channelToTransportMap = $this->getChannelToTransportMap();

$transport = null;
if (isset($channelToTransportMap[$channelName])) {
if (is_array($channelToTransportMap[$channelName])) {
return implode(', ', array_unique($channelToTransportMap[$channelName]));
}

return $channelToTransportMap[$channelName];
}

return null;
return $transport;
}

/**
Expand Down Expand Up @@ -290,6 +294,14 @@ protected function getChannelToTransportMap(): array
$channelToTransportMap = $this->configFormatter->format($channelToTransportMap);
}

$channelToReceiverTransportMap = $this->config->getChannelToReceiverTransportMap();
$channelToSenderTransportMap = $this->config->getChannelToSenderTransportMap();
$channelToTransportMap = array_merge_recursive(
$channelToTransportMap,
$channelToReceiverTransportMap,
$channelToSenderTransportMap,
);

return $channelToTransportMap;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

/**
* Copyright © 2016-present Spryker Systems GmbH. All rights reserved.
* Use of this software requires acceptance of the Evaluation License Agreement. See LICENSE file.
*/

namespace Spryker\Zed\MessageBroker\Business\Exception;

use Exception;

class MissingMessageSenderException extends Exception
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
use Spryker\Zed\MessageBroker\Business\Logger\MessagePublishLoggerInterface;
use Spryker\Zed\MessageBroker\Business\MessageAttributeProvider\MessageAttributeProvider;
use Spryker\Zed\MessageBroker\Business\MessageAttributeProvider\MessageAttributeProviderInterface;
use Spryker\Zed\MessageBroker\Business\MessageChannelProvider\MessageChannelProvider;
use Spryker\Zed\MessageBroker\Business\MessageChannelProvider\MessageChannelProviderInterface;
use Spryker\Zed\MessageBroker\Business\MessageHandler\MessageHandlerLocator;
use Spryker\Zed\MessageBroker\Business\MessageSender\MessageSenderLocator;
use Spryker\Zed\MessageBroker\Business\MessageValidator\MessageValidatorStack;
use Spryker\Zed\MessageBroker\Business\MessageValidator\MessageValidatorStackInterface;
use Spryker\Zed\MessageBroker\Business\Middleware\AddChannelNameStampMiddleware;
use Spryker\Zed\MessageBroker\Business\Middleware\LogHandleMessageExceptionMiddleware;
use Spryker\Zed\MessageBroker\Business\Publisher\MessagePublisher;
use Spryker\Zed\MessageBroker\Business\Publisher\MessagePublisherInterface;
use Spryker\Zed\MessageBroker\Business\Worker\Worker;
Expand Down Expand Up @@ -106,10 +110,38 @@ public function createMessageBus(): MessageBusInterface
*/
public function getMiddlewares(): array
{
return array_merge($this->getMiddlewarePlugins(), [
$this->createSendMessageMiddleware(),
$this->createHandleMessageMiddleware(),
]);
return array_merge(
[
$this->createLogHandleMessageExceptionMiddleware(),
],
$this->getMiddlewarePlugins(),
[
$this->createAddChannelNameStampMiddleware(),
$this->createSendMessageMiddleware(),
$this->createHandleMessageMiddleware(),
],
);
}

/**
* @return \Symfony\Component\Messenger\Middleware\MiddlewareInterface
*/
public function createAddChannelNameStampMiddleware(): MiddlewareInterface
{
return new AddChannelNameStampMiddleware(
$this->createMessageChannelProvider(),
);
}

/**
* @return \Spryker\Zed\MessageBroker\Business\MessageChannelProvider\MessageChannelProviderInterface
*/
public function createMessageChannelProvider(): MessageChannelProviderInterface
{
return new MessageChannelProvider(
$this->getConfig(),
$this->createConfigFormatter(),
);
}

/**
Expand All @@ -122,6 +154,14 @@ public function createSendMessageMiddleware(): MiddlewareInterface
);
}

/**
* @return \Symfony\Component\Messenger\Middleware\MiddlewareInterface
*/
public function createLogHandleMessageExceptionMiddleware(): MiddlewareInterface
{
return new LogHandleMessageExceptionMiddleware();
}

/**
* @return \Symfony\Component\Messenger\Transport\Sender\SendersLocatorInterface
*/
Expand All @@ -131,6 +171,7 @@ public function createMessageSenderLocator(): SendersLocatorInterface
$this->getConfig(),
$this->createConfigFormatter(),
$this->getMessageSenderPlugins(),
$this->createMessageChannelProvider(),
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ interface MessageBrokerFacadeInterface
/**
* Specification:
* - Adds message attributes to the transfer object.
* - Wraps message in a Symfony Envelope and sends it through the configured transport for this message.
* - Wraps message in a Symfony Envelope and adds a channel timestamp.
* - Throws `MissingMessageSenderException` if no message sender is found for the current message channel.
* - Sends the message through the configured transport for this message.
* - Writes Logger::INFO level log in case of successful envelope message sending.
* - Writes Logger::ERROR level log in case of any error during envelope message sending.
* - Will not send message if {@link \Spryker\Zed\MessageBroker\MessageBrokerConfig::isEnabled()} is `false`
* - Will not send message if {@link \Spryker\Zed\MessageBroker\MessageBrokerConfig::isEnabled()} is `false`.
*
* @api
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

/**
* Copyright © 2016-present Spryker Systems GmbH. All rights reserved.
* Use of this software requires acceptance of the Evaluation License Agreement. See LICENSE file.
*/

namespace Spryker\Zed\MessageBroker\Business\MessageChannelProvider;

use Spryker\Zed\MessageBroker\Business\Config\ConfigFormatterInterface;
use Spryker\Zed\MessageBroker\Business\Exception\CouldNotMapMessageToChannelNameException;
use Spryker\Zed\MessageBroker\MessageBrokerConfig;
use Symfony\Component\Messenger\Envelope;

class MessageChannelProvider implements MessageChannelProviderInterface
{
/**
* @var \Spryker\Zed\MessageBroker\MessageBrokerConfig
*/
protected MessageBrokerConfig $config;

/**
* @var \Spryker\Zed\MessageBroker\Business\Config\ConfigFormatterInterface
*/
protected ConfigFormatterInterface $configFormatter;

/**
* @param \Spryker\Zed\MessageBroker\MessageBrokerConfig $config
* @param \Spryker\Zed\MessageBroker\Business\Config\ConfigFormatterInterface $configFormatter
*/
public function __construct(MessageBrokerConfig $config, ConfigFormatterInterface $configFormatter)
{
$this->config = $config;
$this->configFormatter = $configFormatter;
}

/**
* @param \Symfony\Component\Messenger\Envelope $envelope
*
* @throws \Spryker\Zed\MessageBroker\Business\Exception\CouldNotMapMessageToChannelNameException
*
* @return string
*/
public function getChannelForMessage(Envelope $envelope): string
{
$messageToChannelMap = $this->config->getMessageToChannelMap();

if (is_string($messageToChannelMap)) {
$messageToChannelMap = $this->configFormatter->format($messageToChannelMap);
}

$messageClass = get_class($envelope->getMessage());

if (isset($messageToChannelMap[$messageClass])) {
return $messageToChannelMap[$messageClass];
}

throw new CouldNotMapMessageToChannelNameException(sprintf(
'Could not map "%s" message class to a channel',
$messageClass,
));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

/**
* Copyright © 2016-present Spryker Systems GmbH. All rights reserved.
* Use of this software requires acceptance of the Evaluation License Agreement. See LICENSE file.
*/

namespace Spryker\Zed\MessageBroker\Business\MessageChannelProvider;

use Symfony\Component\Messenger\Envelope;

interface MessageChannelProviderInterface
{
/**
* @param \Symfony\Component\Messenger\Envelope $envelope
*
* @throws \Spryker\Zed\MessageBroker\Business\Exception\CouldNotMapMessageToChannelNameException
*
* @return string
*/
public function getChannelForMessage(Envelope $envelope): string;
}
Loading

0 comments on commit 39bca2e

Please sign in to comment.