Skip to content

Commit

Permalink
Refactoring to ensure that channels are drained successfully even if …
Browse files Browse the repository at this point in the history
…cancellation token throws. Minor perf improvments as spotted. Added/fixed comments.

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
  • Loading branch information
WhitWaldo committed Sep 25, 2024
1 parent 521396f commit ef418c0
Showing 1 changed file with 100 additions and 99 deletions.
199 changes: 100 additions & 99 deletions src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// limitations under the License.
// ------------------------------------------------------------------------

using System.Threading;
using System.Threading.Channels;
using Dapr.AppCallback.Autogen.Grpc.v1;
using Grpc.Core;
Expand Down Expand Up @@ -39,11 +40,11 @@ public sealed class PublishSubscribeReceiver : IAsyncDisposable
/// <summary>
/// A channel used to decouple the messages received from the sidecar to their consumption.
/// </summary>
private readonly Channel<TopicMessage> channel = Channel.CreateUnbounded<TopicMessage>();
private readonly Channel<TopicMessage> topicMessagesChannel = Channel.CreateUnbounded<TopicMessage>();
/// <summary>
/// Maintains the various acknowledgements for each message.
/// </summary>
private readonly Channel<TopicAcknowledgement> acknowledgements = Channel.CreateUnbounded<TopicAcknowledgement>();
private readonly Channel<TopicAcknowledgement> acknowledgementsChannel = Channel.CreateUnbounded<TopicAcknowledgement>();
/// <summary>
/// The stream connection between this instance and the Dapr sidecar.
/// </summary>
Expand All @@ -61,9 +62,13 @@ public sealed class PublishSubscribeReceiver : IAsyncDisposable
/// </summary>
private readonly P.Dapr.DaprClient client;
/// <summary>
/// Flag that prevents the developer from accidentally subscribing more than once from the same receiver.
/// Flag that prevents the developer from accidentally initializing the subscription more than once from the same receiver.
/// </summary>
private bool hasInitialized;
private bool hasInitialized = false;
/// <summary>
/// Flag that ensures the instance is only disposed a single time.
/// </summary>
private bool isDisposed = false;

/// <summary>
/// Constructs a new instance of a <see cref="PublishSubscribeReceiver"/> instance.
Expand Down Expand Up @@ -95,59 +100,30 @@ public async Task SubscribeAsync(CancellationToken cancellationToken = default)
hasInitialized = true;

var stream = await GetStreamAsync(cancellationToken);

//Retrieve the messages from the sidecar and write to the messages channel
_ = FetchDataFromSidecarAsync(stream, channel.Writer, cancellationToken);
var fetchMessagesTask = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken);

//Process the messages as they're written to either channel
var acknowledgementProcessorTask = ProcessAcknowledgementChannelMessagesAsync(cancellationToken);
var topicMessageProcessorTask = ProcessTopicChannelMessagesAsync(cancellationToken);

try
{
//Processes each acknowledgement from the acknowledgement channel reader as it's populated
await foreach (var acknowledgement in acknowledgements.Reader.ReadAllAsync(cancellationToken))
{
await ProcessAcknowledgementAsync(acknowledgement);
}

//Read the messages one-by-one out of the messages channel
while (await channel.Reader.WaitToReadAsync(cancellationToken))
{
while (channel.Reader.TryRead(out var message))
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(options.MessageHandlingPolicy.TimeoutDuration);

//Evaluate the message and return an acknowledgement result
var messageAction = await messageHandler(message, cts.Token);

try
{
//Share the result with the sidecar
await AcknowledgeMessageAsync(message.Id, messageAction);
}
catch (OperationCanceledException)
{
//Acknowledge the message using the configured default response action
await AcknowledgeMessageAsync(message.Id, options.MessageHandlingPolicy.DefaultResponseAction);
}
}
}
await Task.WhenAll(fetchMessagesTask, acknowledgementProcessorTask, topicMessageProcessorTask);
}
catch (OperationCanceledException)
{
//Drain the remaining messages with the default action in the order in which they were queued
while (channel.Reader.TryRead(out var message))
{
await AcknowledgeMessageAsync(message.Id, options.MessageHandlingPolicy.DefaultResponseAction);
}
await DisposeAsync();
}
}

/// <summary>
/// Retrieves or creates the bidirectional stream to the DaprClient for transacting pub/sub subscriptions.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns></returns>
private async
Task<AsyncDuplexStreamingCall<P.SubscribeTopicEventsRequestAlpha1, P.SubscribeTopicEventsResponseAlpha1>>
GetStreamAsync(CancellationToken cancellationToken)
/// <returns>The stream connection.</returns>
private async Task<AsyncDuplexStreamingCall<P.SubscribeTopicEventsRequestAlpha1, P.SubscribeTopicEventsResponseAlpha1>> GetStreamAsync(CancellationToken cancellationToken)
{
await semaphore.WaitAsync(cancellationToken);

Expand Down Expand Up @@ -179,59 +155,97 @@ private async Task AcknowledgeMessageAsync(string messageId, TopicResponseAction
};

var acknowledgement = new TopicAcknowledgement(messageId, action);
await acknowledgements.Writer.WriteAsync(acknowledgement);
await acknowledgementsChannel.Writer.WriteAsync(acknowledgement);
}

