diff --git a/src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueueDetails.cs b/src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueueDetails.cs new file mode 100644 index 0000000000..742efa89a1 --- /dev/null +++ b/src/ServiceControl.Transports.RabbitMQ/RabbitMQBrokerQueueDetails.cs @@ -0,0 +1,40 @@ +#nullable enable +namespace ServiceControl.Transports.RabbitMQ; + +using System.Collections.Generic; +using System.Text.Json; +using ServiceControl.Transports.BrokerThroughput; + +public class RabbitMQBrokerQueueDetails(JsonElement token) : IBrokerQueue +{ + public string QueueName { get; } = token.GetProperty("name").GetString()!; + public string SanitizedName => QueueName; + public string Scope => VHost; + public string VHost { get; } = token.GetProperty("vhost").GetString()!; + public List EndpointIndicators { get; } = []; + long? AckedMessages { get; set; } = FromToken(token); + long Baseline { get; set; } = FromToken(token) ?? 0; + + public long CalculateThroughputFrom(RabbitMQBrokerQueueDetails newReading) + { + var newlyAckedMessages = 0L; + if (newReading.AckedMessages is null) + { + return newlyAckedMessages; + } + + if (newReading.AckedMessages.Value >= Baseline) + { + newlyAckedMessages = newReading.AckedMessages.Value - Baseline; + AckedMessages += newlyAckedMessages; + } + Baseline = newReading.AckedMessages.Value; + + return newlyAckedMessages; + } + + static long? FromToken(JsonElement jsonElement) => + jsonElement.TryGetProperty("message_stats", out var stats) && stats.TryGetProperty("ack", out var val) + ? val.GetInt64() + : null; +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs b/src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs index 458a29363d..9bacec74c3 100644 --- a/src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs +++ b/src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs @@ -9,6 +9,7 @@ namespace ServiceControl.Transports.RabbitMQ; using System.Net.Http; using System.Net.Http.Json; using System.Runtime.CompilerServices; +using System.Text.Json; using System.Text.Json.Nodes; using System.Threading; using System.Threading.Tasks; @@ -88,15 +89,21 @@ protected override void InitializeCore(ReadOnlyDictionary settin if (InitialiseErrors.Count == 0) { - httpClient = new HttpClient(new SocketsHttpHandler - { - Credentials = defaultCredential, - PooledConnectionLifetime = TimeSpan.FromMinutes(2) - }) - { BaseAddress = new Uri(apiUrl) }; + // ideally we would use the HttpClientFactory, but it would be a bit more involved to set that up + // so for now we are using a virtual method that can be overriden in tests + // https://github.com/Particular/ServiceControl/issues/4493 + httpClient = CreateHttpClient(defaultCredential, apiUrl); } } + protected virtual HttpClient CreateHttpClient(NetworkCredential defaultCredential, string apiUrl) => + new(new SocketsHttpHandler + { + Credentials = defaultCredential, + PooledConnectionLifetime = TimeSpan.FromMinutes(2) + }) + { BaseAddress = new Uri(apiUrl) }; + public override async IAsyncEnumerable GetThroughputPerDay(IBrokerQueue brokerQueue, DateOnly startDate, [EnumeratorCancellation] CancellationToken cancellationToken = default) @@ -105,49 +112,22 @@ public override async IAsyncEnumerable GetThroughputPerDay(IBro var url = $"/api/queues/{HttpUtility.UrlEncode(queue.VHost)}/{HttpUtility.UrlEncode(queue.QueueName)}"; logger.LogDebug($"Querying {url}"); - var node = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync(url, token), cancellationToken); - queue.AckedMessages = GetAck(); + var newReading = await pipeline.ExecuteAsync(async token => new RabbitMQBrokerQueueDetails(await httpClient!.GetFromJsonAsync(url, token)), cancellationToken); + _ = queue.CalculateThroughputFrom(newReading); // looping for 24hrs, in 4 increments of 15 minutes for (var i = 0; i < 24 * 4; i++) { - bool throughputSent = false; await Task.Delay(TimeSpan.FromMinutes(15), timeProvider, cancellationToken); logger.LogDebug($"Querying {url}"); - node = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync(url, token), cancellationToken); - var newReading = GetAck(); - if (newReading is not null) - { - if (newReading >= queue.AckedMessages) - { - yield return new QueueThroughput - { - DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime), - TotalThroughput = newReading.Value - queue.AckedMessages.Value - }; - throughputSent = true; - } - queue.AckedMessages = newReading; - } - - if (!throughputSent) - { - yield return new QueueThroughput - { - DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime), - TotalThroughput = 0 - }; - } - } - yield break; + newReading = await pipeline.ExecuteAsync(async token => new RabbitMQBrokerQueueDetails(await httpClient!.GetFromJsonAsync(url, token)), cancellationToken); - long? GetAck() - { - if (node!["message_stats"] is JsonObject stats && stats["ack"] is JsonValue val) + var newTotalThroughput = queue.CalculateThroughputFrom(newReading); + yield return new QueueThroughput { - return val.GetValue(); - } - return null; + DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime), + TotalThroughput = newTotalThroughput + }; } } @@ -266,7 +246,7 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can } } - async Task<(RabbitMQBrokerQueueDetails[]?, bool morePages)> GetPage(int page, CancellationToken cancellationToken) + public async Task<(RabbitMQBrokerQueueDetails[]?, bool morePages)> GetPage(int page, CancellationToken cancellationToken) { var url = $"/api/queues/{HttpUtility.UrlEncode(connectionConfiguration.VirtualHost)}?page={page}&page_size=500&name=&use_regex=false&pagination=true"; @@ -283,22 +263,27 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can return (null, false); } - var queues = items.Select(item => new RabbitMQBrokerQueueDetails(item!)).ToArray(); - - return (queues, pageCount > pageReturned); + return (MaterializeQueueDetails(items), pageCount > pageReturned); } // Older versions of RabbitMQ API did not have paging and returned the array of items directly case JsonArray arr: { - var queues = arr.Select(item => new RabbitMQBrokerQueueDetails(item!)).ToArray(); - - return (queues, false); + return (MaterializeQueueDetails(arr), false); } default: throw new Exception("Was not able to get list of queues from RabbitMQ broker."); } } + static RabbitMQBrokerQueueDetails[] MaterializeQueueDetails(JsonArray items) + { + // It is not possible to directly operated on the JsonNode. When the JsonNode is a JObject + // and the indexer is access the internal dictionary is initialized which can cause key not found exceptions + // when the payload contains the same key multiple times (which happened in the past). + var queues = items.Select(item => new RabbitMQBrokerQueueDetails(item!.Deserialize())).ToArray(); + return queues; + } + public override KeyDescriptionPair[] Settings => [ new KeyDescriptionPair(RabbitMQSettings.API, RabbitMQSettings.APIDescription), diff --git a/src/ServiceControl.Transports.RabbitMQ/RabbitMQQueueDetails.cs b/src/ServiceControl.Transports.RabbitMQ/RabbitMQQueueDetails.cs deleted file mode 100644 index 638d3ad56c..0000000000 --- a/src/ServiceControl.Transports.RabbitMQ/RabbitMQQueueDetails.cs +++ /dev/null @@ -1,16 +0,0 @@ -#nullable enable -namespace ServiceControl.Transports.RabbitMQ; - -using System.Collections.Generic; -using System.Text.Json.Nodes; -using ServiceControl.Transports.BrokerThroughput; - -public class RabbitMQBrokerQueueDetails(JsonNode token) : IBrokerQueue -{ - public string QueueName { get; } = token["name"]!.GetValue(); - public string SanitizedName => QueueName; - public string? Scope => VHost; - public string VHost { get; } = token["vhost"]!.GetValue(); - public long? AckedMessages { get; set; } - public List EndpointIndicators { get; } = []; -} \ No newline at end of file diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_fetch_queue_details_in_old_format.approved.txt b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_fetch_queue_details_in_old_format.approved.txt new file mode 100644 index 0000000000..063ba3dd64 --- /dev/null +++ b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_fetch_queue_details_in_old_format.approved.txt @@ -0,0 +1,23 @@ +[ + { + "QueueName": "queue1", + "SanitizedName": "queue1", + "Scope": "vhost1", + "VHost": "vhost1", + "EndpointIndicators": [] + }, + { + "QueueName": "queue2", + "SanitizedName": "queue2", + "Scope": "vhost2", + "VHost": "vhost2", + "EndpointIndicators": [] + }, + { + "QueueName": "queue3", + "SanitizedName": "queue3", + "Scope": "vhost1", + "VHost": "vhost1", + "EndpointIndicators": [] + } +] \ No newline at end of file diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_handle_duplicated_json_data.approved.txt b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_handle_duplicated_json_data.approved.txt new file mode 100644 index 0000000000..016fd18d12 --- /dev/null +++ b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/ApprovalFiles/RabbitMQQuery_ResponseParsing_Tests.Should_handle_duplicated_json_data.approved.txt @@ -0,0 +1,16 @@ +[ + { + "QueueName": "queue1", + "SanitizedName": "queue1", + "Scope": "vhost1", + "VHost": "vhost1", + "EndpointIndicators": [] + }, + { + "QueueName": "queue2", + "SanitizedName": "queue2", + "Scope": "vhost2", + "VHost": "vhost2", + "EndpointIndicators": [] + } +] \ No newline at end of file diff --git a/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQuery_ResponseParsing_Tests.cs b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQuery_ResponseParsing_Tests.cs new file mode 100644 index 0000000000..9e706832b9 --- /dev/null +++ b/src/ServiceControl.Transports.RabbitMQQuorumDirectRouting.Tests/RabbitMQQuery_ResponseParsing_Tests.cs @@ -0,0 +1,148 @@ +namespace ServiceControl.Transport.Tests; + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Time.Testing; +using NUnit.Framework; +using Transports; +using Transports.RabbitMQ; +using System.Net.Http; +using Particular.Approvals; +using System.Collections.ObjectModel; +using System.Net; + +[TestFixture] +class RabbitMQQuery_ResponseParsing_Tests +{ + FakeTimeProvider provider; + TransportSettings transportSettings; + FakeHttpHandler httpHandler; + RabbitMQQuery rabbitMQQuery; + + [SetUp] + public void SetUp() + { + provider = new(); + provider.SetUtcNow(DateTimeOffset.UtcNow); + transportSettings = new TransportSettings + { + ConnectionString = "host=localhost;username=rabbitmq;password=rabbitmq", + MaxConcurrency = 1, + EndpointName = Guid.NewGuid().ToString("N") + }; + httpHandler = new FakeHttpHandler(); + var httpClient = new HttpClient(httpHandler) { BaseAddress = new Uri("http://localhost:15672") }; + + rabbitMQQuery = new TestableRabbitMQQuery(provider, transportSettings, httpClient); + rabbitMQQuery.Initialize(ReadOnlyDictionary.Empty); + } + + [TearDown] + public void TearDown() => httpHandler.Dispose(); + + public Func SendCallback + { + get => httpHandler.SendCallback; + set => httpHandler.SendCallback = value; + } + + [Test] + public async Task Should_handle_duplicated_json_data() + { + SendCallback = _ => + { + var response = new HttpResponseMessage + { + Content = new StringContent(""" + { + "items": [ + { + "name": "queue1", + "vhost": "vhost1", + "memory": 1024, + "memory": 1024, + "message_stats": { + "ack": 1 + } + }, + { + "name": "queue2", + "vhost": "vhost2", + "vhost": "vhost2", + "message_stats": { + "ack": 2 + } + } + ], + "page": 1, + "page_count": 1, + "page_size": 500, + "total_count": 2 + } + """) + }; + return response; + }; + + var queues = (await rabbitMQQuery.GetPage(1, default)).Item1; + Approver.Verify(queues); + } + + [Test] + public async Task Should_fetch_queue_details_in_old_format() + { + SendCallback = _ => + { + var response = new HttpResponseMessage + { + Content = new StringContent(""" + [ + { + "name": "queue1", + "vhost": "vhost1", + "memory": 1024, + "message_stats": { + "ack": 1 + } + }, + { + "name": "queue2", + "vhost": "vhost2", + "message_stats": { + "ack": 2 + } + }, + { + "name": "queue3", + "vhost": "vhost1" + } + ] + """) + }; + return response; + }; + + var queues = (await rabbitMQQuery.GetPage(1, default)).Item1; + Approver.Verify(queues); + } + + sealed class TestableRabbitMQQuery( + TimeProvider timeProvider, + TransportSettings transportSettings, + HttpClient customHttpClient) + : RabbitMQQuery(NullLogger.Instance, timeProvider, transportSettings) + { + protected override HttpClient CreateHttpClient(NetworkCredential defaultCredential, string apiUrl) => customHttpClient; + } + + sealed class FakeHttpHandler : HttpClientHandler + { + public Func SendCallback { get; set; } + + protected override HttpResponseMessage Send(HttpRequestMessage request, CancellationToken cancellationToken) => SendCallback(request); + + protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) => Task.FromResult(SendCallback(request)); + } +} \ No newline at end of file diff --git a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs index ced3129705..c6eefe193b 100644 --- a/src/ServiceControl.Transports.Tests/TransportTestFixture.cs +++ b/src/ServiceControl.Transports.Tests/TransportTestFixture.cs @@ -48,7 +48,10 @@ public virtual async Task Cleanup() await transportInfrastructure.Shutdown(); } - await dispatcherTransportInfrastructure.Shutdown(); + if (dispatcherTransportInfrastructure != null) + { + await dispatcherTransportInfrastructure.Shutdown(); + } if (configuration != null) {