diff --git a/RabbitMQDotNetClient.sln b/RabbitMQDotNetClient.sln
index 4307f8500..deb687a47 100644
--- a/RabbitMQDotNetClient.sln
+++ b/RabbitMQDotNetClient.sln
@@ -46,6 +46,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1647", "projects\Applica
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublisherConfirms", "projects\Applications\PublisherConfirms\PublisherConfirms.csproj", "{13149F73-2CDB-4ECF-BF2C-403860045751}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GH-1749", "projects\Applications\GH-1749\GH-1749.csproj", "{725D9986-ACD1-424E-AF4C-2BEB407D2BD9}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -108,6 +110,10 @@ Global
{13149F73-2CDB-4ECF-BF2C-403860045751}.Debug|Any CPU.Build.0 = Debug|Any CPU
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.ActiveCfg = Release|Any CPU
{13149F73-2CDB-4ECF-BF2C-403860045751}.Release|Any CPU.Build.0 = Release|Any CPU
+ {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {725D9986-ACD1-424E-AF4C-2BEB407D2BD9}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -123,6 +129,7 @@ Global
{AB5B7C53-D7EC-4985-A6DE-70178E4B688A} = {EFD4BED5-13A5-4D9C-AADF-CAB7E1573704}
{64ED07BF-4D77-47CD-AF4F-5B4525686FA1} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
{13149F73-2CDB-4ECF-BF2C-403860045751} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
+ {725D9986-ACD1-424E-AF4C-2BEB407D2BD9} = {D21B282C-49E6-4A30-887B-9626D94B8D69}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3C6A0C44-FA63-4101-BBF9-2598641167D1}
diff --git a/projects/Applications/GH-1647/GH-1647.csproj b/projects/Applications/GH-1647/GH-1647.csproj
index f2591d159..f08f12982 100644
--- a/projects/Applications/GH-1647/GH-1647.csproj
+++ b/projects/Applications/GH-1647/GH-1647.csproj
@@ -9,7 +9,7 @@
-
+
diff --git a/projects/Applications/GH-1749/GH-1749.csproj b/projects/Applications/GH-1749/GH-1749.csproj
new file mode 100644
index 000000000..9e44f6447
--- /dev/null
+++ b/projects/Applications/GH-1749/GH-1749.csproj
@@ -0,0 +1,19 @@
+
+
+
+ Exe
+ net8.0
+ GH_1749
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
diff --git a/projects/Applications/GH-1749/Program.cs b/projects/Applications/GH-1749/Program.cs
new file mode 100644
index 000000000..a1cdf9cbb
--- /dev/null
+++ b/projects/Applications/GH-1749/Program.cs
@@ -0,0 +1,216 @@
+// This source code is dual-licensed under the Apache License, version
+// 2.0, and the Mozilla Public License, version 2.0.
+//
+// The APL v2.0:
+//
+//---------------------------------------------------------------------------
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//---------------------------------------------------------------------------
+//
+// The MPL v2.0:
+//
+//---------------------------------------------------------------------------
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+//
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//---------------------------------------------------------------------------
+
+#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
+
+using System.Runtime.ExceptionServices;
+using RabbitMQ.Client;
+using RabbitMQ.Client.Events;
+
+namespace GH_1749
+{
+ class GH1749Consumer : AsyncDefaultBasicConsumer
+ {
+ public GH1749Consumer(IChannel channel) : base(channel)
+ {
+ }
+
+ protected override Task OnCancelAsync(string[] consumerTags, CancellationToken cancellationToken = default)
+ {
+ Console.WriteLine("{0} [INFO] OnCancelAsync, tags[0]: {1}", Util.Now, consumerTags[0]);
+ return base.OnCancelAsync(consumerTags, cancellationToken);
+ }
+ }
+
+ static class Program
+ {
+ const string DefaultHostName = "localhost";
+ const string ConnectionClientProvidedName = "GH_1749";
+ static readonly CancellationTokenSource s_cancellationTokenSource = new();
+ static readonly CancellationToken s_cancellationToken = s_cancellationTokenSource.Token;
+
+ static Util? s_util;
+
+ static async Task Main(string[] args)
+ {
+ string hostname = DefaultHostName;
+ if (args.Length > 0)
+ {
+ hostname = args[0];
+ }
+
+ s_util = new Util(hostname, "guest", "guest");
+
+ AppDomain.CurrentDomain.FirstChanceException += CurrentDomain_FirstChanceException;
+
+ ConnectionFactory connectionFactory = new()
+ {
+ HostName = hostname,
+ AutomaticRecoveryEnabled = true,
+ UserName = "guest",
+ Password = "guest",
+ ClientProvidedName = ConnectionClientProvidedName
+ };
+
+ var channelOptions = new CreateChannelOptions(publisherConfirmationsEnabled: true, publisherConfirmationTrackingEnabled: true);
+ await using var connection = await connectionFactory.CreateConnectionAsync();
+
+ connection.RecoverySucceededAsync += (object sender, AsyncEventArgs ea) =>
+ {
+ Console.WriteLine("{0} [INFO] saw RecoverySucceededAsync, event: {1}", Now, ea);
+ _ = CloseConnectionAsync();
+ return Task.CompletedTask;
+ };
+
+ connection.CallbackExceptionAsync += Connection_CallbackExceptionAsync;
+
+ connection.ConnectionBlockedAsync += Connection_ConnectionBlockedAsync;
+ connection.ConnectionUnblockedAsync += Connection_ConnectionUnblockedAsync;
+
+ connection.ConnectionRecoveryErrorAsync += Connection_ConnectionRecoveryErrorAsync;
+
+ connection.ConnectionShutdownAsync += (object sender, ShutdownEventArgs ea) =>
+ {
+ Console.WriteLine("{0} [INFO] saw ConnectionShutdownAsync, event: {1}", Now, ea);
+ return Task.CompletedTask;
+ };
+
+ connection.ConsumerTagChangeAfterRecoveryAsync += Connection_ConsumerTagChangeAfterRecoveryAsync;
+ connection.QueueNameChangedAfterRecoveryAsync += Connection_QueueNameChangedAfterRecoveryAsync;
+
+ connection.RecoveringConsumerAsync += Connection_RecoveringConsumerAsync;
+
+ await using var channel = await connection.CreateChannelAsync(options: channelOptions);
+
+ channel.CallbackExceptionAsync += Channel_CallbackExceptionAsync;
+ channel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;
+
+ QueueDeclareOk queue = await channel.QueueDeclareAsync();
+
+ var consumer = new GH1749Consumer(channel);
+ await channel.BasicConsumeAsync(queue.QueueName, true, consumer);
+
+ _ = CloseConnectionAsync();
+
+ Console.WriteLine("{0} [INFO] consumer is running", Util.Now);
+ Console.ReadLine();
+ }
+
+ static async Task CloseConnectionAsync()
+ {
+ if (s_util is null)
+ {
+ throw new NullReferenceException("s_util");
+ }
+
+ try
+ {
+ Console.WriteLine("{0} [INFO] start closing connection: {1}", Now, ConnectionClientProvidedName);
+ await s_util.CloseConnectionAsync(ConnectionClientProvidedName);
+ Console.WriteLine("{0} [INFO] done closing connection: {1}", Now, ConnectionClientProvidedName);
+ }
+ catch (Exception ex)
+ {
+ Console.Error.WriteLine("{0} [ERROR] error while closing connection: {1}", Now, ex);
+ }
+ }
+
+ private static string Now => Util.Now;
+
+ private static Task Channel_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea)
+ {
+ Console.WriteLine("{0} [INFO] channel saw CallbackExceptionAsync, event: {1}", Now, ea);
+ Console.WriteLine("{0} [INFO] channel CallbackExceptionAsync, exception: {1}", Now, ea.Exception);
+ return Task.CompletedTask;
+ }
+
+ private static Task Channel_ChannelShutdownAsync(object sender, ShutdownEventArgs ea)
+ {
+ Console.WriteLine("{0} [INFO] saw ChannelShutdownAsync, event: {1}", Now, ea);
+ return Task.CompletedTask;
+ }
+
+ private static void CurrentDomain_FirstChanceException(object? sender, FirstChanceExceptionEventArgs e)
+ {
+ if (e.Exception is ObjectDisposedException)
+ {
+ Console.WriteLine("{0} [INFO] saw FirstChanceException, exception: {1}", Now, e.Exception);
+ }
+ }
+
+ private static Task Connection_CallbackExceptionAsync(object sender, CallbackExceptionEventArgs ea)
+ {
+ Console.WriteLine("{0} [INFO] connection saw CallbackExceptionAsync, event: {1}", Now, ea);
+ Console.WriteLine("{0} [INFO] connection CallbackExceptionAsync, exception: {1}", Now, ea.Exception);
+ return Task.CompletedTask;
+ }
+
+ private static Task Connection_ConnectionBlockedAsync(object sender, ConnectionBlockedEventArgs ea)
+ {
+ Console.WriteLine("{0} [INFO] saw ConnectionBlockedAsync, event: {1}", Now, ea);
+ return Task.CompletedTask;
+ }
+
+ private static Task Connection_ConnectionUnblockedAsync(object sender, AsyncEventArgs ea)
+ {
+ Console.WriteLine("{0} [INFO] saw ConnectionUnlockedAsync, event: {1}", Now, ea);
+ return Task.CompletedTask;
+ }
+
+ private static Task Connection_ConnectionRecoveryErrorAsync(object sender, ConnectionRecoveryErrorEventArgs ea)
+ {
+ Console.WriteLine("{0} [INFO] saw ConnectionRecoveryErrorAsync, event: {1}", Now, ea);
+ Console.WriteLine("{0} [INFO] ConnectionRecoveryErrorAsync, exception: {1}", Now, ea.Exception);
+ return Task.CompletedTask;
+ }
+
+ private static Task Connection_ConsumerTagChangeAfterRecoveryAsync(object sender, ConsumerTagChangedAfterRecoveryEventArgs ea)
+ {
+ Console.WriteLine("{0} [INFO] saw ConsumerTagChangeAfterRecoveryAsync, event: {1}", Now, ea);
+ Console.WriteLine("{0} [INFO] ConsumerTagChangeAfterRecoveryAsync, tags: {1} {2}", Now, ea.TagBefore, ea.TagAfter);
+ return Task.CompletedTask;
+ }
+
+ private static Task Connection_QueueNameChangedAfterRecoveryAsync(object sender, QueueNameChangedAfterRecoveryEventArgs ea)
+ {
+ Console.WriteLine("{0} [INFO] saw QueueNameChangedAfterRecoveryAsync, event: {1}", Now, ea);
+ Console.WriteLine("{0} [INFO] QueueNameChangedAfterRecoveryAsync, queue names: {1} {2}", Now, ea.NameBefore, ea.NameAfter);
+ return Task.CompletedTask;
+ }
+
+ private static Task Connection_RecoveringConsumerAsync(object sender, RecoveringConsumerEventArgs ea)
+ {
+ Console.WriteLine("{0} [INFO] saw RecoveringConsumerAsync, event: {1}, tag: {2}", Now, ea, ea.ConsumerTag);
+ return Task.CompletedTask;
+ }
+ }
+}
+
diff --git a/projects/Applications/GH-1749/Util.cs b/projects/Applications/GH-1749/Util.cs
new file mode 100644
index 000000000..1cc2debda
--- /dev/null
+++ b/projects/Applications/GH-1749/Util.cs
@@ -0,0 +1,140 @@
+// This source code is dual-licensed under the Apache License, version
+// 2.0, and the Mozilla Public License, version 2.0.
+//
+// The APL v2.0:
+//
+//---------------------------------------------------------------------------
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//---------------------------------------------------------------------------
+//
+// The MPL v2.0:
+//
+//---------------------------------------------------------------------------
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this
+// file, You can obtain one at https://mozilla.org/MPL/2.0/.
+//
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
+//---------------------------------------------------------------------------
+
+#pragma warning disable CA2007 // Consider calling ConfigureAwait on the awaited task
+
+using System.Globalization;
+using EasyNetQ.Management.Client;
+
+namespace GH_1749
+{
+ public class Util : IDisposable
+ {
+ private static readonly Random s_random = Random.Shared;
+ private readonly ManagementClient _managementClient;
+
+ public Util() : this("localhost", "guest", "guest")
+ {
+ }
+
+ public Util(string hostname, string managementUsername, string managementPassword)
+ {
+ if (string.IsNullOrEmpty(managementUsername))
+ {
+ managementUsername = "guest";
+ }
+
+ if (string.IsNullOrEmpty(managementPassword))
+ {
+ throw new ArgumentNullException(nameof(managementPassword));
+ }
+
+ var managementUri = new Uri($"http://{hostname}:15672");
+ _managementClient = new ManagementClient(managementUri, managementUsername, managementPassword);
+ }
+
+ public static string Now => DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture);
+
+ public static Random S_Random
+ {
+ get
+ {
+ return s_random;
+ }
+ }
+
+ public async Task CloseConnectionAsync(string connectionClientProvidedName)
+ {
+ ushort tries = 1;
+ EasyNetQ.Management.Client.Model.Connection? connectionToClose = null;
+ do
+ {
+ IReadOnlyList connections;
+ try
+ {
+ do
+ {
+ ushort delayMilliseconds = (ushort)(tries * 2 * 100);
+ if (delayMilliseconds > 1000)
+ {
+ delayMilliseconds = 1000;
+ }
+
+ await Task.Delay(TimeSpan.FromMilliseconds(delayMilliseconds));
+
+ connections = await _managementClient.GetConnectionsAsync();
+ } while (connections.Count == 0);
+
+ connectionToClose = connections.Where(c0 =>
+ {
+ if (c0.ClientProperties.ContainsKey("connection_name"))
+ {
+ object? maybeConnectionName = c0.ClientProperties["connection_name"];
+ if (maybeConnectionName is string connectionNameStr)
+ {
+ return string.Equals(connectionNameStr, connectionClientProvidedName, StringComparison.InvariantCultureIgnoreCase);
+ }
+ }
+
+ return false;
+ }).FirstOrDefault();
+ }
+ catch (ArgumentNullException)
+ {
+ // Sometimes we see this in GitHub CI
+ tries++;
+ continue;
+ }
+
+ if (connectionToClose != null)
+ {
+ try
+ {
+ await _managementClient.CloseConnectionAsync(connectionToClose);
+ return;
+ }
+ catch (UnexpectedHttpStatusCodeException)
+ {
+ tries++;
+ }
+ }
+ } while (tries <= 10);
+
+ if (connectionToClose == null)
+ {
+ throw new InvalidOperationException(
+ $"{Now} [ERROR] could not find/delete connection: '{connectionClientProvidedName}'");
+ }
+ }
+
+ public void Dispose() => _managementClient.Dispose();
+ }
+}
diff --git a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs
index 92c273957..76ceaae20 100644
--- a/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs
+++ b/projects/RabbitMQ.Client/ConsumerDispatching/ConsumerDispatcherChannelBase.cs
@@ -45,7 +45,7 @@ internal abstract class ConsumerDispatcherChannelBase : ConsumerDispatcherBase,
private readonly Task _worker;
private readonly ushort _concurrency;
private bool _quiesce = false;
- private bool _disposed;
+ private bool _disposedValue;
internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
{
@@ -85,7 +85,7 @@ internal ConsumerDispatcherChannelBase(Impl.Channel channel, ushort concurrency)
public ValueTask HandleBasicConsumeOkAsync(IAsyncBasicConsumer consumer, string consumerTag, CancellationToken cancellationToken)
{
- if (false == _disposed && false == _quiesce)
+ if (false == _disposedValue && false == _quiesce)
{
AddConsumer(consumer, consumerTag);
WorkStruct work = WorkStruct.CreateConsumeOk(consumer, consumerTag);
@@ -101,7 +101,7 @@ public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag,
string exchange, string routingKey, IReadOnlyBasicProperties basicProperties, RentedMemory body,
CancellationToken cancellationToken)
{
- if (false == _disposed && false == _quiesce)
+ if (false == _disposedValue && false == _quiesce)
{
IAsyncBasicConsumer consumer = GetConsumerOrDefault(consumerTag);
var work = WorkStruct.CreateDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body);
@@ -115,7 +115,7 @@ public ValueTask HandleBasicDeliverAsync(string consumerTag, ulong deliveryTag,
public ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken cancellationToken)
{
- if (false == _disposed && false == _quiesce)
+ if (false == _disposedValue && false == _quiesce)
{
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancelOk(consumer, consumerTag);
@@ -129,7 +129,7 @@ public ValueTask HandleBasicCancelOkAsync(string consumerTag, CancellationToken
public ValueTask HandleBasicCancelAsync(string consumerTag, CancellationToken cancellationToken)
{
- if (false == _disposed && false == _quiesce)
+ if (false == _disposedValue && false == _quiesce)
{
IAsyncBasicConsumer consumer = GetAndRemoveConsumer(consumerTag);
WorkStruct work = WorkStruct.CreateCancel(consumer, consumerTag);
@@ -148,7 +148,7 @@ public void Quiesce()
public async Task WaitForShutdownAsync()
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -202,6 +202,15 @@ protected override Task InternalShutdownAsync()
protected abstract Task ProcessChannelAsync();
+ protected enum WorkType : byte
+ {
+ Shutdown,
+ Cancel,
+ CancelOk,
+ Deliver,
+ ConsumeOk
+ }
+
protected readonly struct WorkStruct : IDisposable
{
public readonly IAsyncBasicConsumer Consumer;
@@ -276,42 +285,35 @@ public static WorkStruct CreateDeliver(IAsyncBasicConsumer consumer, string cons
public void Dispose() => Body.Dispose();
}
- protected enum WorkType : byte
+ public void Dispose()
{
- Shutdown,
- Cancel,
- CancelOk,
- Deliver,
- ConsumeOk
+ // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
+ Dispose(disposing: true);
+ GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
- if (!_disposed)
+ if (_disposedValue)
{
- try
- {
- if (disposing)
- {
- Quiesce();
- }
- }
- catch
- {
- // CHOMP
- }
- finally
+ return;
+ }
+
+ try
+ {
+ if (disposing)
{
- _disposed = true;
+ Quiesce();
}
}
- }
-
- public void Dispose()
- {
- // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
- Dispose(disposing: true);
- GC.SuppressFinalize(this);
+ catch
+ {
+ // CHOMP
+ }
+ finally
+ {
+ _disposedValue = true;
+ }
}
}
}
diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
index d98734f19..92f2fd781 100644
--- a/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
+++ b/projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
@@ -47,7 +47,10 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private AutorecoveringConnection _connection;
private RecoveryAwareChannel _innerChannel;
- private bool _disposed;
+
+ private bool _disposedValue;
+ private bool _isDisposing;
+ private readonly object _isDisposingLock = new();
private ushort _prefetchCountConsumer;
private ushort _prefetchCountGlobal;
@@ -143,7 +146,7 @@ public IAsyncBasicConsumer? DefaultConsumer
public bool IsClosed => !IsOpen;
- public bool IsOpen => !_disposed && _innerChannel.IsOpen;
+ public bool IsOpen => !_disposedValue && _innerChannel.IsOpen;
public string? CurrentQueue => InnerChannel.CurrentQueue;
@@ -155,7 +158,7 @@ internal async Task AutomaticallyRecoverAsync(AutorecoveringConnection con
throw new InvalidOperationException("recordedEntitiesSemaphore must be held");
}
- if (_disposed)
+ if (_disposedValue)
{
return false;
}
@@ -192,7 +195,7 @@ await newChannel.TxSelectAsync(cancellationToken)
* with the resulting basic.ack never getting sent out.
*/
- if (_disposed)
+ if (_disposedValue)
{
await newChannel.AbortAsync(CancellationToken.None)
.ConfigureAwait(false);
@@ -252,23 +255,47 @@ await _connection.DeleteRecordedChannelAsync(this,
public override string ToString()
=> InnerChannel.ToString();
- public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
+ public void Dispose()
+ {
+ if (_disposedValue)
+ {
+ return;
+ }
+
+ DisposeAsync().AsTask().GetAwaiter().GetResult();
+ }
public async ValueTask DisposeAsync()
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
- if (IsOpen)
+ lock (_isDisposingLock)
{
- await this.AbortAsync()
- .ConfigureAwait(false);
+ if (_isDisposing)
+ {
+ return;
+ }
+ _isDisposing = true;
}
- _recordedConsumerTags.Clear();
- _disposed = true;
+ try
+ {
+ if (IsOpen)
+ {
+ await this.AbortAsync()
+ .ConfigureAwait(false);
+ }
+
+ _recordedConsumerTags.Clear();
+ }
+ finally
+ {
+ _disposedValue = true;
+ _isDisposing = false;
+ }
}
public ValueTask GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken);
@@ -477,7 +504,7 @@ public Task TxSelectAsync(CancellationToken cancellationToken)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ThrowIfDisposed()
{
- if (_disposed)
+ if (_disposedValue)
{
ThrowDisposed();
}
diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recording.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recording.cs
index ef8f21c35..c8063e949 100644
--- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recording.cs
+++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recording.cs
@@ -53,7 +53,7 @@ internal sealed partial class AutorecoveringConnection
internal async ValueTask RecordExchangeAsync(RecordedExchange exchange,
bool recordedEntitiesSemaphoreHeld)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -85,7 +85,7 @@ private void DoRecordExchange(in RecordedExchange exchange)
internal async ValueTask DeleteRecordedExchangeAsync(string exchangeName,
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -133,7 +133,7 @@ await DeleteAutoDeleteExchangeAsync(binding.Source,
internal async ValueTask DeleteAutoDeleteExchangeAsync(string exchangeName,
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -187,7 +187,7 @@ bool AnyBindingsOnExchange(string exchange)
internal async ValueTask RecordQueueAsync(RecordedQueue queue,
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -219,7 +219,7 @@ private void DoRecordQueue(RecordedQueue queue)
internal async ValueTask DeleteRecordedQueueAsync(string queueName,
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -268,7 +268,7 @@ await DeleteAutoDeleteExchangeAsync(binding.Source,
internal async ValueTask RecordBindingAsync(RecordedBinding binding,
bool recordedEntitiesSemaphoreHeld)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -300,7 +300,7 @@ private void DoRecordBinding(in RecordedBinding binding)
internal async ValueTask DeleteRecordedBindingAsync(RecordedBinding rb,
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -332,7 +332,7 @@ private void DoDeleteRecordedBinding(in RecordedBinding rb)
internal async ValueTask RecordConsumerAsync(RecordedConsumer consumer,
bool recordedEntitiesSemaphoreHeld)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -369,7 +369,7 @@ private void DoRecordConsumer(in RecordedConsumer consumer)
internal async ValueTask DeleteRecordedConsumerAsync(string consumerTag,
bool recordedEntitiesSemaphoreHeld)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -466,7 +466,7 @@ private void DoAddRecordedChannel(AutorecoveringChannel channel)
internal async Task DeleteRecordedChannelAsync(AutorecoveringChannel channel,
bool channelsSemaphoreHeld, bool recordedEntitiesSemaphoreHeld)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs
index 7eaed0479..da1eb2b61 100644
--- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs
+++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs
@@ -285,7 +285,7 @@ await maybeNewInnerConnection.OpenAsync(cancellationToken)
private async ValueTask RecoverExchangesAsync(IConnection connection,
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -337,7 +337,7 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
private async Task RecoverQueuesAsync(IConnection connection,
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -451,7 +451,7 @@ void UpdateConsumerQueue(string oldName, string newName)
private async ValueTask RecoverBindingsAsync(IConnection connection,
bool recordedEntitiesSemaphoreHeld, CancellationToken cancellationToken)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -503,7 +503,7 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
internal async ValueTask RecoverConsumersAsync(AutorecoveringChannel channelToRecover, IChannel channelToUse,
bool recordedEntitiesSemaphoreHeld = false, CancellationToken cancellationToken = default)
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
diff --git a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
index f38de44a1..37eab3613 100644
--- a/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
+++ b/projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
@@ -49,7 +49,7 @@ internal sealed partial class AutorecoveringConnection : IConnection
private readonly IEndpointResolver _endpoints;
private Connection _innerConnection;
- private bool _disposed;
+ private bool _disposedValue;
private Connection InnerConnection
{
@@ -268,11 +268,19 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca
return autorecoveringChannel;
}
- public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
+ public void Dispose()
+ {
+ if (_disposedValue)
+ {
+ return;
+ }
+
+ DisposeAsync().AsTask().GetAwaiter().GetResult();
+ }
public async ValueTask DisposeAsync()
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
@@ -281,6 +289,11 @@ public async ValueTask DisposeAsync()
{
await _innerConnection.DisposeAsync()
.ConfigureAwait(false);
+
+ _channels.Clear();
+ _recordedEntitiesSemaphore.Dispose();
+ _channelsSemaphore.Dispose();
+ _recoveryCancellationTokenSource.Dispose();
}
catch (OperationInterruptedException)
{
@@ -288,11 +301,7 @@ await _innerConnection.DisposeAsync()
}
finally
{
- _channels.Clear();
- _recordedEntitiesSemaphore.Dispose();
- _channelsSemaphore.Dispose();
- _recoveryCancellationTokenSource.Dispose();
- _disposed = true;
+ _disposedValue = true;
}
}
@@ -302,7 +311,7 @@ private void EnsureIsOpen()
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ThrowIfDisposed()
{
- if (_disposed)
+ if (_disposedValue)
{
ThrowDisposed();
}
diff --git a/projects/RabbitMQ.Client/Impl/Channel.cs b/projects/RabbitMQ.Client/Impl/Channel.cs
index 4e73a9a08..d5300b9c4 100644
--- a/projects/RabbitMQ.Client/Impl/Channel.cs
+++ b/projects/RabbitMQ.Client/Impl/Channel.cs
@@ -61,6 +61,10 @@ internal partial class Channel : IChannel, IRecoverable
internal readonly IConsumerDispatcher ConsumerDispatcher;
+ private bool _disposedValue;
+ private bool _isDisposing;
+ private readonly object _isDisposingLock = new();
+
public Channel(ISession session, CreateChannelOptions createChannelOptions)
{
ContinuationTimeout = createChannelOptions.ContinuationTimeout;
@@ -514,47 +518,106 @@ public override string ToString()
void IDisposable.Dispose()
{
+ if (_disposedValue)
+ {
+ return;
+ }
+
Dispose(true);
}
+ public async ValueTask DisposeAsync()
+ {
+ if (_disposedValue)
+ {
+ return;
+ }
+
+ await DisposeAsyncCore(true)
+ .ConfigureAwait(false);
+
+ Dispose(false);
+ }
+
protected virtual void Dispose(bool disposing)
{
+ if (_disposedValue)
+ {
+ return;
+ }
+
if (disposing)
{
- if (IsOpen)
+ lock (_isDisposingLock)
{
- this.AbortAsync().GetAwaiter().GetResult();
+ if (_isDisposing)
+ {
+ return;
+ }
+ _isDisposing = true;
}
- ConsumerDispatcher.Dispose();
- _rpcSemaphore.Dispose();
- _confirmSemaphore.Dispose();
- _outstandingPublisherConfirmationsRateLimiter?.Dispose();
- }
- }
-
- public async ValueTask DisposeAsync()
- {
- await DisposeAsyncCore()
- .ConfigureAwait(false);
+ try
+ {
+ if (IsOpen)
+ {
+ this.AbortAsync().GetAwaiter().GetResult();
+ }
- Dispose(false);
+ ConsumerDispatcher.Dispose();
+ _rpcSemaphore.Dispose();
+ _confirmSemaphore.Dispose();
+ _outstandingPublisherConfirmationsRateLimiter?.Dispose();
+ }
+ finally
+ {
+ _disposedValue = true;
+ _isDisposing = false;
+ }
+ }
}
- protected virtual async ValueTask DisposeAsyncCore()
+ protected virtual async ValueTask DisposeAsyncCore(bool disposing)
{
- if (IsOpen)
+ if (_disposedValue)
{
- await this.AbortAsync().ConfigureAwait(false);
+ return;
}
- ConsumerDispatcher.Dispose();
- _rpcSemaphore.Dispose();
- _confirmSemaphore.Dispose();
- if (_outstandingPublisherConfirmationsRateLimiter is not null)
+ if (disposing)
{
- await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
- .ConfigureAwait(false);
+ lock (_isDisposingLock)
+ {
+ if (_isDisposing)
+ {
+ return;
+ }
+ _isDisposing = true;
+ }
+
+ try
+ {
+ if (IsOpen)
+ {
+ await this.AbortAsync()
+ .ConfigureAwait(false);
+ }
+
+ ConsumerDispatcher.Dispose();
+ _rpcSemaphore.Dispose();
+ _confirmSemaphore.Dispose();
+
+ if (_outstandingPublisherConfirmationsRateLimiter is not null)
+ {
+ await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
+ .ConfigureAwait(false);
+ }
+ }
+ finally
+ {
+ _disposedValue = true;
+ _isDisposing = false;
+ }
}
}
diff --git a/projects/RabbitMQ.Client/Impl/Connection.cs b/projects/RabbitMQ.Client/Impl/Connection.cs
index af26bb7ac..3272bf1f8 100644
--- a/projects/RabbitMQ.Client/Impl/Connection.cs
+++ b/projects/RabbitMQ.Client/Impl/Connection.cs
@@ -45,7 +45,10 @@ namespace RabbitMQ.Client.Framing
{
internal sealed partial class Connection : IConnection
{
- private bool _disposed;
+ private bool _disposedValue;
+ private bool _isDisposing;
+ private readonly object _isDisposingLock = new();
+
private volatile bool _closed;
private readonly ConnectionConfig _config;
@@ -485,15 +488,32 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio
return _frameHandler.WriteAsync(frames, cancellationToken);
}
- public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
+ public void Dispose()
+ {
+ if (_disposedValue)
+ {
+ return;
+ }
+
+ DisposeAsync().AsTask().GetAwaiter().GetResult();
+ }
public async ValueTask DisposeAsync()
{
- if (_disposed)
+ if (_disposedValue)
{
return;
}
+ lock (_isDisposingLock)
+ {
+ if (_isDisposing)
+ {
+ return;
+ }
+ _isDisposing = true;
+ }
+
try
{
if (IsOpen)
@@ -514,14 +534,15 @@ await _channel0.DisposeAsync()
}
finally
{
- _disposed = true;
+ _disposedValue = true;
+ _isDisposing = false;
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ThrowIfDisposed()
{
- if (_disposed)
+ if (_disposedValue)
{
ThrowObjectDisposedException();
}
diff --git a/projects/Test/Integration/TestChannelShutdown.cs b/projects/Test/Integration/TestChannelShutdown.cs
index 37af0ec19..b62f8b058 100644
--- a/projects/Test/Integration/TestChannelShutdown.cs
+++ b/projects/Test/Integration/TestChannelShutdown.cs
@@ -30,6 +30,8 @@
//---------------------------------------------------------------------------
using System;
+using System.Collections.Generic;
+using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Impl;
@@ -61,5 +63,52 @@ public async Task TestConsumerDispatcherShutdown()
await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown");
Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync");
}
+
+ [Fact]
+ public async Task TestConcurrentDisposeAsync_GH1749()
+ {
+ bool sawCallbackException = false;
+ int channelShutdownCount = 0;
+
+ _channel.CallbackExceptionAsync += (channel, ea) =>
+ {
+ sawCallbackException = true;
+ return Task.CompletedTask;
+ };
+
+ _channel.ChannelShutdownAsync += (channel, args) =>
+ {
+ Interlocked.Increment(ref channelShutdownCount);
+ return Task.CompletedTask;
+ };
+
+ var disposeTasks = new List
+ {
+ _channel.DisposeAsync(),
+ _channel.DisposeAsync(),
+ _channel.DisposeAsync()
+ };
+
+ foreach (ValueTask vt in disposeTasks)
+ {
+ await vt;
+ }
+
+ Assert.Equal(1, channelShutdownCount);
+ Assert.False(sawCallbackException);
+
+ disposeTasks.Clear();
+ disposeTasks.Add(_conn.DisposeAsync());
+ disposeTasks.Add(_conn.DisposeAsync());
+ disposeTasks.Add(_conn.DisposeAsync());
+
+ foreach (ValueTask vt in disposeTasks)
+ {
+ await vt;
+ }
+
+ _channel = null;
+ _conn = null;
+ }
}
}
diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs
index 1629dc69b..507ac12e5 100644
--- a/projects/Test/Integration/TestToxiproxy.cs
+++ b/projects/Test/Integration/TestToxiproxy.cs
@@ -41,12 +41,14 @@
using Xunit;
using Xunit.Abstractions;
+#nullable enable
+
namespace Test.Integration
{
public class TestToxiproxy : IntegrationFixture
{
private readonly TimeSpan _heartbeatTimeout = TimeSpan.FromSeconds(1);
- private ToxiproxyManager _toxiproxyManager;
+ private ToxiproxyManager? _toxiproxyManager;
private int _proxyPort;
public TestToxiproxy(ITestOutputHelper output) : base(output)
@@ -61,14 +63,24 @@ public override Task InitializeAsync()
Assert.Null(_conn);
Assert.Null(_channel);
- _toxiproxyManager = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows);
- _proxyPort = ToxiproxyManager.ProxyPort;
- return _toxiproxyManager.InitializeAsync();
+ if (AreToxiproxyTestsEnabled)
+ {
+ _toxiproxyManager = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows);
+ _proxyPort = ToxiproxyManager.ProxyPort;
+ return _toxiproxyManager.InitializeAsync();
+ }
+ else
+ {
+ return Task.CompletedTask;
+ }
}
public override async Task DisposeAsync()
{
- await _toxiproxyManager.DisposeAsync();
+ if (_toxiproxyManager is not null)
+ {
+ await _toxiproxyManager.DisposeAsync();
+ }
await base.DisposeAsync();
}
@@ -77,6 +89,7 @@ public override async Task DisposeAsync()
public async Task TestCloseConnection()
{
Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");
+ Assert.NotNull(_toxiproxyManager);
ConnectionFactory cf = CreateConnectionFactory();
cf.Port = _proxyPort;
@@ -199,6 +212,7 @@ async Task PublishLoop()
public async Task TestThatStoppedSocketResultsInHeartbeatTimeout()
{
Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");
+ Assert.NotNull(_toxiproxyManager);
ConnectionFactory cf = CreateConnectionFactory();
cf.Port = _proxyPort;
@@ -246,6 +260,7 @@ await Assert.ThrowsAsync(() =>
public async Task TestTcpReset_GH1464()
{
Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");
+ Assert.NotNull(_toxiproxyManager);
ConnectionFactory cf = CreateConnectionFactory();
cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), _proxyPort);
@@ -298,6 +313,7 @@ public async Task TestTcpReset_GH1464()
public async Task TestPublisherConfirmationThrottling()
{
Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");
+ Assert.NotNull(_toxiproxyManager);
const int TotalMessageCount = 64;
const int MaxOutstandingConfirms = 8;
@@ -397,7 +413,7 @@ private bool AreToxiproxyTestsEnabled
{
get
{
- string s = Environment.GetEnvironmentVariable("RABBITMQ_TOXIPROXY_TESTS");
+ string? s = Environment.GetEnvironmentVariable("RABBITMQ_TOXIPROXY_TESTS");
if (string.IsNullOrEmpty(s))
{