Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Toxiproxy manager change #1744

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions .github/workflows/build-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,17 @@ jobs:
- name: Integration Tests
timeout-minutes: 25
run: |
Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; `
Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" | Out-File -LiteralPath $env:APPDATA\RabbitMQ\log\toxiproxy-log.txt }; `
dotnet test `
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' `
--environment 'PASSWORD=grapefruit' `
--environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" `
"${{ github.workspace }}\projects\Test\Integration\Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
"${{ github.workspace }}\projects\Test\Integration\Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'; `
Get-Job | Stop-Job -Verbose -PassThru | Remove-Job -Verbose
- name: Check for errors in RabbitMQ logs
run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1
- name: Maybe collect Toxiproxy logs
if: failure()
run: Get-Job | Where-Object { $_.HasMoreData } | Receive-Job | Out-File -Append -LiteralPath $env:APPDATA\RabbitMQ\log\toxiproxy-log.txt
- name: Maybe upload RabbitMQ and Toxiproxy logs
if: failure()
uses: actions/upload-artifact@v4
Expand Down
13 changes: 3 additions & 10 deletions projects/Test/Common/IntegrationFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -443,19 +443,12 @@ protected async Task WithTemporaryChannelAsync(Func<IChannel, Task> action)

protected string GenerateExchangeName()
{
return $"{_testDisplayName}-exchange-{Now}-{GenerateShortUuid()}";
return $"{_testDisplayName}-exchange-{Now}-{Guid.NewGuid()}";
}

protected string GenerateQueueName(bool useGuid = false)
protected string GenerateQueueName()
{
if (useGuid)
{
return $"{_testDisplayName}-queue-{Now}-{Guid.NewGuid()}";
}
else
{
return $"{_testDisplayName}-queue-{Now}-{GenerateShortUuid()}";
}
return $"{_testDisplayName}-queue-{Now}-{Guid.NewGuid()}";
}

protected Task WithTemporaryNonExclusiveQueueAsync(Func<IChannel, string, Task> action)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public async Task TestQueueRecoveryWithManyQueues()
int n = 1024;
for (int i = 0; i < n; i++)
{
QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(useGuid: true), false, false, false);
QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false);
qs.Add(q.QueueName);
}
await CloseAndWaitForRecoveryAsync();
Expand Down
4 changes: 2 additions & 2 deletions projects/Test/Integration/TestBasicPublish.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,11 @@ public async Task TestMaxInboundMessageBodySize()
using var cts = new CancellationTokenSource(WaitSpan);
using CancellationTokenRegistration ctr = cts.Token.Register(() => tcs.SetCanceled());

const ushort maxMsgSize = 8192;
const ushort maxMsgSize = 768;

int count = 0;
byte[] msg0 = _encoding.GetBytes("hi");
byte[] msg1 = GetRandomBody(maxMsgSize * 2);
byte[] msg1 = GetRandomBody(maxMsgSize * 20);

ConnectionFactory cf = CreateConnectionFactory();
cf.AutomaticRecoveryEnabled = false;
Expand Down
46 changes: 22 additions & 24 deletions projects/Test/Integration/TestToxiproxy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ namespace Test.Integration
public class TestToxiproxy : IntegrationFixture
{
private readonly TimeSpan _heartbeatTimeout = TimeSpan.FromSeconds(1);
private ToxiproxyManager _toxiproxyManager;
private int _proxyPort;

public TestToxiproxy(ITestOutputHelper output) : base(output)
{
Expand All @@ -59,7 +61,15 @@ public override Task InitializeAsync()
Assert.Null(_conn);
Assert.Null(_channel);

return Task.CompletedTask;
_toxiproxyManager = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows);
_proxyPort = _toxiproxyManager.ProxyPort;
return _toxiproxyManager.InitializeAsync();
}

public override async Task DisposeAsync()
{
await _toxiproxyManager.DisposeAsync();
await base.DisposeAsync();
}

[SkippableFact]
Expand All @@ -68,11 +78,8 @@ public async Task TestCloseConnection()
{
Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");

using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows);
await pm.InitializeAsync();

ConnectionFactory cf = CreateConnectionFactory();
cf.Port = pm.ProxyPort;
cf.Port = _proxyPort;
cf.AutomaticRecoveryEnabled = true;
cf.NetworkRecoveryInterval = TimeSpan.FromSeconds(1);
cf.RequestedHeartbeat = TimeSpan.FromSeconds(1);
Expand Down Expand Up @@ -172,11 +179,11 @@ async Task PublishLoop()

Assert.True(await messagePublishedTcs.Task);

Task disableProxyTask = pm.DisableAsync();
Task disableProxyTask = _toxiproxyManager.DisableAsync();

await Task.WhenAll(disableProxyTask, connectionShutdownTcs.Task);

Task enableProxyTask = pm.EnableAsync();
Task enableProxyTask = _toxiproxyManager.EnableAsync();

Task whenAllTask = Task.WhenAll(enableProxyTask, recoverySucceededTcs.Task);
await whenAllTask.WaitAsync(TimeSpan.FromSeconds(15));
Expand All @@ -193,11 +200,8 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout()
{
Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");

using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows);
await pm.InitializeAsync();

ConnectionFactory cf = CreateConnectionFactory();
cf.Port = pm.ProxyPort;
cf.Port = _proxyPort;
cf.RequestedHeartbeat = _heartbeatTimeout;
cf.AutomaticRecoveryEnabled = false;

Expand Down Expand Up @@ -229,7 +233,7 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout()
timeoutToxic.Attributes.Timeout = 0;
timeoutToxic.Toxicity = 1.0;

