Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Address flaky integration tests #1742

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified .ci/windows/toxiproxy/toxiproxy-cli.exe
Binary file not shown.
Binary file modified .ci/windows/toxiproxy/toxiproxy-server.exe
Binary file not shown.
10 changes: 5 additions & 5 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,7 @@ jobs:
- name: Integration Tests
timeout-minutes: 25
run: |
$tx = Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; `
Start-Sleep -Seconds 1; `
Receive-Job -Job $tx; `
& "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-cli.exe" list; `
Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; `
dotnet test `
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
Expand All @@ -82,7 +79,10 @@ jobs:
"${{ github.workspace }}\projects\Test\Integration\Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
- name: Check for errors in RabbitMQ logs
run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1
- name: Maybe upload RabbitMQ logs
- name: Maybe collect Toxiproxy logs
if: failure()
run: Get-Job | Where-Object { $_.HasMoreData } | Receive-Job | Out-File -Append -LiteralPath $env:APPDATA\RabbitMQ\log\toxiproxy-log.txt
- name: Maybe upload RabbitMQ and Toxiproxy logs
if: failure()
uses: actions/upload-artifact@v4
with:
Expand Down
30 changes: 18 additions & 12 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,9 @@ public abstract class IntegrationFixture : IAsyncLifetime
public static readonly TimeSpan RecoveryInterval = TimeSpan.FromSeconds(2);
public static readonly TimeSpan TestTimeout = TimeSpan.FromSeconds(5);
public static readonly TimeSpan RequestedConnectionTimeout = TimeSpan.FromSeconds(1);
public static readonly Random S_Random;

