Skip to content

Commit

Permalink
Improve traces with events (#257)
Browse files Browse the repository at this point in the history
* prepare event handler scenario and clean up manual test

* Push abstraction of logger gateway as facing one

* fix construction

* collect logs
  • Loading branch information
dgafka authored Nov 2, 2023
1 parent d87f252 commit 11a7eb0
Show file tree
Hide file tree
Showing 25 changed files with 353 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,24 @@

use Ecotone\Messaging\Attribute\Parameter\Reference;
use Ecotone\Messaging\Config\ConfiguredMessagingSystem;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation;
use Ecotone\Messaging\MessageChannel;
use Psr\Log\LoggerInterface;
use Ecotone\Messaging\Message;

final class CollectorSenderInterceptor
{
public function __construct(private CollectorStorage $collectorStorage, private string $targetChannel)
{
}

public function send(MethodInvocation $methodInvocation, #[Reference] ConfiguredMessagingSystem $configuredMessagingSystem, #[Reference('logger')] LoggerInterface $logger): mixed
public function send(
MethodInvocation $methodInvocation,
Message $message,
#[Reference] ConfiguredMessagingSystem $configuredMessagingSystem,
#[Reference] LoggingGateway $logger
): mixed
{
/** For example Command Bus inside Command Bus */
if ($this->collectorStorage->isEnabled()) {
Expand All @@ -26,7 +33,7 @@ public function send(MethodInvocation $methodInvocation, #[Reference] Configured
$this->collectorStorage->enable();
try {
$result = $methodInvocation->proceed();
$collectedMessages = $this->collectorStorage->releaseMessages($logger);
$collectedMessages = $this->collectorStorage->releaseMessages($logger, $message);
if ($collectedMessages !== []) {
$messageChannel = $this->getTargetChannel($configuredMessagingSystem);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Ecotone\Messaging\Channel\Collector;

use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\Message;
use Psr\Log\LoggerInterface;

Expand Down Expand Up @@ -39,19 +40,22 @@ public function isEnabled(): bool
return $this->enabled;
}

public function collect(Message $message, LoggerInterface $logger): void
public function collect(Message $message, LoggingGateway $logger): void
{
$logger->info(sprintf('Collecting message with id: %s', $message->getHeaders()->getMessageId()));
$logger->info(
sprintf('Collecting message with id: %s', $message->getHeaders()->getMessageId()),
$message
);
$this->collectedMessages[] = $message;
}

/**
* @return Message[]
*/
public function releaseMessages(LoggerInterface $logger): array
public function releaseMessages(LoggingGateway $logger, Message $message): array
{
if (count($this->collectedMessages) > 0) {
$logger->info(sprintf('Releasing collected %s message(s) to send them to Message Channels', count($this->collectedMessages)));
$logger->info(sprintf('Releasing collected %s message(s) to send them to Message Channels', count($this->collectedMessages)), $message);
}
$collectedMessages = $this->collectedMessages;
$this->disable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\Handler\Logger\LoggingHandlerBuilder;
use Ecotone\Messaging\PrecedenceChannelInterceptor;

Expand All @@ -34,7 +35,7 @@ public function compile(MessagingContainerBuilder $builder): Definition
MessageCollectorChannelInterceptor::class,
[
$this->collectorStorageReference,
new Reference(LoggingHandlerBuilder::LOGGER_REFERENCE),
new Reference(LoggingGateway::class),
]
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Ecotone\Messaging\Channel\AbstractChannelInterceptor;
use Ecotone\Messaging\Channel\ChannelInterceptor;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\MessageChannel;
use Psr\Log\LoggerInterface;
Expand All @@ -14,7 +15,7 @@ final class MessageCollectorChannelInterceptor extends AbstractChannelIntercepto
{
public function __construct(
private CollectorStorage $collectorStorage,
private LoggerInterface $logger
private LoggingGateway $logger
) {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\MessagingCommands\MessagingCommandsModule;
use Ecotone\Messaging\Config\Configuration;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
use Ecotone\Messaging\Conversion\ConversionService;
use Ecotone\Messaging\Conversion\ObjectToSerialized\SerializingConverterBuilder;
use Ecotone\Messaging\Conversion\SerializedToObject\DeserializingConverterBuilder;
use Ecotone\Messaging\Conversion\StringToUuid\StringToUuidConverterBuilder;
Expand All @@ -31,6 +33,8 @@
use Ecotone\Messaging\Handler\Gateway\ParameterToMessageConverter\GatewayHeadersBuilder;
use Ecotone\Messaging\Handler\Gateway\ParameterToMessageConverter\GatewayPayloadBuilder;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
use Ecotone\Messaging\Handler\Logger\LoggingHandlerBuilder;
use Ecotone\Messaging\Handler\Logger\LoggingService;
use Ecotone\Messaging\Handler\MessageHandlerBuilder;
use Ecotone\Messaging\Handler\Router\HeaderRouter;
use Ecotone\Messaging\Handler\Router\RouterBuilder;
Expand Down Expand Up @@ -168,6 +172,17 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
GatewayPayloadBuilder::create('parameters'),
])
);

$messagingConfiguration->registerServiceDefinition(
LoggingService::class,
new Definition(
LoggingService::class,
[
Reference::to(ConversionService::REFERENCE_NAME),
Reference::to(LoggingHandlerBuilder::LOGGER_REFERENCE)
]
)
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Ecotone\Messaging\Config\ModulePackageList;
use Ecotone\Messaging\Config\ModuleReferenceSearchService;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\Handler\Logger\LoggingHandlerBuilder;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\Converter\ReferenceBuilder;
use Ecotone\Messaging\Handler\Recoverability\ErrorHandler;
Expand Down Expand Up @@ -62,7 +63,7 @@ public function prepare(Configuration $messagingConfiguration, array $extensionO
->withEndpointId('error_handler.' . $extensionObject->getErrorChannelName())
->withInputChannelName($extensionObject->getErrorChannelName())
->withMethodParameterConverters([
ReferenceBuilder::create('logger', LoggingHandlerBuilder::LOGGER_REFERENCE),
ReferenceBuilder::create('logger', LoggingGateway::class),
]);
if ($extensionObject->getDeadLetterQueueChannel()) {
$errorHandler = $errorHandler->withOutputMessageChannel($extensionObject->getDeadLetterQueueChannel());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
use Ecotone\Messaging\Config\ServiceCacheConfiguration;
use Ecotone\Messaging\ConfigurationVariableService;
use Ecotone\Messaging\Handler\Gateway\ProxyFactory;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\Handler\Logger\StubLoggingGateway;
use Ecotone\Messaging\InMemoryConfigurationVariableService;
use Psr\Container\ContainerInterface;

Expand All @@ -30,6 +32,9 @@ public static function buildMessagingSystemInMemoryContainer(
$container = new LazyInMemoryContainer($containerBuilder->getDefinitions(), $externalContainer);
$container->set(ConfigurationVariableService::REFERENCE_NAME, $configurationVariableService ?? InMemoryConfigurationVariableService::createEmpty());
$container->set(ProxyFactory::class, $proxyFactory ?? new ProxyFactory(ServiceCacheConfiguration::noCache()));
if (!$container->has(LoggingGateway::class)) {
$container->set(LoggingGateway::class, StubLoggingGateway::create());
}
return $container->get(ConfiguredMessagingSystem::class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ public static function prepare(
$serviceConfiguration->getNamespaces(),
$serviceConfiguration->getEnvironment(),
$serviceConfiguration->getLoadedCatalog() ?? '',
array_filter($modulesClasses, fn (string $moduleClassName): bool => class_exists($moduleClassName)),
array_filter($modulesClasses, fn (string $moduleClassName): bool => class_exists($moduleClassName) || interface_exists($moduleClassName)),
$userLandClassesToRegister,
$enableTestPackage
),
Expand Down
6 changes: 6 additions & 0 deletions packages/Ecotone/src/Messaging/Config/ModuleClassList.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\SplitterModule;
use Ecotone\Messaging\Config\Annotation\ModuleConfiguration\TransformerModule;
use Ecotone\Messaging\Handler\Logger\Config\LoggingModule;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\Handler\Logger\LoggingService;
use Ecotone\Modelling\Config\BusModule;
use Ecotone\Modelling\Config\BusRoutingModule;
use Ecotone\Modelling\Config\DistributedGatewayModule;
Expand Down Expand Up @@ -77,6 +79,10 @@ class ModuleClassList
TransformerModule::class,
MessageConsumerModule::class,
InstantRetryModule::class,

/** Attribute based configurations */
LoggingGateway::class,
LoggingService::class
];

public const ASYNCHRONOUS_MODULE = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Endpoint\PollingConsumer\RejectMessageException;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;
use Ecotone\Messaging\Handler\Logger\LoggingGateway;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\AroundInterceptorBuilder;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation;
use Ecotone\Messaging\Message;
Expand Down Expand Up @@ -37,14 +38,15 @@ public static function createAroundInterceptorBuilder(InterfaceToCallRegistry $i
* @throws Throwable
* @throws MessagingException
*/
public function ack(MethodInvocation $methodInvocation, Message $message, #[Reference('logger')] LoggerInterface $logger)
public function ack(MethodInvocation $methodInvocation, Message $message, #[Reference] LoggingGateway $logger)
{
$logger->info(
sprintf(
'Message with id `%s` received from Message Channel `%s`',
$message->getHeaders()->getMessageId(),
$message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME) ? $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME) : 'unknown'
)
),
$message
);
if (! $message->getHeaders()->containsKey(MessageHeaders::CONSUMER_ACK_HEADER_LOCATION)) {
return $methodInvocation->proceed();
Expand All @@ -59,12 +61,18 @@ public function ack(MethodInvocation $methodInvocation, Message $message, #[Refe

if ($amqpAcknowledgementCallback->isAutoAck()) {
$amqpAcknowledgementCallback->accept();
$logger->info(sprintf('Message with id `%s` acknowledged in Message Channel', $message->getHeaders()->getMessageId()));
$logger->info(
sprintf('Message with id `%s` acknowledged in Message Channel', $message->getHeaders()->getMessageId()),
$message
);
}
} catch (RejectMessageException $exception) {
if ($amqpAcknowledgementCallback->isAutoAck()) {
$amqpAcknowledgementCallback->reject();
$logger->info(sprintf('Message with id `%s` rejected in Message Channel', $message->getHeaders()->getMessageId()));
$logger->info(
sprintf('Message with id `%s` rejected in Message Channel', $message->getHeaders()->getMessageId()),
$message
);
}
} catch (Throwable $exception) {
if ($amqpAcknowledgementCallback->isAutoAck()) {
Expand All @@ -75,14 +83,17 @@ public function ack(MethodInvocation $methodInvocation, Message $message, #[Refe
$message->getHeaders()->getMessageId(),
$exception->getMessage()
),
['exception' => $exception]
$message
);
}
}

$pollingMetadata = $message->getHeaders()->get(MessageHeaders::CONSUMER_POLLING_METADATA);
if ($pollingMetadata->isStoppedOnError() === true && $exception !== null) {
$logger->info('Should stop on error configuration enabled, stopping Message Consumer.');
$logger->info(
'Should stop on error configuration enabled, stopping Message Consumer.',
$message
);
throw $exception;
}

Expand Down
27 changes: 27 additions & 0 deletions packages/Ecotone/src/Messaging/Handler/Logger/LoggingGateway.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

declare(strict_types=1);

namespace Ecotone\Messaging\Handler\Logger;

use Ecotone\Messaging\Attribute\MessageGateway;
use Ecotone\Messaging\Attribute\Parameter\Header;
use Ecotone\Messaging\Attribute\Parameter\Payload;
use Ecotone\Messaging\Message;

interface LoggingGateway
{
#[MessageGateway(LoggingService::INFO_LOGGING_CHANNEL)]
public function info(
#[Payload] string $text,
#[Header(LoggingService::CONTEXT_MESSAGE_HEADER)] Message $message,
#[Header(LoggingService::CONTEXT_EXCEPTION_HEADER)] ?\Exception $exception = null
): void;

#[MessageGateway(LoggingService::ERROR_LOGGING_CHANNEL)]
public function error(
#[Payload] string $text,
#[Header(LoggingService::CONTEXT_MESSAGE_HEADER)] Message $message,
#[Header(LoggingService::CONTEXT_EXCEPTION_HEADER)] ?\Exception $exception = null
): void;
}
46 changes: 46 additions & 0 deletions packages/Ecotone/src/Messaging/Handler/Logger/LoggingService.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

namespace Ecotone\Messaging\Handler\Logger;

use Ecotone\Messaging\Attribute\Parameter\Header;
use Ecotone\Messaging\Attribute\Parameter\Payload;
use Ecotone\Messaging\Attribute\ServiceActivator;
use Ecotone\Messaging\Conversion\ConversionService;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Handler\TypeDefinitionException;
Expand All @@ -21,6 +24,11 @@
*/
class LoggingService
{
public const CONTEXT_MESSAGE_HEADER = 'ecotone.logging.contextMessage';
public const CONTEXT_EXCEPTION_HEADER = 'ecotone.logging.exceptionMessage';
public const INFO_LOGGING_CHANNEL = 'infoLoggingChannel';
public const ERROR_LOGGING_CHANNEL = 'errorLoggingChannel';

private \Ecotone\Messaging\Conversion\ConversionService $conversionService;
private \Psr\Log\LoggerInterface $logger;

Expand All @@ -35,6 +43,44 @@ public function __construct(ConversionService $conversionService, LoggerInterfac
$this->logger = $logger;
}

#[ServiceActivator(self::INFO_LOGGING_CHANNEL)]
public function info(
#[Payload] string $text,
#[Header(self::CONTEXT_MESSAGE_HEADER)] Message $message,
#[Header(self::CONTEXT_EXCEPTION_HEADER)] ?\Exception $exception,
): void
{
$this->logger->info(
$text,
[
'message_id' => $message->getHeaders()->getMessageId(),
'correlation_id' => $message->getHeaders()->getCorrelationId(),
'parent_id' => $message->getHeaders()->getParentId(),
'headers' => (string)$message->getHeaders(),
'exception' => $exception
]
);
}

#[ServiceActivator(self::ERROR_LOGGING_CHANNEL)]
public function error(
#[Payload] string $text,
#[Header(self::CONTEXT_MESSAGE_HEADER)] Message $message,
#[Header(self::CONTEXT_EXCEPTION_HEADER)] ?\Exception $exception,
): void
{
$this->logger->critical(
$text,
[
'message_id' => $message->getHeaders()->getMessageId(),
'correlation_id' => $message->getHeaders()->getCorrelationId(),
'parent_id' => $message->getHeaders()->getParentId(),
'headers' => (string)$message->getHeaders(),
'exception' => $exception
]
);
}

/**
* @param LoggingLevel $loggingLevel
* @param Message $message
Expand Down
Loading

0 comments on commit 11a7eb0

Please sign in to comment.