Skip to content

Commit

Permalink
Merge pull request #511 from Particular/always-delay
Browse files Browse the repository at this point in the history
Always delay moving of delayed messages even when exceptions occur to honor the ProcessingInterval
  • Loading branch information
danielmarbach authored Oct 16, 2019
2 parents 4904053 + d2f5ac2 commit a42f956
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 7 deletions.
18 changes: 13 additions & 5 deletions src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public DelayedMessageHandler(DelayedMessageTable table, SqlConnectionFactory con
this.connectionFactory = connectionFactory;
this.interval = interval;
this.batchSize = batchSize;
message = $"Scheduling next attempt to move matured delayed messages to input queue in {interval}";
}

public void Start()
Expand Down Expand Up @@ -43,28 +44,35 @@ async Task MoveMaturedDelayedMessages()
{
using (var transaction = connection.BeginTransaction())
{
await table.MoveMaturedMessages(batchSize, connection, transaction).ConfigureAwait(false);
await table.MoveMaturedMessages(batchSize, connection, transaction, cancellationToken).ConfigureAwait(false);
transaction.Commit();
}
}
Logger.DebugFormat("Scheduling next attempt to move matured delayed messages to input queue in {0}", interval);
await Task.Delay(interval, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Graceful shutdown
return;
}
catch (SqlException e) when (cancellationToken.IsCancellationRequested)
{
Logger.Debug("Exception thrown while performing cancellation", e);
return;
}
catch (Exception e)
{
Logger.Fatal("Exception thrown while performing cancellation", e);
Logger.Fatal("Exception thrown while moving matured delayed messages", e);
}
finally
{
Logger.DebugFormat(message);
await Task.Delay(interval, cancellationToken).IgnoreCancellation()
.ConfigureAwait(false);
}
}
}

string message;
DelayedMessageTable table;
SqlConnectionFactory connectionFactory;
TimeSpan interval;
Expand All @@ -75,4 +83,4 @@ async Task MoveMaturedDelayedMessages()

static ILog Logger = LogManager.GetLogger<DelayedMessageHandler>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace NServiceBus.Transport.SQLServer
{
using System;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Transport;

Expand All @@ -25,12 +26,12 @@ public async Task Store(OutgoingMessage message, DateTime dueTime, string destin
}
}

public async Task MoveMaturedMessages(int batchSize, SqlConnection connection, SqlTransaction transaction)
public async Task MoveMaturedMessages(int batchSize, SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken)
{
using (var command = new SqlCommand(moveMaturedCommand, connection, transaction))
{
command.Parameters.AddWithValue("BatchSize", batchSize);
await command.ExecuteNonQueryAsync().ConfigureAwait(false);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
}

Expand Down
12 changes: 12 additions & 0 deletions src/NServiceBus.SqlServer/TaskEx.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace NServiceBus.Transport.SQLServer
{
using System;
using System.Threading.Tasks;

static class TaskEx
Expand All @@ -11,6 +12,17 @@ public static void Ignore(this Task task)
{
}

public static async Task IgnoreCancellation(this Task task)
{
try
{
await task.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
}

//TODO: remove when we update to 4.6 and can use Task.CompletedTask
public static readonly Task CompletedTask = Task.FromResult(0);

Expand Down

0 comments on commit a42f956

Please sign in to comment.