/// <summary>
/// Retrieves the subscription stream data from the Dapr sidecar.
/// Processes each acknowledgement from the acknowledgement channel reader as it's populated.
/// </summary>
/// <param name="stream">The stream connection to and from the Dream sidecar instance.</param>
/// <param name="channelWriter">The channel writer instance.</param>
/// <param name="cancellationToken">Cancellation token.</param>
private async Task FetchDataFromSidecarAsync(AsyncDuplexStreamingCall<P.SubscribeTopicEventsRequestAlpha1, P.SubscribeTopicEventsResponseAlpha1> stream, ChannelWriter<TopicMessage> channelWriter, CancellationToken cancellationToken)
private async Task ProcessAcknowledgementChannelMessagesAsync(CancellationToken cancellationToken)
{
try
{
var initialRequest = new P.SubscribeTopicEventsRequestInitialAlpha1()
{
PubsubName = pubSubName,
DeadLetterTopic = options.DeadLetterTopic ?? string.Empty,
Topic = topicName
};

if (options?.Metadata.Count > 0)
var messageStream = await GetStreamAsync(cancellationToken);
await foreach (var acknowledgement in acknowledgementsChannel.Reader.ReadAllAsync(cancellationToken))
{
await messageStream.RequestStream.WriteAsync(new P.SubscribeTopicEventsRequestAlpha1
{
foreach (var (key, value) in options.Metadata)
EventProcessed = new()
{
initialRequest.Metadata.Add(key, value);
Id = acknowledgement.MessageId,
Status = new() { Status = acknowledgement.Action }
}
}
}, cancellationToken);
}
}

/// <summary>
/// Processes each topic messages from the channel as it's populated.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
private async Task ProcessTopicChannelMessagesAsync(CancellationToken cancellationToken)
{
await foreach (var message in topicMessagesChannel.Reader.ReadAllAsync(cancellationToken))
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(options.MessageHandlingPolicy.TimeoutDuration);

await stream.RequestStream.WriteAsync(new P.SubscribeTopicEventsRequestAlpha1 { InitialRequest = initialRequest }, cancellationToken);
//Evaluate the message and return an acknowledgement result
var messageAction = await messageHandler(message, cts.Token);

await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken))
try
{
var message = new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type, response.EventMessage.SpecVersion, response.EventMessage.DataContentType, response.EventMessage.Topic, response.EventMessage.PubsubName)
{
Path = response.EventMessage.Path,
Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value)
};

await channelWriter.WriteAsync(message, cancellationToken);
//Share the result with the sidecar
await AcknowledgeMessageAsync(message.Id, messageAction);
}
catch (OperationCanceledException)
{
//Acknowledge the message using the configured default response action
await AcknowledgeMessageAsync(message.Id, options.MessageHandlingPolicy.DefaultResponseAction);
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
}

/// <summary>
/// Retrieves the subscription stream data from the Dapr sidecar.
/// </summary>
/// <param name="stream">The stream connection to and from the Dapr sidecar instance.</param>
/// <param name="channelWriter">The channel writer instance.</param>
/// <param name="cancellationToken">Cancellation token.</param>
private async Task FetchDataFromSidecarAsync(
AsyncDuplexStreamingCall<P.SubscribeTopicEventsRequestAlpha1, P.SubscribeTopicEventsResponseAlpha1> stream,
ChannelWriter<TopicMessage> channelWriter, CancellationToken cancellationToken)
{
//Build out the initial topic events request
var initialRequest = new P.SubscribeTopicEventsRequestInitialAlpha1()
{
//Ignore our own cancellation
}
catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled &&
cancellationToken.IsCancellationRequested)
PubsubName = pubSubName, DeadLetterTopic = options.DeadLetterTopic ?? string.Empty, Topic = topicName
};

if (options?.Metadata.Count > 0)
{
//Ignore a remote cancellation due to our own cancellation
foreach (var (key, value) in options.Metadata)
{
initialRequest.Metadata.Add(key, value);
}
}
finally

//Send this request to the Dapr sidecar
await stream.RequestStream.WriteAsync(
new P.SubscribeTopicEventsRequestAlpha1 { InitialRequest = initialRequest }, cancellationToken);

//Each time a message is received from the stream, push it into the topic messages channel
await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken))
{
channel.Writer.Complete();
var message =
new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type,
response.EventMessage.SpecVersion, response.EventMessage.DataContentType,
response.EventMessage.Topic, response.EventMessage.PubsubName)
{
Path = response.EventMessage.Path,
Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value)
};

await channelWriter.WriteAsync(message, cancellationToken);
}
}

Expand All @@ -241,49 +255,36 @@ private async Task FetchDataFromSidecarAsync(AsyncDuplexStreamingCall<P.Subscrib
/// <returns></returns>
public async ValueTask DisposeAsync()
{
if (isDisposed)
return;
isDisposed = true;

//Stop processing new events
channel.Writer.Complete();
topicMessagesChannel.Writer.Complete();

try
{
//Process any remaining messages on the channel
await channel.Reader.Completion;
await topicMessagesChannel.Reader.Completion;
}
catch (OperationCanceledException)
{
// Handled
}

//Flush the remaining acknowledgements, but start by marking the writer as complete
acknowledgements.Writer.Complete();
acknowledgementsChannel.Writer.Complete();
try
{
//Process any remaining acknowledgements on the channel
await acknowledgements.Reader.Completion;
await acknowledgementsChannel.Reader.Completion;
}
catch (OperationCanceledException)
{
//Handled
}
}

/// <summary>
/// Processes each of the acknowledgement messages.
/// </summary>
/// <param name="acknowledgement">Information about the message and action to take on it.</param>
/// <returns></returns>
private async Task ProcessAcknowledgementAsync(TopicAcknowledgement acknowledgement)
{
var messageStream = await GetStreamAsync(CancellationToken.None);
await messageStream.RequestStream.WriteAsync(new P.SubscribeTopicEventsRequestAlpha1
{
EventProcessed = new()
{
Id = acknowledgement.MessageId, Status = new() { Status = acknowledgement.Action }
}
});
}

/// <summary>
/// Reflects the action to take on a given message identifier.
/// </summary>
Expand Down

0 comments on commit ef418c0

Please sign in to comment.