Skip to content

Commit

Permalink
Add support for tracing scheduled handlers (#390)
Browse files Browse the repository at this point in the history
* Add support for tracing scheduled handlers

* fixes
  • Loading branch information
dgafka authored Oct 17, 2024
1 parent a8f7097 commit 61a96c2
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ final class ExecutionPollingMetadata
private ?string $cron = null;
private ?bool $stopOnError = null;
private ?bool $finishWhenNoMessages = null;
private ?int $executionAmountLimit = null;

private function __construct()
{
Expand Down Expand Up @@ -107,6 +108,14 @@ public function withFinishWhenNoMessages(bool $finishWhenNoMessages): ExecutionP
return $self;
}

public function withExecutionAmountLimit(int $limit): self
{
$self = clone $this;
$self->executionAmountLimit = $limit;

return $self;
}

public function getCron(): ?string
{
return $this->cron;
Expand Down Expand Up @@ -136,4 +145,9 @@ public function getFinishWhenNoMessages(): ?bool
{
return $this->finishWhenNoMessages;
}

public function getExecutionAmountLimit(): ?int
{
return $this->executionAmountLimit;
}
}
6 changes: 4 additions & 2 deletions packages/Ecotone/src/Messaging/Endpoint/PollingMetadata.php
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,10 @@ public function applyExecutionPollingMetadata(?ExecutionPollingMetadata $executi
$copy = $copy->setCron($executionPollingMetadata->getCron());
}
if (! is_null($executionPollingMetadata->getFinishWhenNoMessages())) {
$copy = $copy
->setFinishWhenNoMessages($executionPollingMetadata->getFinishWhenNoMessages());
$copy = $copy->setFinishWhenNoMessages($executionPollingMetadata->getFinishWhenNoMessages());
}
if (! is_null($executionPollingMetadata->getExecutionAmountLimit())) {
$copy = $copy->setExecutionAmountLimit($executionPollingMetadata->getExecutionAmountLimit());
}

return $copy;
Expand Down
4 changes: 3 additions & 1 deletion packages/OpenTelemetry/src/TracerInterceptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public function traceAsynchronousEndpoint(MethodInvocation $methodInvocation, Me
$scope = $parentContext->activate();
try {
$trace = $this->trace(
'Receiving from channel: ' . $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME),
$message->getHeaders()->containsKey(MessageHeaders::POLLED_CHANNEL_NAME)
? 'Receiving from channel: ' . $message->getHeaders()->get(MessageHeaders::POLLED_CHANNEL_NAME)
: 'Endpoint: ' . $message->getHeaders()->get(MessageHeaders::CONSUMER_POLLING_METADATA)->getEndpointId() . ' produced Message',
$methodInvocation,
$message,
spanKind: SpanKind::KIND_CONSUMER,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\OpenTelemetry\Fixture\ScheduledHandler;

use Ecotone\Messaging\Attribute\Poller;
use Ecotone\Messaging\Attribute\Scheduled;

/**
* licence Apache-2.0
*/
final class ScheduledHandler
{
private int $counter = 0;

#[Scheduled(endpointId: 'scheduled_handler')]
#[Poller(fixedRateInMilliseconds: 1)]
public function handle(): void
{
$this->counter += 1;
}

public function getCounter(): int
{
return $this->counter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?php

declare(strict_types=1);

namespace Test\Ecotone\OpenTelemetry\Fixture\ScheduledHandler;

use Ecotone\Messaging\Attribute\InternalHandler;
use Ecotone\Messaging\Attribute\Poller;
use Ecotone\Messaging\Attribute\Scheduled;

/**
* licence Apache-2.0
*/
final class WorkflowScheduledHandler
{
private int $counter = 0;

#[Scheduled(requestChannelName: 'add', endpointId: 'scheduled_handler')]
#[Poller(fixedRateInMilliseconds: 1)]
public function produce(): int
{
return 1;
}

#[Scheduled(requestChannelName: 'add', endpointId: 'scheduled_handler_without_result')]
#[Poller(fixedRateInMilliseconds: 1)]
public function produceNothing(): ?int
{
return null;
}

#[InternalHandler(inputChannelName: 'add')]
public function add(): void
{
$this->counter += 1;
}

public function getCounter(): int
{
return $this->counter;
}
}
84 changes: 84 additions & 0 deletions packages/OpenTelemetry/tests/Integration/TracingTreeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
use Test\Ecotone\OpenTelemetry\Fixture\CommandEventFlow\RegisterUser;
use Test\Ecotone\OpenTelemetry\Fixture\CommandEventFlow\User;
use Test\Ecotone\OpenTelemetry\Fixture\MessageHandlerFlow\ExampleMessageHandler;
use Test\Ecotone\OpenTelemetry\Fixture\ScheduledHandler\ScheduledHandler;
use Test\Ecotone\OpenTelemetry\Fixture\ScheduledHandler\WorkflowScheduledHandler;

/**
* @internal
Expand Down Expand Up @@ -102,6 +104,88 @@ public function test_tracing_tree_with_two_levels_of_nesting_and_message_handler
);
}

public function test_tracing_scheduled_handler()
{
$exporter = new InMemoryExporter(new ArrayObject());

EcotoneLite::bootstrapFlowTesting(
[ScheduledHandler::class],
[TracerProviderInterface::class => TracingTest::prepareTracer($exporter), ScheduledHandler::class => new ScheduledHandler()],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::TRACING_PACKAGE]))
)
->run('scheduled_handler', ExecutionPollingMetadata::createWithDefaults()->withExecutionAmountLimit(2));

self::compareTreesByDetails(
[
[
'details' => ['name' => 'Endpoint: scheduled_handler produced Message'],
'children' => [],
],
[
'details' => ['name' => 'Endpoint: scheduled_handler produced Message'],
'children' => [],
],
],
self::buildTree($exporter)
);
}

public function test_tracing_workflow_scheduled_handler()
{
$exporter = new InMemoryExporter(new ArrayObject());

EcotoneLite::bootstrapFlowTesting(
[WorkflowScheduledHandler::class],
[TracerProviderInterface::class => TracingTest::prepareTracer($exporter), WorkflowScheduledHandler::class => new WorkflowScheduledHandler()],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::TRACING_PACKAGE]))
)
->run('scheduled_handler', ExecutionPollingMetadata::createWithDefaults()->withExecutionAmountLimit(2));

self::compareTreesByDetails(
[
[
'details' => ['name' => 'Endpoint: scheduled_handler produced Message'],
'children' => [
[
'details' => ['name' => 'Message Handler: ' . WorkflowScheduledHandler::class . '::add'],
'children' => [],
],
],
],
[
'details' => ['name' => 'Endpoint: scheduled_handler produced Message'],
'children' => [
[
'details' => ['name' => 'Message Handler: ' . WorkflowScheduledHandler::class . '::add'],
'children' => [],
],
],
]
],
self::buildTree($exporter)
);
}

public function test_no_tracing_when_there_is_nothing_to_trace()
{
$exporter = new InMemoryExporter(new ArrayObject());

EcotoneLite::bootstrapFlowTesting(
[WorkflowScheduledHandler::class],
[TracerProviderInterface::class => TracingTest::prepareTracer($exporter), WorkflowScheduledHandler::class => new WorkflowScheduledHandler()],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::TRACING_PACKAGE]))
)
->run('scheduled_handler_without_result', ExecutionPollingMetadata::createWithDefaults()->withExecutionAmountLimit(1));

$this->assertEquals(
[],
self::buildTree($exporter)
);
}

public function test_disabling_force_flushing_traces()
{
$exporter = new InMemoryExporter(new ArrayObject());
Expand Down

0 comments on commit 61a96c2

Please sign in to comment.