Skip to content

Commit

Permalink
Merge pull request #510 from Particular/master
Browse files Browse the repository at this point in the history
Merge master into release
  • Loading branch information
danielmarbach authored Oct 16, 2019
2 parents 9808911 + bc8981d commit 59503ed
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 9 deletions.
18 changes: 13 additions & 5 deletions src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public DelayedMessageHandler(DelayedMessageTable table, SqlConnectionFactory con
this.connectionFactory = connectionFactory;
this.interval = interval;
this.batchSize = batchSize;
message = $"Scheduling next attempt to move matured delayed messages to input queue in {interval}";
}

public void Start()
Expand Down Expand Up @@ -43,28 +44,35 @@ async Task MoveMaturedDelayedMessages()
{
using (var transaction = connection.BeginTransaction())
{
await table.MoveMaturedMessages(batchSize, connection, transaction).ConfigureAwait(false);
await table.MoveMaturedMessages(batchSize, connection, transaction, cancellationToken).ConfigureAwait(false);
transaction.Commit();
}
}
Logger.DebugFormat("Scheduling next attempt to move matured delayed messages to input queue in {0}", interval);
await Task.Delay(interval, cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
// Graceful shutdown
return;
}
catch (SqlException e) when (cancellationToken.IsCancellationRequested)
{
Logger.Debug("Exception thrown while performing cancellation", e);
return;
}
catch (Exception e)
{
Logger.Fatal("Exception thrown while performing cancellation", e);
Logger.Fatal("Exception thrown while moving matured delayed messages", e);
}
finally
{
Logger.DebugFormat(message);
await Task.Delay(interval, cancellationToken).IgnoreCancellation()
.ConfigureAwait(false);
}
}
}

string message;
DelayedMessageTable table;
SqlConnectionFactory connectionFactory;
TimeSpan interval;
Expand All @@ -75,4 +83,4 @@ async Task MoveMaturedDelayedMessages()

static ILog Logger = LogManager.GetLogger<DelayedMessageHandler>();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace NServiceBus.Transport.SQLServer
{
using System;
using System.Data.SqlClient;
using System.Threading;
using System.Threading.Tasks;
using Transport;

Expand All @@ -25,12 +26,12 @@ public async Task Store(OutgoingMessage message, TimeSpan dueAfter, string desti
}
}

public async Task MoveMaturedMessages(int batchSize, SqlConnection connection, SqlTransaction transaction)
public async Task MoveMaturedMessages(int batchSize, SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken)
{
using (var command = new SqlCommand(moveMaturedCommand, connection, transaction))
{
command.Parameters.AddWithValue("BatchSize", batchSize);
await command.ExecuteNonQueryAsync().ConfigureAwait(false);
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ public void PrepareSendCommand(SqlCommand command)
{
AddParameter(command, "Headers", SqlDbType.NVarChar, headers);
AddParameter(command, "Body", SqlDbType.VarBinary, bodyBytes);
AddParameter(command, "DueAfterMs", SqlDbType.BigInt, dueAfter.TotalMilliseconds);
AddParameter(command, "DueAfterDays", SqlDbType.Int, dueAfter.Days);
AddParameter(command, "DueAfterHours", SqlDbType.Int, dueAfter.Hours);
AddParameter(command, "DueAfterMinutes", SqlDbType.Int, dueAfter.Minutes);
AddParameter(command, "DueAfterSeconds", SqlDbType.Int, dueAfter.Seconds);
AddParameter(command, "DueAfterMilliseconds", SqlDbType.Int, dueAfter.Milliseconds);
}

void AddParameter(SqlCommand command, string name, SqlDbType type, object value)
Expand Down
9 changes: 8 additions & 1 deletion src/NServiceBus.SqlServer/Queuing/SqlConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,21 @@ THEN DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()) END,
IF ( (512 & @@OPTIONS) = 512 ) SET @NOCOUNT = 'ON'
SET NOCOUNT ON;
DECLARE @DueAfter DATETIME = GETUTCDATE();
SET @DueAfter = DATEADD(ms, @DueAfterMilliseconds, @DueAfter);
SET @DueAfter = DATEADD(s, @DueAfterSeconds, @DueAfter);
SET @DueAfter = DATEADD(n, @DueAfterMinutes, @DueAfter);
SET @DueAfter = DATEADD(hh, @DueAfterHours, @DueAfter);
SET @DueAfter = DATEADD(d, @DueAfterDays, @DueAfter);
INSERT INTO {0} (
Headers,
Body,
Due)
VALUES (
@Headers,
@Body,
DATEADD(ms, @DueAfterMs, GETUTCDATE()));
@DueAfter);
IF(@NOCOUNT = 'ON') SET NOCOUNT ON;
IF(@NOCOUNT = 'OFF') SET NOCOUNT OFF;";
Expand Down
12 changes: 12 additions & 0 deletions src/NServiceBus.SqlServer/TaskEx.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace NServiceBus.Transport.SQLServer
{
using System;
using System.Threading.Tasks;

static class TaskEx
Expand All @@ -11,6 +12,17 @@ public static void Ignore(this Task task)
{
}

public static async Task IgnoreCancellation(this Task task)
{
try
{
await task.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}
}

//TODO: remove when we update to 4.6 and can use Task.CompletedTask
public static readonly Task CompletedTask = Task.FromResult(0);

Expand Down

0 comments on commit 59503ed

Please sign in to comment.