Skip to content

Commit

Permalink
feat: introduce metrics based on system diagnostics (#84)
Browse files Browse the repository at this point in the history
* feat: introduce metrics based on system diagnostics
* closes: #58

---------

Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Co-authored-by: Gabriele Santomaggio <G.santomaggio@gmail.com>
Co-authored-by: Luke Bakken <luke@bakken.io>
  • Loading branch information
3 people authored Nov 19, 2024
1 parent bcdee01 commit f47744a
Show file tree
Hide file tree
Showing 30 changed files with 950 additions and 78 deletions.
14 changes: 8 additions & 6 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
<ItemGroup>
<!-- RabbitMQ.Amqp.Client -->
<PackageVersion Include="AMQPNetLite.Core" Version="2.4.11" />
<PackageVersion Include="OpenTelemetry" Version="1.10.0" />
<PackageVersion Include="OpenTelemetry.Exporter.Console" Version="1.10.0" />
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="9.0.0" />
<!-- HAClient -->
<PackageVersion Include="DotNext.Threading" Version="5.15.0" />
<!-- Tests -->
<PackageVersion Include="Microsoft.Extensions.Diagnostics" Version="9.0.0" />
<PackageVersion Include="Microsoft.Extensions.Diagnostics.Testing" Version="9.0.0" />
<PackageVersion Include="System.Text.Json" Version="9.0.0" />
<PackageVersion Include="xunit" Version="2.9.2" />
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
<PackageVersion Include="Xunit.SkippableFact" Version="1.4.13" />
Expand All @@ -22,11 +28,7 @@
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1481#pullrequestreview-1847905299
* https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1594
-->
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="6.0.0" />
<PackageVersion Include="System.Runtime.CompilerServices.Unsafe" Version="6.0.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)'=='net8.0'">
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="8.0.1" />
<PackageVersion Include="System.Runtime.CompilerServices.Unsafe" Version="6.1.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFrameworkIdentifier)'=='.NETFramework'">
<GlobalPackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.3" />
Expand All @@ -36,4 +38,4 @@
<GlobalPackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" />
<GlobalPackageReference Include="MinVer" Version="6.0.0" />
</ItemGroup>
</Project>
</Project>
1 change: 1 addition & 0 deletions RabbitMQ.AMQP.Client/ILifeCycle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public override string ToString()

public delegate void LifeCycleCallBack(object sender, State previousState, State currentState, Error? failureCause);

// TODO consider adding IAsyncDisposable that could call CloseAsync()
public interface ILifeCycle : IDisposable
{
Task CloseAsync();
Expand Down
40 changes: 40 additions & 0 deletions RabbitMQ.AMQP.Client/IMetricsReporter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;

namespace RabbitMQ.AMQP.Client
{
public interface IMetricsReporter
{
enum PublishDispositionValue
{
ACCEPTED,
REJECTED,
RELEASED
};

enum ConsumeDispositionValue
{
ACCEPTED,
DISCARDED,
REQUEUED
};

void ConnectionOpened();
void ConnectionClosed();

void PublisherOpened();
void PublisherClosed();

void ConsumerOpened();
void ConsumerClosed();

void Published(TimeSpan elapsed);
void PublishDisposition(PublishDispositionValue disposition);

void Consumed(TimeSpan elapsed);
void ConsumeDisposition(ConsumeDispositionValue disposition);
}
}
33 changes: 19 additions & 14 deletions RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
private readonly AmqpManagement _management;
private readonly RecordingTopologyListener _recordingTopologyListener = new();

private readonly IConnectionSettings _connectionSettings;
internal readonly IConnectionSettings _connectionSettings;
private readonly IMetricsReporter? _metricsReporter;
internal readonly AmqpSessionManagement _nativePubSubSessions;

private readonly Dictionary<string, object> _connectionProperties = new();
Expand Down Expand Up @@ -104,7 +105,6 @@ public IRpcServerBuilder RpcServerBuilder()

public IRpcClientBuilder RpcClientBuilder()
{

return new AmqpRpcClientBuilder(this);
}

Expand All @@ -131,17 +131,22 @@ public IEnumerable<IConsumer> Consumers
public IReadOnlyDictionary<string, object> Properties => _connectionProperties;

public long Id { get; set; }

/// <summary>
/// Creates a new instance of <see cref="AmqpConnection"/>
/// Through the Connection is possible to create:
/// - Management. See <see cref="AmqpManagement"/>
/// - Publishers and Consumers: See <see cref="AmqpPublisherBuilder"/> and <see cref="AmqpConsumerBuilder"/>
/// </summary>
/// <param name="connectionSettings"></param>
/// <param name="metricsReporter"></param>
/// <returns></returns>
public static async Task<IConnection> CreateAsync(IConnectionSettings connectionSettings)
// TODO to play nicely with IoC containers, we should not have static Create methods
// TODO rename to CreateAndOpenAsync
public static async Task<IConnection> CreateAsync(IConnectionSettings connectionSettings,
IMetricsReporter? metricsReporter = default)
{
var connection = new AmqpConnection(connectionSettings);
var connection = new AmqpConnection(connectionSettings, metricsReporter);
await connection.OpenAsync()
.ConfigureAwait(false);
return connection;
Expand All @@ -154,7 +159,7 @@ public IManagement Management()

public IConsumerBuilder ConsumerBuilder()
{
return new AmqpConsumerBuilder(this);
return new AmqpConsumerBuilder(this, _metricsReporter);
}

// TODO cancellation token
Expand All @@ -170,7 +175,7 @@ await base.OpenAsync()
public IPublisherBuilder PublisherBuilder()
{
ThrowIfClosed();
var publisherBuilder = new AmqpPublisherBuilder(this);
var publisherBuilder = new AmqpPublisherBuilder(this, _metricsReporter);
return publisherBuilder;
}

Expand Down Expand Up @@ -235,6 +240,7 @@ protected override void Dispose(bool disposing)
{
_nativeConnection.Closed -= _closedCallback;
}

_semaphoreOpen.Dispose();
_semaphoreClose.Dispose();
}
Expand Down Expand Up @@ -265,9 +271,10 @@ await consumer.CloseAsync()
}
}