Task<TimeoutToxic> addToxicTask = pm.AddToxicAsync(timeoutToxic);
Task<TimeoutToxic> addToxicTask = _toxiproxyManager.AddToxicAsync(timeoutToxic);

await Assert.ThrowsAsync<AlreadyClosedException>(() =>
{
Expand All @@ -243,11 +247,8 @@ public async Task TestTcpReset_GH1464()
{
Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");

using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows);
await pm.InitializeAsync();

ConnectionFactory cf = CreateConnectionFactory();
cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort);
cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), _proxyPort);
cf.RequestedHeartbeat = TimeSpan.FromSeconds(5);
cf.AutomaticRecoveryEnabled = true;

Expand Down Expand Up @@ -283,11 +284,11 @@ public async Task TestTcpReset_GH1464()
resetPeerToxic.Attributes.Timeout = 500;
resetPeerToxic.Toxicity = 1.0;

Task<ResetPeerToxic> addToxicTask = pm.AddToxicAsync(resetPeerToxic);
Task<ResetPeerToxic> addToxicTask = _toxiproxyManager.AddToxicAsync(resetPeerToxic);

await Task.WhenAll(addToxicTask, connectionShutdownTcs.Task);

await pm.RemoveToxicAsync(toxicName);
await _toxiproxyManager.RemoveToxicAsync(toxicName);

await recoveryTask;
}
Expand All @@ -302,11 +303,8 @@ public async Task TestPublisherConfirmationThrottling()
const int MaxOutstandingConfirms = 8;
const int BatchSize = MaxOutstandingConfirms * 2;

using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows);
await pm.InitializeAsync();

ConnectionFactory cf = CreateConnectionFactory();
cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort);
cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), _proxyPort);
cf.RequestedHeartbeat = TimeSpan.FromSeconds(5);
cf.AutomaticRecoveryEnabled = true;

Expand Down Expand Up @@ -371,7 +369,7 @@ public async Task TestPublisherConfirmationThrottling()

await Task.Delay(TimeSpan.FromSeconds(1));

Task<BandwidthToxic> addToxicTask = pm.AddToxicAsync(bandwidthToxic);
Task<BandwidthToxic> addToxicTask = _toxiproxyManager.AddToxicAsync(bandwidthToxic);

while (true)
{
Expand All @@ -387,7 +385,7 @@ public async Task TestPublisherConfirmationThrottling()
}

await addToxicTask.WaitAsync(WaitSpan);
await pm.RemoveToxicAsync(toxicName).WaitAsync(WaitSpan);
await _toxiproxyManager.RemoveToxicAsync(toxicName).WaitAsync(WaitSpan);

await messagesPublishedTcs.Task.WaitAsync(WaitSpan);
await publishTask.WaitAsync(WaitSpan);
Expand Down
56 changes: 21 additions & 35 deletions projects/Test/Integration/ToxiproxyManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Integration
{
public class ToxiproxyManager : IDisposable
public class ToxiproxyManager : IAsyncDisposable
{
private const string ProxyNamePrefix = "rmq";
private const ushort ProxyPortStart = 55669;
Expand All @@ -21,7 +21,7 @@ public class ToxiproxyManager : IDisposable
private readonly Client _proxyClient;
private readonly Proxy _proxy;

private bool _disposedValue;
private bool _disposedValue = false;

public ToxiproxyManager(string testDisplayName, bool isRunningInCI, bool isWindows)
{
Expand All @@ -35,11 +35,11 @@ public ToxiproxyManager(string testDisplayName, bool isRunningInCI, bool isWindo
_proxyPort = Interlocked.Increment(ref s_proxyPort);

/*
string now = DateTime.UtcNow.ToString("o", System.Globalization.CultureInfo.InvariantCulture);
Console.WriteLine("{0} [DEBUG] {1} _proxyPort {2}", now, testDisplayName, _proxyPort);
*/

_proxyConnection = new Connection(resetAllToxicsAndProxiesOnClose: true);
* Note:
* Do NOT set resetAllToxicsAndProxiesOnClose to true, because it will
* clear proxies being used by parallel TFM test runs
*/
_proxyConnection = new Connection(resetAllToxicsAndProxiesOnClose: false);
_proxyClient = _proxyConnection.Client();

// to start, assume everything is on localhost
Expand Down Expand Up @@ -70,17 +70,9 @@ public ToxiproxyManager(string testDisplayName, bool isRunningInCI, bool isWindo

public async Task InitializeAsync()
{
string proxyName = $"{ProxyNamePrefix}-{_testDisplayName}-{Util.Now}-{Util.GenerateShortUuid()}";
string proxyName = $"{ProxyNamePrefix}-{_testDisplayName}-{Util.Now}-{Guid.NewGuid()}";
_proxy.Name = proxyName;

try
{
await _proxyClient.DeleteAsync(_proxy);
}
catch
{
}

ushort retryCount = 5;
do
{
Expand Down Expand Up @@ -128,30 +120,24 @@ public Task DisableAsync()
return _proxyClient.UpdateAsync(_proxy);
}

public void Dispose()
{
// Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
Dispose(disposing: true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
public async ValueTask DisposeAsync()
{
if (!_disposedValue)
{
if (disposing)
try
{
try
{
_proxyClient.DeleteAsync(_proxy).GetAwaiter().GetResult();
_proxyConnection.Dispose();
}
catch
{
}
await _proxyClient.DeleteAsync(_proxy);
_proxyConnection.Dispose();
}
catch (Exception ex)
{
string now = DateTime.Now.ToString("o", CultureInfo.InvariantCulture);
Console.Error.WriteLine("{0} [ERROR] error disposing proxy '{1}': {2}", now, _proxy.Name, ex);
}
finally
{
_disposedValue = true;
}

_disposedValue = true;
}
}
}
Expand Down
Loading