Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce open telemetry support #49

Merged
merged 1 commit into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,35 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Introduce instrumentation support for Open Telemetry.
```csharp
builder
.Services
.AddOpenTelemetry()
.WithMetrics(metrics =>
{
metrics
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddRuntimeInstrumentation();
})
.WithTracing(tracing =>
{
if (builder.Environment.IsDevelopment())
{
// We want to view all traces in development
tracing.SetSampler(new AlwaysOnSampler());
}

tracing
.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddSource(EventStoreDiagnostics.SourceName); // enable trace telemetry from event store and cqrs.
});
```

## [1.12.6] - 2023-10-10

### Fixed
Expand Down
14 changes: 14 additions & 0 deletions src/Atc.Cosmos.EventStore.Cqrs/Commands/CommandProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
using Atc.Cosmos.EventStore.Cqrs.Diagnostics;

namespace Atc.Cosmos.EventStore.Cqrs.Commands;

internal class CommandProcessor<TCommand> : ICommandProcessor<TCommand>
where TCommand : ICommand
{
private readonly ICommandTelemetry telemetry;
private readonly IStateWriter<TCommand> stateWriter;
private readonly IStateProjector<TCommand> stateProjector;
private readonly ICommandHandlerFactory handlerFactory;

public CommandProcessor(
ICommandTelemetry telemetry,
IStateWriter<TCommand> stateWriter,
IStateProjector<TCommand> stateProjector,
ICommandHandlerFactory handlerFactory)
{
this.telemetry = telemetry;
this.stateWriter = stateWriter;
this.stateProjector = stateProjector;
this.handlerFactory = handlerFactory;
Expand Down Expand Up @@ -44,6 +49,7 @@ private async ValueTask<CommandResult> SafeExecuteAsync(
int reruns,
CancellationToken cancellationToken)
{
using var activity = telemetry.CommandStarted(command);
try
{
var handler = handlerFactory.Create<TCommand>();
Expand All @@ -61,6 +67,8 @@ await handler

if (context.Events.Count == 0)
{
activity.NotModified();

// Command did not yield any events
return new CommandResult(
state.Id,
Expand All @@ -74,6 +82,8 @@ await handler
.WriteEventAsync(command, context.Events, cancellationToken)
.ConfigureAwait(false);

activity.Changed();

return new CommandResult(
result.Id,
result.Version,
Expand All @@ -90,13 +100,17 @@ await handler
.ConfigureAwait(false);
}

activity.Conflict();

return new CommandResult(
conflict.StreamId,
conflict.Version,
ResultType.Conflict);
}
catch (StreamVersionConflictException versionConflict)
{
activity.Conflict();

return new CommandResult(
versionConflict.StreamId,
versionConflict.Version,
Expand Down
7 changes: 7 additions & 0 deletions src/Atc.Cosmos.EventStore.Cqrs/Commands/StateProjector.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
using Atc.Cosmos.EventStore.Cqrs.Diagnostics;
using Atc.Cosmos.EventStore.Streams;

namespace Atc.Cosmos.EventStore.Cqrs.Commands;

internal class StateProjector<TCommand> : IStateProjector<TCommand>
where TCommand : ICommand
{
private readonly ICommandTelemetry telemetry;
private readonly IEventStoreClient eventStore;
private readonly IStreamReadValidator readValidator;
private readonly ICommandHandlerMetadata<TCommand> handlerMetadata;

public StateProjector(
ICommandTelemetry telemetry,
IEventStoreClient eventStore,
IStreamReadValidator readValidator,
ICommandHandlerMetadata<TCommand> handlerMetadata)
{
this.telemetry = telemetry;
this.eventStore = eventStore;
this.readValidator = readValidator;
this.handlerMetadata = handlerMetadata;
Expand Down Expand Up @@ -47,6 +51,7 @@ public async ValueTask<IStreamState> ProjectAsync(
return state;
}

using var activity = telemetry.ProjectionStarted();
await foreach (var evt in eventStore
.ReadFromStreamAsync(
state.Id,
Expand All @@ -65,6 +70,8 @@ await handlerMetadata
state.Version = evt.Metadata.Version;
}

activity.Completed(state.Version);

return state;
}
}
13 changes: 10 additions & 3 deletions src/Atc.Cosmos.EventStore.Cqrs/Commands/StateWriter.cs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
using Atc.Cosmos.EventStore.Cqrs.Diagnostics;

namespace Atc.Cosmos.EventStore.Cqrs.Commands;

internal class StateWriter<TCommand> : IStateWriter<TCommand>
where TCommand : ICommand
{
private readonly ICommandTelemetry telemetry;
private readonly IEventStoreClient eventStore;

public StateWriter(
ICommandTelemetry telemetry,
IEventStoreClient eventStore)
{
this.telemetry = telemetry;
this.eventStore = eventStore;
}

Expand Down Expand Up @@ -43,9 +48,11 @@ private async ValueTask<CommandResult> WriteToEventStoreAsync(
StreamVersion version,
StreamWriteOptions options,
IReadOnlyCollection<object> events,
int reties,
int retries,
CancellationToken cancellationToken)
{
using var activity = telemetry.WriteToStreamStarted(version, events.Count, retries);

try
{
var response = await eventStore
Expand All @@ -61,14 +68,14 @@ private async ValueTask<CommandResult> WriteToEventStoreAsync(
}
catch (StreamWriteConflictException)
{
if (reties > 0)
if (retries > 0)
{
return await WriteToEventStoreAsync(
id,
version,
options,
events,
reties - 1,
retries - 1,
cancellationToken)
.ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ public static EventStoreOptionsBuilder UseCQRS(
builder.Services.AddSingleton(typeof(ICommandProcessor<>), typeof(CommandProcessor<>));
builder.Services.AddSingleton<ICommandProcessorFactory, CommandProcessorFactory>();
builder.Services.AddSingleton<ICommandHandlerFactory, CommandHandlerFactory>();
builder.Services.AddSingleton<ICommandTelemetry, CommandTelemetry>();

builder.Services.AddSingleton<IProjectionOptionsFactory, ProjectionOptionsFactory>();
builder.Services.AddSingleton<IProjectionFactory, ProjectionFactory>();

builder.Services.AddSingleton(typeof(ProjectionMetadata<>));

builder.Services.TryAddSingleton<IProjectionDiagnostics, ProjectionDiagnostics>();
builder.Services.TryAddSingleton<IProjectionProcessOperation, ProjectionProcessOperation>();
builder.Services.TryAddSingleton<IProjectionTelemetry, ProjectionTelemetry>();

return builder;
}
Expand Down
33 changes: 33 additions & 0 deletions src/Atc.Cosmos.EventStore.Cqrs/Diagnostics/CommandActivity.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System.Diagnostics;

namespace Atc.Cosmos.EventStore.Cqrs.Diagnostics;

internal sealed class CommandActivity : ICommandActivity
{
private readonly Activity? activity;

public CommandActivity(
Activity? activity)
=> this.activity = activity;

public void Changed()
{
activity?.SetStatus(ActivityStatusCode.Ok, "Changed");
activity?.Stop();
}

public void Conflict()
{
activity?.SetStatus(ActivityStatusCode.Error, "Conflict");
activity?.Stop();
}

public void Dispose()
=> activity?.Dispose();

public void NotModified()
{
activity?.SetStatus(ActivityStatusCode.Ok, "NotModified");
activity?.Stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using System.Diagnostics;

namespace Atc.Cosmos.EventStore.Cqrs.Diagnostics;

public sealed class CommandProjectionActivity
: ICommandProjectionActivity
{
private readonly Activity? activity;

public CommandProjectionActivity(
Activity? activity)
=> this.activity = activity;

public void Completed(StreamVersion version)
{
activity?.AddTag(EventStoreDiagnostics.TagAttributes.StreamVersion, $"{(long)version}");
activity?.SetStatus(ActivityStatusCode.Ok);
activity?.Stop();
}

public void Dispose()
{
activity?.Stop();
activity?.Dispose();
}
}
52 changes: 52 additions & 0 deletions src/Atc.Cosmos.EventStore.Cqrs/Diagnostics/CommandTelemetry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using System.Diagnostics;

namespace Atc.Cosmos.EventStore.Cqrs.Diagnostics;

public class CommandTelemetry : ICommandTelemetry
{
public ICommandActivity CommandStarted<TCommand>(
TCommand command)
where TCommand : ICommand
{
var activity = EventStoreDiagnostics.Source.StartActivity(
name: $"Execute {typeof(TCommand).Name} {command.GetEventStreamId().Value}",
kind: ActivityKind.Internal,
tags: new Dictionary<string, object?>
{
{ EventStoreDiagnostics.TagAttributes.StreamId, command.GetEventStreamId().Value },
{ EventStoreDiagnostics.TagAttributes.RequiredVersion, (long?)command.RequiredVersion },
{ EventStoreDiagnostics.TagAttributes.Behavior, command.Behavior },
{ EventStoreDiagnostics.TagAttributes.BehaviorCount, command.BehaviorCount },
{ EventStoreDiagnostics.TagAttributes.CommandId, command.CommandId },
{ EventStoreDiagnostics.TagAttributes.CorrelationId, command.CorrelationId },
});

return new CommandActivity(activity);
}

public ICommandProjectionActivity ProjectionStarted()
{
var activity = EventStoreDiagnostics.Source.StartActivity(
$"Projection events from stream",
ActivityKind.Internal);

return new CommandProjectionActivity(activity);
}

public IDisposable WriteToStreamStarted(
StreamVersion version,
int count,
int retries)
{
var activity = EventStoreDiagnostics.Source.StartActivity(
name: $"Write events to stream",
kind: ActivityKind.Internal,
tags: new Dictionary<string, object?>
{
{ EventStoreDiagnostics.TagAttributes.EventCount, count },
{ EventStoreDiagnostics.TagAttributes.RequiredVersion, (long)version },
});

return new CommandWriterActivity(activity);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using System.Diagnostics;

namespace Atc.Cosmos.EventStore.Cqrs.Diagnostics;

internal sealed class CommandWriterActivity : IDisposable
{
private readonly Activity? activity;

public CommandWriterActivity(
Activity? activity)
=> this.activity = activity;

public void Dispose()
{
activity?.SetStatus(ActivityStatusCode.Ok);
activity?.Stop();
activity?.Dispose();
}
}
10 changes: 10 additions & 0 deletions src/Atc.Cosmos.EventStore.Cqrs/Diagnostics/ICommandActivity.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
namespace Atc.Cosmos.EventStore.Cqrs.Diagnostics;

public interface ICommandActivity : IDisposable
{
void Changed();

void Conflict();

void NotModified();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace Atc.Cosmos.EventStore.Cqrs.Diagnostics;

public interface ICommandProjectionActivity
: IDisposable
{
void Completed(StreamVersion version);
}
15 changes: 15 additions & 0 deletions src/Atc.Cosmos.EventStore.Cqrs/Diagnostics/ICommandTelemetry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace Atc.Cosmos.EventStore.Cqrs.Diagnostics;

public interface ICommandTelemetry
{
ICommandActivity CommandStarted<TCommand>(
TCommand command)
where TCommand : ICommand;

ICommandProjectionActivity ProjectionStarted();

IDisposable WriteToStreamStarted(
StreamVersion version,
int count,
int retries);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Atc.Cosmos.EventStore.Cqrs.Diagnostics;

public interface IProjectionBatchTelemetry : IDisposable
{
IProjectionProcessOperationTelemetry StartProjection(
StreamId streamId);

void BatchCompleted();

void BatchFailed(
Exception exception);
}
Loading
Loading