Skip to content

Commit

Permalink
Merge pull request #1742 from rabbitmq/lukebakken/minor-naming-change
Browse files Browse the repository at this point in the history
Address flaky integration tests
  • Loading branch information
lukebakken authored Dec 10, 2024
2 parents a8dc8ce + cf830aa commit 103c39e
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 41 deletions.
Binary file modified .ci/windows/toxiproxy/toxiproxy-cli.exe
Binary file not shown.
Binary file modified .ci/windows/toxiproxy/toxiproxy-server.exe
Binary file not shown.
10 changes: 5 additions & 5 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,7 @@ jobs:
- name: Integration Tests
timeout-minutes: 25
run: |
$tx = Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; `
Start-Sleep -Seconds 1; `
Receive-Job -Job $tx; `
& "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-cli.exe" list; `
Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; `
dotnet test `
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
Expand All @@ -82,7 +79,10 @@ jobs:
"${{ github.workspace }}\projects\Test\Integration\Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
- name: Check for errors in RabbitMQ logs
run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1
- name: Maybe upload RabbitMQ logs
- name: Maybe collect Toxiproxy logs
if: failure()
run: Get-Job | Where-Object { $_.HasMoreData } | Receive-Job | Out-File -Append -LiteralPath $env:APPDATA\RabbitMQ\log\toxiproxy-log.txt
- name: Maybe upload RabbitMQ and Toxiproxy logs
if: failure()
uses: actions/upload-artifact@v4
with:
Expand Down
30 changes: 18 additions & 12 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,9 @@ public abstract class IntegrationFixture : IAsyncLifetime
public static readonly TimeSpan RecoveryInterval = TimeSpan.FromSeconds(2);
public static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
public static readonly TimeSpan RequestedConnectionTimeout = TimeSpan.FromSeconds(1);
public static readonly Random S_Random;