static IntegrationFixture()
{

#if NET
S_Random = Random.Shared;
#else
S_Random = new Random();
#endif
s_isRunningInCI = InitIsRunningInCI();
s_isVerbose = InitIsVerbose();

Expand Down Expand Up @@ -450,12 +443,19 @@ protected async Task WithTemporaryChannelAsync(Func<IChannel, Task> action)

protected string GenerateExchangeName()
{
return $"{_testDisplayName}-exchange-{Guid.NewGuid()}";
return $"{_testDisplayName}-exchange-{Now}-{GenerateShortUuid()}";
}

protected string GenerateQueueName()
protected string GenerateQueueName(bool useGuid = false)
{
return $"{_testDisplayName}-queue-{Guid.NewGuid()}";
if (useGuid)
{
return $"{_testDisplayName}-queue-{Now}-{Guid.NewGuid()}";
}
else
{
return $"{_testDisplayName}-queue-{Now}-{GenerateShortUuid()}";
}
}

protected Task WithTemporaryNonExclusiveQueueAsync(Func<IChannel, string, Task> action)
Expand Down Expand Up @@ -540,7 +540,7 @@ protected ConnectionFactory CreateConnectionFactory(
{
return new ConnectionFactory
{
ClientProvidedName = $"{_testDisplayName}:{Util.Now}:{GetConnectionIdx()}",
ClientProvidedName = $"{_testDisplayName}:{Now}:{GetConnectionIdx()}",
ContinuationTimeout = WaitSpan,
HandshakeContinuationTimeout = WaitSpan,
ConsumerDispatchConcurrency = consumerDispatchConcurrency
Expand Down Expand Up @@ -631,10 +631,16 @@ protected static string GetUniqueString(ushort length)
protected static byte[] GetRandomBody(ushort size = 1024)
{
byte[] body = new byte[size];
S_Random.NextBytes(body);
Util.S_Random.NextBytes(body);
return body;
}

protected static string Now => Util.Now;

protected static string GenerateShortUuid() => Util.GenerateShortUuid();

protected static int RandomNext(int min, int max) => Util.S_Random.Next(min, max);

protected static Task WaitForRecoveryAsync(IConnection conn)
{
TaskCompletionSource<bool> tcs = PrepareForRecovery((AutorecoveringConnection)conn);
Expand Down
22 changes: 22 additions & 0 deletions projects/Test/Common/Util.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ namespace Test
{
public class Util : IDisposable
{
#if NET
private static readonly Random s_random = Random.Shared;
#else
[ThreadStatic]
private static Random s_random;
#endif

private readonly ITestOutputHelper _output;
private readonly ManagementClient _managementClient;
private static readonly bool s_isWindows = false;
Expand Down Expand Up @@ -41,6 +48,21 @@ public Util(ITestOutputHelper output, string managementUsername, string manageme
_managementClient = new ManagementClient(managementUri, managementUsername, managementPassword);
}

public static Random S_Random
{
get
{
#if NET
return s_random;
#else
s_random ??= new Random();
return s_random;
#endif
}
}

public static string GenerateShortUuid() => S_Random.Next().ToString("x");

public static string Now => DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture);

public static bool IsWindows => s_isWindows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public async Task TestQueueRecoveryWithManyQueues()
int n = 1024;
for (int i = 0; i < n; i++)
{
QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false);
QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(useGuid: true), false, false, false);
qs.Add(q.QueueName);
}
await CloseAndWaitForRecoveryAsync();
Expand Down
2 changes: 1 addition & 1 deletion projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ await _channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName,

private async Task ValidateConsumerDispatchConcurrency()
{
ushort expectedConsumerDispatchConcurrency = (ushort)S_Random.Next(3, 10);
ushort expectedConsumerDispatchConcurrency = (ushort)RandomNext(3, 10);
AutorecoveringChannel autorecoveringChannel = (AutorecoveringChannel)_channel;
Assert.Equal(ConsumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
Assert.Equal(_consumerDispatchConcurrency, autorecoveringChannel.ConsumerDispatcher.Concurrency);
Expand Down
2 changes: 1 addition & 1 deletion projects/Test/Integration/TestConcurrentAccessBase.cs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected async Task TestConcurrentOperationsAsync(Func<Task> action, int iterat
{
for (int j = 0; j < iterations; j++)
{
await Task.Delay(S_Random.Next(1, 10));
await Task.Delay(RandomNext(1, 10));
tasks.Add(action());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public async Task TestConsumerWorkServiceRecovery()
await using AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync();
await using (IChannel ch = await c.CreateChannelAsync())
{
string q = (await ch.QueueDeclareAsync("dotnet-client.recovery.consumer_work_pool1",
string q = (await ch.QueueDeclareAsync(GenerateQueueName(),
false, false, false)).QueueName;
var cons = new AsyncEventingBasicConsumer(ch);
await ch.BasicConsumeAsync(q, true, cons);
Expand Down Expand Up @@ -143,7 +143,7 @@ public async Task TestConsumerWorkServiceRecovery()
[Fact]
public async Task TestConsumerRecoveryOnClientNamedQueueWithOneRecovery()
{
const string q0 = "dotnet-client.recovery.queue1";
string q0 = GenerateQueueName();
// connection #1
await using AutorecoveringConnection c = await CreateAutorecoveringConnectionAsync();
await using (IChannel ch = await c.CreateChannelAsync())
Expand Down Expand Up @@ -364,8 +364,7 @@ public async Task TestTopologyRecoveryConsumerFilter()
[Fact]
public async Task TestRecoveryWithTopologyDisabled()
{
string queueName = GenerateQueueName() + "-dotnet-client.test.recovery.q2";

string queueName = GenerateQueueName();
await using AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryDisabledAsync();
await using (IChannel ch = await conn.CreateChannelAsync())
{
Expand Down
4 changes: 2 additions & 2 deletions projects/Test/Integration/TestExchangeDeclare.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async Task f()
{
try
{
await Task.Delay(S_Random.Next(5, 50));
await Task.Delay(RandomNext(5, 50));
string exchangeName = GenerateExchangeName();
await _channel.ExchangeDeclareAsync(exchange: exchangeName, type: "fanout", false, false);
await _channel.ExchangeBindAsync(destination: ex_destination, source: exchangeName, routingKey: "unused");
Expand All @@ -87,7 +87,7 @@ async Task f()
{
try
{
await Task.Delay(S_Random.Next(5, 50));
await Task.Delay(RandomNext(5, 50));
await _channel.ExchangeUnbindAsync(destination: ex_destination, source: exchangeName, routingKey: "unused",
noWait: false, arguments: null);
await _channel.ExchangeDeleteAsync(exchange: exchangeName, ifUnused: false);
Expand Down
5 changes: 4 additions & 1 deletion projects/Test/Integration/TestFloodPublishing.cs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ public async Task TestUnthrottledFloodPublishing()

Assert.True(_conn.IsOpen);
Assert.False(sawUnexpectedShutdown);
_output.WriteLine("[INFO] published {0} messages in {1}", publishCount, stopwatch.Elapsed);
if (IsVerbose)
{
_output.WriteLine("[INFO] published {0} messages in {1}", publishCount, stopwatch.Elapsed);
}
}

[Fact]
Expand Down
8 changes: 4 additions & 4 deletions projects/Test/Integration/TestQueueDeclare.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async Task f()
{
// sleep for a random amount of time to increase the chances
// of thread interleaving. MK.
await Task.Delay(S_Random.Next(5, 50));
await Task.Delay(RandomNext(5, 50));
string queueName = GenerateQueueName();
QueueDeclareOk r = await _channel.QueueDeclareAsync(queue: queueName,
durable: false, exclusive: true, autoDelete: false);
Expand Down Expand Up @@ -136,7 +136,7 @@ async Task f()
string qname = q;
try
{
await Task.Delay(S_Random.Next(5, 50));
await Task.Delay(RandomNext(5, 50));

QueueDeclareOk r = await _channel.QueueDeclarePassiveAsync(qname);
Assert.Equal(qname, r.QueueName);
Expand Down Expand Up @@ -176,7 +176,7 @@ public async Task TestConcurrentQueueDeclare()
{
// sleep for a random amount of time to increase the chances
// of thread interleaving. MK.
await Task.Delay(S_Random.Next(5, 50));
await Task.Delay(RandomNext(5, 50));
string q = GenerateQueueName();
await _channel.QueueDeclareAsync(q, false, false, false);
queueNames.Add(q);
Expand All @@ -201,7 +201,7 @@ public async Task TestConcurrentQueueDeclare()
{
try
{
await Task.Delay(S_Random.Next(5, 50));
await Task.Delay(RandomNext(5, 50));
await _channel.QueueDeleteAsync(queueName);
}
catch (NotSupportedException e)
Expand Down
22 changes: 15 additions & 7 deletions projects/Test/Integration/TestToxiproxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,11 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout()

Assert.True(await tcs.Task);

var timeoutToxic = new TimeoutToxic();
string toxicName = $"rmq-localhost-timeout-{Now}-{GenerateShortUuid()}";
var timeoutToxic = new TimeoutToxic
{
Name = toxicName
};
timeoutToxic.Attributes.Timeout = 0;
timeoutToxic.Toxicity = 1.0;

Expand Down Expand Up @@ -271,9 +275,11 @@ public async Task TestTcpReset_GH1464()

Assert.True(await channelCreatedTcs.Task);

const string toxicName = "rmq-localhost-reset_peer";
var resetPeerToxic = new ResetPeerToxic();
resetPeerToxic.Name = toxicName;
string toxicName = $"rmq-localhost-reset_peer-{Now}-{GenerateShortUuid()}";
var resetPeerToxic = new ResetPeerToxic
{
Name = toxicName
};
resetPeerToxic.Attributes.Timeout = 500;
resetPeerToxic.Toxicity = 1.0;

Expand Down Expand Up @@ -354,9 +360,11 @@ public async Task TestPublisherConfirmationThrottling()

await channelCreatedTcs.Task;

const string toxicName = "rmq-localhost-bandwidth";
var bandwidthToxic = new BandwidthToxic();
bandwidthToxic.Name = toxicName;
string toxicName = $"rmq-localhost-bandwidth-{Now}-{GenerateShortUuid()}";
var bandwidthToxic = new BandwidthToxic
{
Name = toxicName
};
bandwidthToxic.Attributes.Rate = 0;
bandwidthToxic.Toxicity = 1.0;
bandwidthToxic.Stream = ToxicDirection.DownStream;
Expand Down
31 changes: 28 additions & 3 deletions projects/Test/Integration/ToxiproxyManager.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
using System;
using System.Globalization;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Test;
using Toxiproxy.Net;
using Toxiproxy.Net.Toxics;

namespace Integration
{
public class ToxiproxyManager : IDisposable
{
private const string ProxyNamePrefix = "rmq-";
private const string ProxyNamePrefix = "rmq";
private const ushort ProxyPortStart = 55669;
private static int s_proxyPort = ProxyPortStart;

Expand Down Expand Up @@ -68,7 +70,8 @@ public ToxiproxyManager(string testDisplayName, bool isRunningInCI, bool isWindo

public async Task InitializeAsync()
{
_proxy.Name = $"{ProxyNamePrefix}{_testDisplayName}";
string proxyName = $"{ProxyNamePrefix}-{_testDisplayName}-{Util.Now}-{Util.GenerateShortUuid()}";
_proxy.Name = proxyName;

try
{
Expand All @@ -78,7 +81,29 @@ public async Task InitializeAsync()
{
}

await _proxyClient.AddAsync(_proxy);
ushort retryCount = 5;
do
{
try
{
await _proxyClient.AddAsync(_proxy);
return;
}
catch (Exception ex)
{
if (retryCount == 0)
{
throw;
}
else
{
string now = DateTime.Now.ToString("o", CultureInfo.InvariantCulture);
Console.Error.WriteLine("{0} [ERROR] error initializing proxy '{1}': {2}", now, proxyName, ex);
}
}
--retryCount;
await Task.Delay(TimeSpan.FromSeconds(1));
} while (retryCount >= 0);
}

public Task<T> AddToxicAsync<T>(T toxic) where T : ToxicBase
Expand Down
Loading