From ef418c061cca6c95c008ef9c52e5a621d7906718 Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Wed, 25 Sep 2024 12:56:24 -0500 Subject: [PATCH] Refactoring to ensure that channels are drained successfully even if cancellation token throws. Minor perf improvments as spotted. Added/fixed comments. Signed-off-by: Whit Waldo --- .../PublishSubscribeReceiver.cs | 199 +++++++++--------- 1 file changed, 100 insertions(+), 99 deletions(-) diff --git a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs index 7d5a64ff4..aa9359e40 100644 --- a/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs +++ b/src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs @@ -11,6 +11,7 @@ // limitations under the License. // ------------------------------------------------------------------------ +using System.Threading; using System.Threading.Channels; using Dapr.AppCallback.Autogen.Grpc.v1; using Grpc.Core; @@ -39,11 +40,11 @@ public sealed class PublishSubscribeReceiver : IAsyncDisposable /// /// A channel used to decouple the messages received from the sidecar to their consumption. /// - private readonly Channel channel = Channel.CreateUnbounded(); + private readonly Channel topicMessagesChannel = Channel.CreateUnbounded(); /// /// Maintains the various acknowledgements for each message. /// - private readonly Channel acknowledgements = Channel.CreateUnbounded(); + private readonly Channel acknowledgementsChannel = Channel.CreateUnbounded(); /// /// The stream connection between this instance and the Dapr sidecar. /// @@ -61,9 +62,13 @@ public sealed class PublishSubscribeReceiver : IAsyncDisposable /// private readonly P.Dapr.DaprClient client; /// - /// Flag that prevents the developer from accidentally subscribing more than once from the same receiver. + /// Flag that prevents the developer from accidentally initializing the subscription more than once from the same receiver. /// - private bool hasInitialized; + private bool hasInitialized = false; + /// + /// Flag that ensures the instance is only disposed a single time. + /// + private bool isDisposed = false; /// /// Constructs a new instance of a instance. @@ -95,48 +100,21 @@ public async Task SubscribeAsync(CancellationToken cancellationToken = default) hasInitialized = true; var stream = await GetStreamAsync(cancellationToken); + //Retrieve the messages from the sidecar and write to the messages channel - _ = FetchDataFromSidecarAsync(stream, channel.Writer, cancellationToken); + var fetchMessagesTask = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken); + + //Process the messages as they're written to either channel + var acknowledgementProcessorTask = ProcessAcknowledgementChannelMessagesAsync(cancellationToken); + var topicMessageProcessorTask = ProcessTopicChannelMessagesAsync(cancellationToken); try { - //Processes each acknowledgement from the acknowledgement channel reader as it's populated - await foreach (var acknowledgement in acknowledgements.Reader.ReadAllAsync(cancellationToken)) - { - await ProcessAcknowledgementAsync(acknowledgement); - } - - //Read the messages one-by-one out of the messages channel - while (await channel.Reader.WaitToReadAsync(cancellationToken)) - { - while (channel.Reader.TryRead(out var message)) - { - using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); - cts.CancelAfter(options.MessageHandlingPolicy.TimeoutDuration); - - //Evaluate the message and return an acknowledgement result - var messageAction = await messageHandler(message, cts.Token); - - try - { - //Share the result with the sidecar - await AcknowledgeMessageAsync(message.Id, messageAction); - } - catch (OperationCanceledException) - { - //Acknowledge the message using the configured default response action - await AcknowledgeMessageAsync(message.Id, options.MessageHandlingPolicy.DefaultResponseAction); - } - } - } + await Task.WhenAll(fetchMessagesTask, acknowledgementProcessorTask, topicMessageProcessorTask); } catch (OperationCanceledException) { - //Drain the remaining messages with the default action in the order in which they were queued - while (channel.Reader.TryRead(out var message)) - { - await AcknowledgeMessageAsync(message.Id, options.MessageHandlingPolicy.DefaultResponseAction); - } + await DisposeAsync(); } } @@ -144,10 +122,8 @@ public async Task SubscribeAsync(CancellationToken cancellationToken = default) /// Retrieves or creates the bidirectional stream to the DaprClient for transacting pub/sub subscriptions. /// /// Cancellation token. - /// - private async - Task> - GetStreamAsync(CancellationToken cancellationToken) + /// The stream connection. + private async Task> GetStreamAsync(CancellationToken cancellationToken) { await semaphore.WaitAsync(cancellationToken); @@ -179,59 +155,97 @@ private async Task AcknowledgeMessageAsync(string messageId, TopicResponseAction }; var acknowledgement = new TopicAcknowledgement(messageId, action); - await acknowledgements.Writer.WriteAsync(acknowledgement); + await acknowledgementsChannel.Writer.WriteAsync(acknowledgement); } /// - /// Retrieves the subscription stream data from the Dapr sidecar. + /// Processes each acknowledgement from the acknowledgement channel reader as it's populated. /// - /// The stream connection to and from the Dream sidecar instance. - /// The channel writer instance. /// Cancellation token. - private async Task FetchDataFromSidecarAsync(AsyncDuplexStreamingCall stream, ChannelWriter channelWriter, CancellationToken cancellationToken) + private async Task ProcessAcknowledgementChannelMessagesAsync(CancellationToken cancellationToken) { - try - { - var initialRequest = new P.SubscribeTopicEventsRequestInitialAlpha1() - { - PubsubName = pubSubName, - DeadLetterTopic = options.DeadLetterTopic ?? string.Empty, - Topic = topicName - }; - - if (options?.Metadata.Count > 0) + var messageStream = await GetStreamAsync(cancellationToken); + await foreach (var acknowledgement in acknowledgementsChannel.Reader.ReadAllAsync(cancellationToken)) + { + await messageStream.RequestStream.WriteAsync(new P.SubscribeTopicEventsRequestAlpha1 { - foreach (var (key, value) in options.Metadata) + EventProcessed = new() { - initialRequest.Metadata.Add(key, value); + Id = acknowledgement.MessageId, + Status = new() { Status = acknowledgement.Action } } - } + }, cancellationToken); + } + } + + /// + /// Processes each topic messages from the channel as it's populated. + /// + /// Cancellation token. + private async Task ProcessTopicChannelMessagesAsync(CancellationToken cancellationToken) + { + await foreach (var message in topicMessagesChannel.Reader.ReadAllAsync(cancellationToken)) + { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(options.MessageHandlingPolicy.TimeoutDuration); - await stream.RequestStream.WriteAsync(new P.SubscribeTopicEventsRequestAlpha1 { InitialRequest = initialRequest }, cancellationToken); + //Evaluate the message and return an acknowledgement result + var messageAction = await messageHandler(message, cts.Token); - await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken)) + try { - var message = new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type, response.EventMessage.SpecVersion, response.EventMessage.DataContentType, response.EventMessage.Topic, response.EventMessage.PubsubName) - { - Path = response.EventMessage.Path, - Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value) - }; - - await channelWriter.WriteAsync(message, cancellationToken); + //Share the result with the sidecar + await AcknowledgeMessageAsync(message.Id, messageAction); + } + catch (OperationCanceledException) + { + //Acknowledge the message using the configured default response action + await AcknowledgeMessageAsync(message.Id, options.MessageHandlingPolicy.DefaultResponseAction); } } - catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + } + + /// + /// Retrieves the subscription stream data from the Dapr sidecar. + /// + /// The stream connection to and from the Dapr sidecar instance. + /// The channel writer instance. + /// Cancellation token. + private async Task FetchDataFromSidecarAsync( + AsyncDuplexStreamingCall stream, + ChannelWriter channelWriter, CancellationToken cancellationToken) + { + //Build out the initial topic events request + var initialRequest = new P.SubscribeTopicEventsRequestInitialAlpha1() { - //Ignore our own cancellation - } - catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled && - cancellationToken.IsCancellationRequested) + PubsubName = pubSubName, DeadLetterTopic = options.DeadLetterTopic ?? string.Empty, Topic = topicName + }; + + if (options?.Metadata.Count > 0) { - //Ignore a remote cancellation due to our own cancellation + foreach (var (key, value) in options.Metadata) + { + initialRequest.Metadata.Add(key, value); + } } - finally + + //Send this request to the Dapr sidecar + await stream.RequestStream.WriteAsync( + new P.SubscribeTopicEventsRequestAlpha1 { InitialRequest = initialRequest }, cancellationToken); + + //Each time a message is received from the stream, push it into the topic messages channel + await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken)) { - channel.Writer.Complete(); + var message = + new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type, + response.EventMessage.SpecVersion, response.EventMessage.DataContentType, + response.EventMessage.Topic, response.EventMessage.PubsubName) + { + Path = response.EventMessage.Path, + Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value) + }; + + await channelWriter.WriteAsync(message, cancellationToken); } } @@ -241,13 +255,17 @@ private async Task FetchDataFromSidecarAsync(AsyncDuplexStreamingCall public async ValueTask DisposeAsync() { + if (isDisposed) + return; + isDisposed = true; + //Stop processing new events - channel.Writer.Complete(); + topicMessagesChannel.Writer.Complete(); try { //Process any remaining messages on the channel - await channel.Reader.Completion; + await topicMessagesChannel.Reader.Completion; } catch (OperationCanceledException) { @@ -255,11 +273,11 @@ public async ValueTask DisposeAsync() } //Flush the remaining acknowledgements, but start by marking the writer as complete - acknowledgements.Writer.Complete(); + acknowledgementsChannel.Writer.Complete(); try { //Process any remaining acknowledgements on the channel - await acknowledgements.Reader.Completion; + await acknowledgementsChannel.Reader.Completion; } catch (OperationCanceledException) { @@ -267,23 +285,6 @@ public async ValueTask DisposeAsync() } } - /// - /// Processes each of the acknowledgement messages. - /// - /// Information about the message and action to take on it. - /// - private async Task ProcessAcknowledgementAsync(TopicAcknowledgement acknowledgement) - { - var messageStream = await GetStreamAsync(CancellationToken.None); - await messageStream.RequestStream.WriteAsync(new P.SubscribeTopicEventsRequestAlpha1 - { - EventProcessed = new() - { - Id = acknowledgement.MessageId, Status = new() { Status = acknowledgement.Action } - } - }); - } - /// /// Reflects the action to take on a given message identifier. ///