private AmqpConnection(IConnectionSettings connectionSettings)
private AmqpConnection(IConnectionSettings connectionSettings, IMetricsReporter? metricsReporter)
{
_connectionSettings = connectionSettings;
_metricsReporter = metricsReporter;
_nativePubSubSessions = new AmqpSessionManagement(this, 1);
_management =
new AmqpManagement(new AmqpManagementParameters(this).TopologyListener(_recordingTopologyListener));
Expand All @@ -291,10 +298,7 @@ await _semaphoreOpen.WaitAsync(cancellationToken)
HostName = $"vhost:{_connectionSettings.VirtualHost}",
// Note: no need to set cf.AMQP.ContainerId
ContainerId = _connectionSettings.ContainerId,
Properties = new Fields()
{
[new Symbol("connection_name")] = _connectionSettings.ContainerId,
}
Properties = new Fields() { [new Symbol("connection_name")] = _connectionSettings.ContainerId, }
};

if (_connectionSettings.MaxFrameSize > uint.MinValue)
Expand Down Expand Up @@ -350,7 +354,8 @@ void OnOpened(Amqp.IConnection connection, Open openOnOpened)
if (_connectionSettings is null)
{
// TODO create "internal bug" exception type?
throw new InvalidOperationException("_connectionSettings is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
throw new InvalidOperationException(
"_connectionSettings is null, report via https://github.com/rabbitmq/rabbitmq-amqp-dotnet-client/issues");
}
else
{
Expand Down Expand Up @@ -497,8 +502,8 @@ await OpenConnectionAsync(CancellationToken.None)
if (false == connected)
{
var notRecoveredError = new Error(ConnectionNotRecoveredCode,
$"{ConnectionNotRecoveredMessage}," +
$"recover status: {_connectionSettings.Recovery}");
$"{ConnectionNotRecoveredMessage}," +
$"recover status: {_connectionSettings.Recovery}");
DoClose(notRecoveredError);
return;
}
Expand Down
35 changes: 29 additions & 6 deletions RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
using Trace = Amqp.Trace;
using TraceLevel = Amqp.TraceLevel;

namespace RabbitMQ.AMQP.Client.Impl
{
Expand All @@ -28,12 +31,13 @@ private enum PauseStatus
private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
private readonly ConsumerConfiguration _configuration;
private readonly IMetricsReporter? _metricsReporter;

internal AmqpConsumer(AmqpConnection amqpConnection, ConsumerConfiguration configuration)
internal AmqpConsumer(AmqpConnection amqpConnection, ConsumerConfiguration configuration, IMetricsReporter? metricsReporter)
{
_amqpConnection = amqpConnection;
_configuration = configuration;

_metricsReporter = metricsReporter;
_amqpConnection.AddConsumer(_id, this);
}

Expand Down Expand Up @@ -127,11 +131,21 @@ private async Task ProcessMessages()
return;
}

Stopwatch? stopwatch = null;
if (_metricsReporter is not null)
{
stopwatch = new();
}

while (_receiverLink is { LinkState: LinkState.Attached })
{
stopwatch?.Restart();

// TODO the timeout waiting for messages should be configurable
TimeSpan timeout = TimeSpan.FromSeconds(60);
Message? nativeMessage = await _receiverLink.ReceiveAsync(timeout).ConfigureAwait(false);
Message? nativeMessage = await _receiverLink.ReceiveAsync(timeout)
.ConfigureAwait(false);

if (nativeMessage is null)
{
// this is not a problem, it is just a timeout.
Expand All @@ -144,15 +158,24 @@ private async Task ProcessMessages()

_unsettledMessageCounter.Increment();

IContext context = new DeliveryContext(_receiverLink, nativeMessage, _unsettledMessageCounter);
IContext context = new DeliveryContext(_receiverLink, nativeMessage,
_unsettledMessageCounter, _metricsReporter);
var amqpMessage = new AmqpMessage(nativeMessage);

// TODO catch exceptions thrown by handlers,
// then call exception handler?
if (_configuration.Handler != null)
if (_configuration.Handler is not null)
{
await _configuration.Handler(context, amqpMessage).ConfigureAwait(false);
await _configuration.Handler(context, amqpMessage)
.ConfigureAwait(false);
}

if (_metricsReporter is not null && stopwatch is not null)
{
stopwatch.Stop();
_metricsReporter.Consumed(stopwatch.Elapsed);
}

}
}
catch (Exception e)
Expand Down
7 changes: 5 additions & 2 deletions RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ internal sealed class ConsumerConfiguration
public string Address { get; set; } = "";
public int InitialCredits { get; set; } = 100; // TODO use constant, check with Java lib
public Map Filters { get; set; } = new();
// TODO is a MessageHandler *really* optional???
public MessageHandler? Handler { get; set; }
// TODO re-name to ListenerContextAction? Callback?
public Action<IConsumerBuilder.ListenerContext>? ListenerContext = null;
Expand All @@ -32,10 +33,12 @@ public class AmqpConsumerBuilder : IConsumerBuilder
{
private readonly ConsumerConfiguration _configuration = new();
private readonly AmqpConnection _amqpConnection;
private readonly IMetricsReporter? _metricsReporter;

public AmqpConsumerBuilder(AmqpConnection connection)
public AmqpConsumerBuilder(AmqpConnection connection, IMetricsReporter? metricsReporter)
{
_amqpConnection = connection;
_metricsReporter = metricsReporter;
}

public IConsumerBuilder Queue(IQueueSpecification queueSpec)
Expand Down Expand Up @@ -81,7 +84,7 @@ public async Task<IConsumer> BuildAndStartAsync(CancellationToken cancellationTo
throw new ConsumerException("Message handler is not set");
}

AmqpConsumer consumer = new(_amqpConnection, _configuration);
AmqpConsumer consumer = new(_amqpConnection, _configuration, _metricsReporter);

// TODO pass cancellationToken
await consumer.OpenAsync()
Expand Down
13 changes: 8 additions & 5 deletions RabbitMQ.AMQP.Client/Impl/AmqpEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,26 @@ namespace RabbitMQ.AMQP.Client.Impl
{
public class AmqpEnvironment : IEnvironment
{
private IConnectionSettings? ConnectionSettings { get; }
private IConnectionSettings ConnectionSettings { get; }
private long _sequentialId = 0;
private readonly ConcurrentDictionary<long, IConnection> _connections = new();
private readonly IMetricsReporter? _metricsReporter;

private AmqpEnvironment(IConnectionSettings connectionSettings)
private AmqpEnvironment(IConnectionSettings connectionSettings, IMetricsReporter? metricsReporter = default)
{
ConnectionSettings = connectionSettings;
_metricsReporter = metricsReporter;
}

public static Task<IEnvironment> CreateAsync(IConnectionSettings connectionSettings)
// TODO to play nicely with IoC containers, we should not have static Create methods
public static IEnvironment Create(IConnectionSettings connectionSettings, IMetricsReporter? metricsReporter = default)
{
return Task.FromResult<IEnvironment>(new AmqpEnvironment(connectionSettings));
return new AmqpEnvironment(connectionSettings, metricsReporter);
}

public async Task<IConnection> CreateConnectionAsync(IConnectionSettings connectionSettings)
{
IConnection c = await AmqpConnection.CreateAsync(connectionSettings).ConfigureAwait(false);
IConnection c = await AmqpConnection.CreateAsync(connectionSettings, _metricsReporter).ConfigureAwait(false);
c.Id = Interlocked.Increment(ref _sequentialId);
_connections.TryAdd(c.Id, c);
c.ChangeState += (sender, previousState, currentState, failureCause) =>
Expand Down
Loading

0 comments on commit f47744a

Please sign in to comment.