Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RabbitMQ queue details parsing doesn't account for duplicated elements #4490

Merged
merged 8 commits into from
Oct 3, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#nullable enable
namespace ServiceControl.Transports.RabbitMQ;

using System.Collections.Generic;
using System.Text.Json;
using ServiceControl.Transports.BrokerThroughput;

public class RabbitMQBrokerQueueDetails(JsonElement token) : IBrokerQueue
{
public string QueueName { get; } = token.GetProperty("name").GetString()!;
public string SanitizedName => QueueName;
public string Scope => VHost;
public string VHost { get; } = token.GetProperty("vhost").GetString()!;
public List<string> EndpointIndicators { get; } = [];
long? AckedMessages { get; set; } = FromToken(token);
long Baseline { get; set; } = FromToken(token) ?? 0;

public long CalculateThroughputFrom(RabbitMQBrokerQueueDetails newReading)
{
var newlyAckedMessages = 0L;
if (newReading.AckedMessages is null)
{
return newlyAckedMessages;
}

if (newReading.AckedMessages.Value >= Baseline)
{
newlyAckedMessages = newReading.AckedMessages.Value - Baseline;
AckedMessages += newlyAckedMessages;
}
Baseline = newReading.AckedMessages.Value;

return newlyAckedMessages;
}

static long? FromToken(JsonElement jsonElement) =>
jsonElement.TryGetProperty("message_stats", out var stats) && stats.TryGetProperty("ack", out var val)
? val.GetInt64()
: null;
}
81 changes: 33 additions & 48 deletions src/ServiceControl.Transports.RabbitMQ/RabbitMQQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ namespace ServiceControl.Transports.RabbitMQ;
using System.Net.Http;
using System.Net.Http.Json;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -88,15 +89,21 @@ protected override void InitializeCore(ReadOnlyDictionary<string, string> settin

if (InitialiseErrors.Count == 0)
{
httpClient = new HttpClient(new SocketsHttpHandler
{
Credentials = defaultCredential,
PooledConnectionLifetime = TimeSpan.FromMinutes(2)
})
{ BaseAddress = new Uri(apiUrl) };
// ideally we would use the HttpClientFactory, but it would be a bit more involved to set that up
// so for now we are using a virtual method that can be overriden in tests
// https://github.com/Particular/ServiceControl/issues/4493
httpClient = CreateHttpClient(defaultCredential, apiUrl);
}
}

protected virtual HttpClient CreateHttpClient(NetworkCredential defaultCredential, string apiUrl) =>
new(new SocketsHttpHandler
{
Credentials = defaultCredential,
PooledConnectionLifetime = TimeSpan.FromMinutes(2)
})
{ BaseAddress = new Uri(apiUrl) };

public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBrokerQueue brokerQueue,
DateOnly startDate,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
Expand All @@ -105,49 +112,22 @@ public override async IAsyncEnumerable<QueueThroughput> GetThroughputPerDay(IBro
var url = $"/api/queues/{HttpUtility.UrlEncode(queue.VHost)}/{HttpUtility.UrlEncode(queue.QueueName)}";

logger.LogDebug($"Querying {url}");
var node = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync<JsonNode>(url, token), cancellationToken);
queue.AckedMessages = GetAck();
var newReading = await pipeline.ExecuteAsync(async token => new RabbitMQBrokerQueueDetails(await httpClient!.GetFromJsonAsync<JsonElement>(url, token)), cancellationToken);
_ = queue.CalculateThroughputFrom(newReading);

// looping for 24hrs, in 4 increments of 15 minutes
for (var i = 0; i < 24 * 4; i++)
{
bool throughputSent = false;
await Task.Delay(TimeSpan.FromMinutes(15), timeProvider, cancellationToken);
logger.LogDebug($"Querying {url}");
node = await pipeline.ExecuteAsync(async token => await httpClient!.GetFromJsonAsync<JsonNode>(url, token), cancellationToken);
var newReading = GetAck();
if (newReading is not null)
{
if (newReading >= queue.AckedMessages)
{
yield return new QueueThroughput
{
DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime),
TotalThroughput = newReading.Value - queue.AckedMessages.Value
};
throughputSent = true;
}
queue.AckedMessages = newReading;
}

if (!throughputSent)
{
yield return new QueueThroughput
{
DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime),
TotalThroughput = 0
};
}
}
yield break;
newReading = await pipeline.ExecuteAsync(async token => new RabbitMQBrokerQueueDetails(await httpClient!.GetFromJsonAsync<JsonElement>(url, token)), cancellationToken);

long? GetAck()
{
if (node!["message_stats"] is JsonObject stats && stats["ack"] is JsonValue val)
var newTotalThroughput = queue.CalculateThroughputFrom(newReading);
yield return new QueueThroughput
{
return val.GetValue<long>();
}
return null;
DateUTC = DateOnly.FromDateTime(timeProvider.GetUtcNow().DateTime),
TotalThroughput = newTotalThroughput
};
}
}

Expand Down Expand Up @@ -266,7 +246,7 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
}
}

async Task<(RabbitMQBrokerQueueDetails[]?, bool morePages)> GetPage(int page, CancellationToken cancellationToken)
public async Task<(RabbitMQBrokerQueueDetails[]?, bool morePages)> GetPage(int page, CancellationToken cancellationToken)
{
var url = $"/api/queues/{HttpUtility.UrlEncode(connectionConfiguration.VirtualHost)}?page={page}&page_size=500&name=&use_regex=false&pagination=true";

Expand All @@ -283,22 +263,27 @@ async Task AddAdditionalQueueDetails(RabbitMQBrokerQueueDetails brokerQueue, Can
return (null, false);
}

var queues = items.Select(item => new RabbitMQBrokerQueueDetails(item!)).ToArray();

return (queues, pageCount > pageReturned);
return (MaterializeQueueDetails(items), pageCount > pageReturned);
}
// Older versions of RabbitMQ API did not have paging and returned the array of items directly
case JsonArray arr:
{
var queues = arr.Select(item => new RabbitMQBrokerQueueDetails(item!)).ToArray();

return (queues, false);
return (MaterializeQueueDetails(arr), false);
}
default:
throw new Exception("Was not able to get list of queues from RabbitMQ broker.");
}
}

