diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index a56593167..04b9b3abd 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -69,19 +69,17 @@ jobs: - name: Integration Tests timeout-minutes: 25 run: | - Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" }; ` + Start-Job -Verbose -ScriptBlock { & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-server.exe" | Out-File -LiteralPath $env:APPDATA\RabbitMQ\log\toxiproxy-log.txt }; ` dotnet test ` --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' ` --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" ` --environment 'RABBITMQ_TOXIPROXY_TESTS=true' ` --environment 'PASSWORD=grapefruit' ` --environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" ` - "${{ github.workspace }}\projects\Test\Integration\Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' + "${{ github.workspace }}\projects\Test\Integration\Integration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'; ` + Get-Job | Stop-Job -Verbose -PassThru | Remove-Job -Verbose - name: Check for errors in RabbitMQ logs run: ${{ github.workspace }}\.ci\windows\gha-log-check.ps1 - - name: Maybe collect Toxiproxy logs - if: failure() - run: Get-Job | Where-Object { $_.HasMoreData } | Receive-Job | Out-File -Append -LiteralPath $env:APPDATA\RabbitMQ\log\toxiproxy-log.txt - name: Maybe upload RabbitMQ and Toxiproxy logs if: failure() uses: actions/upload-artifact@v4 diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index 3a0fdd5dc..8adfbdae9 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -443,19 +443,12 @@ protected async Task WithTemporaryChannelAsync(Func action) protected string GenerateExchangeName() { - return $"{_testDisplayName}-exchange-{Now}-{GenerateShortUuid()}"; + return $"{_testDisplayName}-exchange-{Now}-{Guid.NewGuid()}"; } - protected string GenerateQueueName(bool useGuid = false) + protected string GenerateQueueName() { - if (useGuid) - { - return $"{_testDisplayName}-queue-{Now}-{Guid.NewGuid()}"; - } - else - { - return $"{_testDisplayName}-queue-{Now}-{GenerateShortUuid()}"; - } + return $"{_testDisplayName}-queue-{Now}-{Guid.NewGuid()}"; } protected Task WithTemporaryNonExclusiveQueueAsync(Func action) diff --git a/projects/Test/Integration/ConnectionRecovery/TestQueueRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestQueueRecovery.cs index 5ba3447d3..2795836cc 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestQueueRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestQueueRecovery.cs @@ -52,7 +52,7 @@ public async Task TestQueueRecoveryWithManyQueues() int n = 1024; for (int i = 0; i < n; i++) { - QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(useGuid: true), false, false, false); + QueueDeclareOk q = await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false); qs.Add(q.QueueName); } await CloseAndWaitForRecoveryAsync(); diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 92b49aca6..139029bd3 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -177,11 +177,11 @@ public async Task TestMaxInboundMessageBodySize() using var cts = new CancellationTokenSource(WaitSpan); using CancellationTokenRegistration ctr = cts.Token.Register(() => tcs.SetCanceled()); - const ushort maxMsgSize = 8192; + const ushort maxMsgSize = 768; int count = 0; byte[] msg0 = _encoding.GetBytes("hi"); - byte[] msg1 = GetRandomBody(maxMsgSize * 2); + byte[] msg1 = GetRandomBody(maxMsgSize * 20); ConnectionFactory cf = CreateConnectionFactory(); cf.AutomaticRecoveryEnabled = false; diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index eec85628d..c94165444 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -46,6 +46,8 @@ namespace Test.Integration public class TestToxiproxy : IntegrationFixture { private readonly TimeSpan _heartbeatTimeout = TimeSpan.FromSeconds(1); + private ToxiproxyManager _toxiproxyManager; + private int _proxyPort; public TestToxiproxy(ITestOutputHelper output) : base(output) { @@ -59,7 +61,15 @@ public override Task InitializeAsync() Assert.Null(_conn); Assert.Null(_channel); - return Task.CompletedTask; + _toxiproxyManager = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); + _proxyPort = _toxiproxyManager.ProxyPort; + return _toxiproxyManager.InitializeAsync(); + } + + public override async Task DisposeAsync() + { + await _toxiproxyManager.DisposeAsync(); + await base.DisposeAsync(); } [SkippableFact] @@ -68,11 +78,8 @@ public async Task TestCloseConnection() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); - using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); - await pm.InitializeAsync(); - ConnectionFactory cf = CreateConnectionFactory(); - cf.Port = pm.ProxyPort; + cf.Port = _proxyPort; cf.AutomaticRecoveryEnabled = true; cf.NetworkRecoveryInterval = TimeSpan.FromSeconds(1); cf.RequestedHeartbeat = TimeSpan.FromSeconds(1); @@ -172,11 +179,11 @@ async Task PublishLoop() Assert.True(await messagePublishedTcs.Task); - Task disableProxyTask = pm.DisableAsync(); + Task disableProxyTask = _toxiproxyManager.DisableAsync(); await Task.WhenAll(disableProxyTask, connectionShutdownTcs.Task); - Task enableProxyTask = pm.EnableAsync(); + Task enableProxyTask = _toxiproxyManager.EnableAsync(); Task whenAllTask = Task.WhenAll(enableProxyTask, recoverySucceededTcs.Task); await whenAllTask.WaitAsync(TimeSpan.FromSeconds(15)); @@ -193,11 +200,8 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); - using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); - await pm.InitializeAsync(); - ConnectionFactory cf = CreateConnectionFactory(); - cf.Port = pm.ProxyPort; + cf.Port = _proxyPort; cf.RequestedHeartbeat = _heartbeatTimeout; cf.AutomaticRecoveryEnabled = false; @@ -229,7 +233,7 @@ public async Task TestThatStoppedSocketResultsInHeartbeatTimeout() timeoutToxic.Attributes.Timeout = 0; timeoutToxic.Toxicity = 1.0; - Task addToxicTask = pm.AddToxicAsync(timeoutToxic); + Task addToxicTask = _toxiproxyManager.AddToxicAsync(timeoutToxic); await Assert.ThrowsAsync(() => { @@ -243,11 +247,8 @@ public async Task TestTcpReset_GH1464() { Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test"); - using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); - await pm.InitializeAsync(); - ConnectionFactory cf = CreateConnectionFactory(); - cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort); + cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), _proxyPort); cf.RequestedHeartbeat = TimeSpan.FromSeconds(5); cf.AutomaticRecoveryEnabled = true; @@ -283,11 +284,11 @@ public async Task TestTcpReset_GH1464() resetPeerToxic.Attributes.Timeout = 500; resetPeerToxic.Toxicity = 1.0; - Task addToxicTask = pm.AddToxicAsync(resetPeerToxic); + Task addToxicTask = _toxiproxyManager.AddToxicAsync(resetPeerToxic); await Task.WhenAll(addToxicTask, connectionShutdownTcs.Task); - await pm.RemoveToxicAsync(toxicName); + await _toxiproxyManager.RemoveToxicAsync(toxicName); await recoveryTask; } @@ -302,11 +303,8 @@ public async Task TestPublisherConfirmationThrottling() const int MaxOutstandingConfirms = 8; const int BatchSize = MaxOutstandingConfirms * 2; - using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows); - await pm.InitializeAsync(); - ConnectionFactory cf = CreateConnectionFactory(); - cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort); + cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), _proxyPort); cf.RequestedHeartbeat = TimeSpan.FromSeconds(5); cf.AutomaticRecoveryEnabled = true; @@ -371,7 +369,7 @@ public async Task TestPublisherConfirmationThrottling() await Task.Delay(TimeSpan.FromSeconds(1)); - Task addToxicTask = pm.AddToxicAsync(bandwidthToxic); + Task addToxicTask = _toxiproxyManager.AddToxicAsync(bandwidthToxic); while (true) { @@ -387,7 +385,7 @@ public async Task TestPublisherConfirmationThrottling() } await addToxicTask.WaitAsync(WaitSpan); - await pm.RemoveToxicAsync(toxicName).WaitAsync(WaitSpan); + await _toxiproxyManager.RemoveToxicAsync(toxicName).WaitAsync(WaitSpan); await messagesPublishedTcs.Task.WaitAsync(WaitSpan); await publishTask.WaitAsync(WaitSpan); diff --git a/projects/Test/Integration/ToxiproxyManager.cs b/projects/Test/Integration/ToxiproxyManager.cs index 182433a51..21b81245b 100644 --- a/projects/Test/Integration/ToxiproxyManager.cs +++ b/projects/Test/Integration/ToxiproxyManager.cs @@ -9,7 +9,7 @@ namespace Integration { - public class ToxiproxyManager : IDisposable + public class ToxiproxyManager : IAsyncDisposable { private const string ProxyNamePrefix = "rmq"; private const ushort ProxyPortStart = 55669; @@ -21,7 +21,7 @@ public class ToxiproxyManager : IDisposable private readonly Client _proxyClient; private readonly Proxy _proxy; - private bool _disposedValue; + private bool _disposedValue = false; public ToxiproxyManager(string testDisplayName, bool isRunningInCI, bool isWindows) { @@ -35,11 +35,11 @@ public ToxiproxyManager(string testDisplayName, bool isRunningInCI, bool isWindo _proxyPort = Interlocked.Increment(ref s_proxyPort); /* - string now = DateTime.UtcNow.ToString("o", System.Globalization.CultureInfo.InvariantCulture); - Console.WriteLine("{0} [DEBUG] {1} _proxyPort {2}", now, testDisplayName, _proxyPort); - */ - - _proxyConnection = new Connection(resetAllToxicsAndProxiesOnClose: true); + * Note: + * Do NOT set resetAllToxicsAndProxiesOnClose to true, because it will + * clear proxies being used by parallel TFM test runs + */ + _proxyConnection = new Connection(resetAllToxicsAndProxiesOnClose: false); _proxyClient = _proxyConnection.Client(); // to start, assume everything is on localhost @@ -70,17 +70,9 @@ public ToxiproxyManager(string testDisplayName, bool isRunningInCI, bool isWindo public async Task InitializeAsync() { - string proxyName = $"{ProxyNamePrefix}-{_testDisplayName}-{Util.Now}-{Util.GenerateShortUuid()}"; + string proxyName = $"{ProxyNamePrefix}-{_testDisplayName}-{Util.Now}-{Guid.NewGuid()}"; _proxy.Name = proxyName; - try - { - await _proxyClient.DeleteAsync(_proxy); - } - catch - { - } - ushort retryCount = 5; do { @@ -128,30 +120,24 @@ public Task DisableAsync() return _proxyClient.UpdateAsync(_proxy); } - public void Dispose() - { - // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method - Dispose(disposing: true); - GC.SuppressFinalize(this); - } - - protected virtual void Dispose(bool disposing) + public async ValueTask DisposeAsync() { if (!_disposedValue) { - if (disposing) + try { - try - { - _proxyClient.DeleteAsync(_proxy).GetAwaiter().GetResult(); - _proxyConnection.Dispose(); - } - catch - { - } + await _proxyClient.DeleteAsync(_proxy); + _proxyConnection.Dispose(); + } + catch (Exception ex) + { + string now = DateTime.Now.ToString("o", CultureInfo.InvariantCulture); + Console.Error.WriteLine("{0} [ERROR] error disposing proxy '{1}': {2}", now, _proxy.Name, ex); + } + finally + { + _disposedValue = true; } - - _disposedValue = true; } } }