static IntegrationFixture()
{

#if NET
S_Random = Random.Shared;
#else
S_Random = new Random();
#endif
s_isRunningInCI = InitIsRunningInCI();
s_isVerbose = InitIsVerbose();

Expand Down Expand Up @@ -450,12 +443,19 @@ protected async Task WithTemporaryChannelAsync(Func<IChannel, Task> action)

protected string GenerateExchangeName()
{
return $"{_testDisplayName}-exchange-{Guid.NewGuid()}";
return $"{_testDisplayName}-exchange-{Now}-{GenerateShortUuid()}";
}

protected string GenerateQueueName()
protected string GenerateQueueName(bool useGuid = false)
{
return $"{_testDisplayName}-queue-{Guid.NewGuid()}";
if (useGuid)
{
return $"{_testDisplayName}-queue-{Now}-{Guid.NewGuid()}";
}
else
{
return $"{_testDisplayName}-queue-{Now}-{GenerateShortUuid()}";
}
}

protected Task WithTemporaryNonExclusiveQueueAsync(Func<IChannel, string, Task> action)
Expand Down Expand Up @@ -540,7 +540,7 @@ protected ConnectionFactory CreateConnectionFactory(
{
return new ConnectionFactory
{
ClientProvidedName = $"{_testDisplayName}:{Util.Now}:{GetConnectionIdx()}",
ClientProvidedName = $"{_testDisplayName}:{Now}:{GetConnectionIdx()}",
ContinuationTimeout = WaitSpan,
HandshakeContinuationTimeout = WaitSpan,
ConsumerDispatchConcurrency = consumerDispatchConcurrency
Expand Down Expand Up @@ -631,10 +631,16 @@ protected static string GetUniqueString(ushort length)
protected static byte[] GetRandomBody(ushort size = 1024)
{
byte[] body = new byte[size];
S_Random.NextBytes(body);
Util.S_Random.NextBytes(body);
return body;
}

protected static string Now => Util.Now;

protected static string GenerateShortUuid() => Util.GenerateShortUuid();

protected static int RandomNext(int min, int max) => Util.S_Random.Next(min, max);

protected static Task WaitForRecoveryAsync(IConnection conn)
{
TaskCompletionSource<bool> tcs = PrepareForRecovery((AutorecoveringConnection)conn);
Expand Down
22 changes: 22 additions & 0 deletions projects/Test/Common/Util.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ namespace Test
{
public class Util : IDisposable
{
#if NET
private static readonly Random s_random = Random.Shared;
#else
[ThreadStatic]
private static Random s_random;
#endif

private readonly ITestOutputHelper _output;
private readonly ManagementClient _managementClient;
private static readonly bool s_isWindows = false;
Expand Down Expand Up @@ -41,6 +48,21 @@ public Util(ITestOutputHelper output, string managementUsername, string manageme
_managementClient = new ManagementClient(managementUri, managementUsername, managementPassword);
}

public static Random S_Random
{
get
{
#if NET
return s_random;
#else
s_random ??= new Random();
return s_random;
#endif
}
}

public static string GenerateShortUuid() => S_Random.Next().ToString("x");

public static string Now => DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture);

public static bool IsWindows => s_isWindows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public async Task TestQueueRecoveryWithManyQueues()
int n = 1024;
for (int i = 0; i < n; i++)
{
QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false);
QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(useGuid: true), false, false, false);
qs.Add(q.QueueName);
}
await CloseAndWaitForRecoveryAsync();
Expand Down
2 changes: 1 addition & 1 deletion projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,

private async Task ValidateConsumerDispatchConcurrency()
{
ushort expectedConsumerDispatchConcurrency = (ushort)S_Random.Next(3, 10);
ushort expectedConsumerDispatchConcurrency = (ushort)RandomNext(3, 10);
AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel;
Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
Expand Down
2 changes: 1 addition & 1 deletion projects/Test/Integration/TestConcurrentAccessBase.cs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected async Task TestConcurrentOperationsAsync(Func<Task> action, int iterat
{
for (int j = 0; j < iterations; j++)
{
await Task.Delay(S_Random.Next(1, 10));
await Task.Delay(RandomNext(1, 10));
tasks.Add(action());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public async Task TestConsumerWorkServiceRecovery()
await using AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync();
await using (IChannel ch = await c.CreateChannelAsync())
{
string q = (await ch.QueueDeclareAsync("dotnet-client.recovery.consumer_work_pool1",
string q = (await ch.QueueDeclareAsync(GenerateQueueName(),
false, false, false)).QueueName;
var cons = new AsyncEventingBasicConsumer(ch);
await ch.BasicConsumeAsync(q, true, cons);
Expand Down Expand Up @@ -143,7 +143,7 @@ public async Task TestConsumerWorkServiceRecovery()
[Fact]
public async Task TestConsumerRecoveryOnClientNamedQueueWithOneRecovery()
{
const string q0 = "dotnet-client.recovery.queue1";
string q0 = GenerateQueueName();
// connection #1
await using AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync();
await using (IChannel ch = await c.CreateChannelAsync())
Expand Down Expand Up @@ -364,8 +364,7 @@ public async Task TestTopologyRecoveryConsumerFilter()
[Fact]
public async Task TestRecoveryWithTopologyDisabled()
{
string queueName = GenerateQueueName() + "-dotnet-client.test.recovery.q2";

string queueName = GenerateQueueName();
await using AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryDisabledAsync();
await using (IChannel ch = await conn.CreateChannelAsync())
{
Expand Down
4 changes: 2 additions & 2 deletions projects/Test/Integration/TestExchangeDeclare.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async Task f()
{
try
{
await Task.Delay(S_Random.Next(5, 50));
await Task.Delay(RandomNext(5, 50));
string exchangeName = GenerateExchangeName();
await _channel.ExchangeDeclareAsync(exchange: exchangeName, type: "fanout", false, false);
await _channel.ExchangeBindAsync(destination: ex_destination, source: exchangeName, routingKey: "unused");
Expand All @@ -87,7 +87,7 @@ async Task f()
{
try
{
await Task.Delay(S_Random.Next(5, 50));
await Task.Delay(RandomNext(5, 50));
await _channel.ExchangeUnbindAsync(destination: ex_destination, source: exchangeName, routingKey: "unused",
noWait: false, arguments: null);
await _channel.ExchangeDeleteAsync(exchange: exchangeName, ifUnused: false);
Expand Down
5 changes: 4 additions & 1 deletion projects/Test/Integration/TestFloodPublishing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ public async Task TestUnthrottledFloodPublishing()

Assert.True(_conn.IsOpen);
Assert.False(sawUnexpectedShutdown);
_output.WriteLine("[INFO] published {0} messages in {1}", publishCount, stopwatch.Elapsed);
if (IsVerbose)
{
_output.WriteLine("[INFO] published {0} messages in {1}", publishCount, stopwatch.Elapsed);
}
}

[Fact]
Expand Down
8 changes: 4 additions & 4 deletions projects/Test/Integration/TestQueueDeclare.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async Task f()
{
// sleep for a random amount of time to increase the chances
// of thread interleaving. MK.
await Task.Delay(S_Random.Next(5, 50));
await Task.Delay(RandomNext(5, 50));
string queueName = GenerateQueueName();
QueueDeclareOk r = await _channel.QueueDeclareAsync(queue: queueName,
durable: false, exclusive: true, autoDelete: false);
Expand Down Expand Up @@ -136,7 +136,7 @@ async Task f()
string qname = q;
try
{
await Task.Delay(S_Random.Next(5, 50));
await Task.Delay(RandomNext(5, 50));

QueueDeclareOk r = await _channel.QueueDeclarePassiveAsync(qname);
Assert.Equal(qname, r.QueueName);
Expand Down Expand Up @@ -176,7 +176,7 @@ public async Task TestConcurrentQueueDeclare()
{
// sleep for a random amount of time to increase the chances
// of thread interleaving. MK.
await Task.Delay(S_Random.Next(5, 50));
await Task.Delay(RandomNext(5, 50));
string q = GenerateQueueName();
await _channel.QueueDeclareAsync(q, false, false, false);
queueNames.Add(q);
Expand All @@ -201,7 +201,7 @@ public async Task TestConcurrentQueueDeclare()
{
try
{
await Task.Delay(S_Random.Next(5, 50));
await Task.Delay(RandomNext(5, 50));
await _channel.QueueDeleteAsync(queueName);
}
catch (NotSupportedException e)
Expand Down
22 changes: 15 additions & 7 deletions projects/Test/Integration/TestToxiproxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,11 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout()

Assert.True(await tcs.Task);

var timeoutToxic = new TimeoutToxic();
string toxicName = $"rmq-localhost-timeout-{Now}-{GenerateShortUuid()}";
var timeoutToxic = new TimeoutToxic
{
Name = toxicName
};
timeoutToxic.Attributes.Timeout = 0;
timeoutToxic.Toxicity = 1.0;

Expand Down Expand Up @@ -271,9 +275,11 @@ public async Task TestTcpReset_GH1464()

Assert.True(await channelCreatedTcs.Task);

const string toxicName = "rmq-localhost-reset_peer";
var resetPeerToxic = new ResetPeerToxic();
resetPeerToxic.Name = toxicName;
string toxicName = $"rmq-localhost-reset_peer-{Now}-{GenerateShortUuid()}";
var resetPeerToxic = new ResetPeerToxic
{
Name = toxicName
};
resetPeerToxic.Attributes.Timeout = 500;
resetPeerToxic.Toxicity = 1.0;

Expand Down Expand Up @@ -354,9 +360,11 @@ public async Task TestPublisherConfirmationThrottling()

await channelCreatedTcs.Task;

const string toxicName = "rmq-localhost-bandwidth";
var bandwidthToxic = new BandwidthToxic();
bandwidthToxic.Name = toxicName;
string toxicName = $"rmq-localhost-bandwidth-{Now}-{GenerateShortUuid()}";
var bandwidthToxic = new BandwidthToxic
{
Name = toxicName
};
bandwidthToxic.Attributes.Rate = 0;
bandwidthToxic.Toxicity = 1.0;
bandwidthToxic.Stream = ToxicDirection.DownStream;
Expand Down
31 changes: 28 additions & 3 deletions projects/Test/Integration/ToxiproxyManager.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
using System;
using System.Globalization;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Test;
using Toxiproxy.Net;
using Toxiproxy.Net.Toxics;

namespace Integration
{
public class ToxiproxyManager : IDisposable
{
private const string ProxyNamePrefix = "rmq-";
private const string ProxyNamePrefix = "rmq";
private const ushort ProxyPortStart = 55669;
private static int s_proxyPort = ProxyPortStart;

Expand Down Expand Up @@ -68,7 +70,8 @@ public ToxiproxyManager(string testDisplayName, bool isRunningInCI, bool isWindo

public async Task InitializeAsync()
{
_proxy.Name = $"{ProxyNamePrefix}{_testDisplayName}";
string proxyName = $"{ProxyNamePrefix}-{_testDisplayName}-{Util.Now}-{Util.GenerateShortUuid()}";
_proxy.Name = proxyName;

try
{
Expand All @@ -78,7 +81,29 @@ public async Task InitializeAsync()
{
}

await _proxyClient.AddAsync(_proxy);
ushort retryCount = 5;
do
{
try
{
await _proxyClient.AddAsync(_proxy);
return;
}
catch (Exception ex)
{
if (retryCount == 0)
{
throw;
}
else
{
string now = DateTime.Now.ToString("o", CultureInfo.InvariantCulture);
Console.Error.WriteLine("{0} [ERROR] error initializing proxy '{1}': {2}", now, proxyName, ex);
}
}
--retryCount;
await Task.Delay(TimeSpan.FromSeconds(1));
} while (retryCount >= 0);
}

public Task<T> AddToxicAsync<T>(T toxic) where T : ToxicBase
Expand Down

0 comments on commit 103c39e

Please sign in to comment.