static RabbitMQBrokerQueueDetails[] MaterializeQueueDetails(JsonArray items)
{
// It is not possible to directly operated on the JsonNode. When the JsonNode is a JObject
// and the indexer is access the internal dictionary is initialized which can cause key not found exceptions
// when the payload contains the same key multiple times (which happened in the past).
var queues = items.Select(item => new RabbitMQBrokerQueueDetails(item!.Deserialize<JsonElement>())).ToArray();
return queues;
}

public override KeyDescriptionPair[] Settings =>
[
new KeyDescriptionPair(RabbitMQSettings.API, RabbitMQSettings.APIDescription),
Expand Down
16 changes: 0 additions & 16 deletions src/ServiceControl.Transports.RabbitMQ/RabbitMQQueueDetails.cs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[
{
"QueueName": "queue1",
"SanitizedName": "queue1",
"Scope": "vhost1",
"VHost": "vhost1",
"EndpointIndicators": []
},
{
"QueueName": "queue2",
"SanitizedName": "queue2",
"Scope": "vhost2",
"VHost": "vhost2",
"EndpointIndicators": []
},
{
"QueueName": "queue3",
"SanitizedName": "queue3",
"Scope": "vhost1",
"VHost": "vhost1",
"EndpointIndicators": []
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[
{
"QueueName": "queue1",
"SanitizedName": "queue1",
"Scope": "vhost1",
"VHost": "vhost1",
"EndpointIndicators": []
},
{
"QueueName": "queue2",
"SanitizedName": "queue2",
"Scope": "vhost2",
"VHost": "vhost2",
"EndpointIndicators": []
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
namespace ServiceControl.Transport.Tests;

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Time.Testing;
using NUnit.Framework;
using Transports;
using Transports.RabbitMQ;
using System.Net.Http;
using Particular.Approvals;
using System.Collections.ObjectModel;
using System.Net;

[TestFixture]
class RabbitMQQuery_ResponseParsing_Tests
{
FakeTimeProvider provider;
TransportSettings transportSettings;
FakeHttpHandler httpHandler;
RabbitMQQuery rabbitMQQuery;

[SetUp]
public void SetUp()
{
provider = new();
provider.SetUtcNow(DateTimeOffset.UtcNow);
transportSettings = new TransportSettings
{
ConnectionString = "host=localhost;username=rabbitmq;password=rabbitmq",
MaxConcurrency = 1,
EndpointName = Guid.NewGuid().ToString("N")
};
httpHandler = new FakeHttpHandler();
var httpClient = new HttpClient(httpHandler) { BaseAddress = new Uri("http://localhost:15672") };

rabbitMQQuery = new TestableRabbitMQQuery(provider, transportSettings, httpClient);
rabbitMQQuery.Initialize(ReadOnlyDictionary<string, string>.Empty);
}

[TearDown]
public void TearDown() => httpHandler.Dispose();

public Func<HttpRequestMessage, HttpResponseMessage> SendCallback
{
get => httpHandler.SendCallback;
set => httpHandler.SendCallback = value;
}

[Test]
public async Task Should_handle_duplicated_json_data()
{
SendCallback = _ =>
{
var response = new HttpResponseMessage
{
Content = new StringContent("""
{
"items": [
{
"name": "queue1",
"vhost": "vhost1",
"memory": 1024,
"memory": 1024,
"message_stats": {
"ack": 1
}
},
{
"name": "queue2",
"vhost": "vhost2",
"vhost": "vhost2",
"message_stats": {
"ack": 2
}
}
],
"page": 1,
"page_count": 1,
"page_size": 500,
"total_count": 2
}
""")
};
return response;
};

var queues = (await rabbitMQQuery.GetPage(1, default)).Item1;
Approver.Verify(queues);
}

[Test]
public async Task Should_fetch_queue_details_in_old_format()
{
SendCallback = _ =>
{
var response = new HttpResponseMessage
{
Content = new StringContent("""
[
{
"name": "queue1",
"vhost": "vhost1",
"memory": 1024,
"message_stats": {
"ack": 1
}
},
{
"name": "queue2",
"vhost": "vhost2",
"message_stats": {
"ack": 2
}
},
{
"name": "queue3",
"vhost": "vhost1"
}
]
""")
};
return response;
};

var queues = (await rabbitMQQuery.GetPage(1, default)).Item1;
Approver.Verify(queues);
}

sealed class TestableRabbitMQQuery(
TimeProvider timeProvider,
TransportSettings transportSettings,
HttpClient customHttpClient)
: RabbitMQQuery(NullLogger<RabbitMQQuery>.Instance, timeProvider, transportSettings)
{
protected override HttpClient CreateHttpClient(NetworkCredential defaultCredential, string apiUrl) => customHttpClient;
}

sealed class FakeHttpHandler : HttpClientHandler
{
public Func<HttpRequestMessage, HttpResponseMessage> SendCallback { get; set; }

protected override HttpResponseMessage Send(HttpRequestMessage request, CancellationToken cancellationToken) => SendCallback(request);

protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) => Task.FromResult(SendCallback(request));
}
}
Loading