From 5e38afd67231cc01060c218db612c6abd08d36cc Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 16 Oct 2019 09:57:05 +0200 Subject: [PATCH 1/6] Pass corresponding dateparts to the sql layer --- .../DelayedDelivery/StoreDelayedMessageCommand.cs | 6 +++++- src/NServiceBus.SqlServer/Queuing/SqlConstants.cs | 9 ++++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/StoreDelayedMessageCommand.cs b/src/NServiceBus.SqlServer/DelayedDelivery/StoreDelayedMessageCommand.cs index b89d4ee21..31ea71c0f 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/StoreDelayedMessageCommand.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/StoreDelayedMessageCommand.cs @@ -27,7 +27,11 @@ public void PrepareSendCommand(SqlCommand command) { AddParameter(command, "Headers", SqlDbType.NVarChar, headers); AddParameter(command, "Body", SqlDbType.VarBinary, bodyBytes); - AddParameter(command, "DueAfterMs", SqlDbType.BigInt, dueAfter.TotalMilliseconds); + AddParameter(command, "DueAfterDays", SqlDbType.Int, dueAfter.Days); + AddParameter(command, "DueAfterHours", SqlDbType.Int, dueAfter.Hours); + AddParameter(command, "DueAfterMinutes", SqlDbType.Int, dueAfter.Minutes); + AddParameter(command, "DueAfterSeconds", SqlDbType.Int, dueAfter.Seconds); + AddParameter(command, "DueAfterMilliseconds", SqlDbType.Int, dueAfter.Milliseconds); } void AddParameter(SqlCommand command, string name, SqlDbType type, object value) diff --git a/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs b/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs index f85e403c4..e2686b91e 100644 --- a/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs +++ b/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs @@ -44,6 +44,13 @@ THEN DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()) END, IF ( (512 & @@OPTIONS) = 512 ) SET @NOCOUNT = 'ON' SET NOCOUNT ON; +DECLARE @DueAfter DATETIME = GETUTCDATE(); +SET @DueAfter = DATEADD(ms, @DueAfterMilliseconds, @DueAfter); +SET @DueAfter = DATEADD(s, @DueAfterSeconds, @DueAfter); +SET @DueAfter = DATEADD(n, @DueAfterMinutes, @DueAfter); +SET @DueAfter = DATEADD(hh, @DueAfterHours, @DueAfter); +SET @DueAfter = DATEADD(d, @DueAfterDays, @DueAfter); + INSERT INTO {0} ( Headers, Body, @@ -51,7 +58,7 @@ THEN DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()) END, VALUES ( @Headers, @Body, - DATEADD(ms, @DueAfterMs, GETUTCDATE())); + @DueAfter); IF(@NOCOUNT = 'ON') SET NOCOUNT ON; IF(@NOCOUNT = 'OFF') SET NOCOUNT OFF;"; From 654a9308a6a4f7edb15faa33b52f44c846ce5283 Mon Sep 17 00:00:00 2001 From: Christoffer Rydberg Date: Wed, 11 Sep 2019 10:29:06 +0200 Subject: [PATCH 2/6] Add delay in MoveMaturedDelayedMessages on exception --- .../DelayedDelivery/DelayedMessageHandler.cs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs index 3d07e8f07..41b5ded87 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs @@ -54,13 +54,16 @@ async Task MoveMaturedDelayedMessages() { // Graceful shutdown } - catch (SqlException e) when (cancellationToken.IsCancellationRequested) + catch (Exception e) when (cancellationToken.IsCancellationRequested) { Logger.Debug("Exception thrown while performing cancellation", e); } catch (Exception e) { - Logger.Fatal("Exception thrown while performing cancellation", e); + Logger.Fatal("Exception thrown while moving matured delayed messages", e); + + Logger.DebugFormat("Scheduling next attempt to move matured delayed messages to input queue in {0}", interval); + await Task.Delay(interval, cancellationToken).ConfigureAwait(false); } } } From 4e1f54f41c07d5b02cb83af0220d9769f8c6d647 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 16 Oct 2019 10:56:39 +0200 Subject: [PATCH 3/6] Delay in the finally block --- .../DelayedDelivery/DelayedMessageHandler.cs | 13 +++++++------ src/NServiceBus.SqlServer/TaskEx.cs | 12 ++++++++++++ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs index 41b5ded87..13437fd2c 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs @@ -47,23 +47,24 @@ async Task MoveMaturedDelayedMessages() transaction.Commit(); } } - Logger.DebugFormat("Scheduling next attempt to move matured delayed messages to input queue in {0}", interval); - await Task.Delay(interval, cancellationToken).ConfigureAwait(false); } catch (OperationCanceledException) { // Graceful shutdown } - catch (Exception e) when (cancellationToken.IsCancellationRequested) + catch (SqlException e) when (cancellationToken.IsCancellationRequested) { Logger.Debug("Exception thrown while performing cancellation", e); } catch (Exception e) { Logger.Fatal("Exception thrown while moving matured delayed messages", e); - + } + finally + { Logger.DebugFormat("Scheduling next attempt to move matured delayed messages to input queue in {0}", interval); - await Task.Delay(interval, cancellationToken).ConfigureAwait(false); + await Task.Delay(interval, cancellationToken).IgnoreCancellation() + .ConfigureAwait(false); } } } @@ -78,4 +79,4 @@ async Task MoveMaturedDelayedMessages() static ILog Logger = LogManager.GetLogger(); } -} +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/TaskEx.cs b/src/NServiceBus.SqlServer/TaskEx.cs index 22f7199ae..d71c2a4b9 100644 --- a/src/NServiceBus.SqlServer/TaskEx.cs +++ b/src/NServiceBus.SqlServer/TaskEx.cs @@ -1,5 +1,6 @@ namespace NServiceBus.Transport.SQLServer { + using System; using System.Threading.Tasks; static class TaskEx @@ -11,6 +12,17 @@ public static void Ignore(this Task task) { } + public static async Task IgnoreCancellation(this Task task) + { + try + { + await task.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + } + } + //TODO: remove when we update to 4.6 and can use Task.CompletedTask public static readonly Task CompletedTask = Task.FromResult(0); From 617e508ce99ca47349ad76b91a0a390d266661d5 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 16 Oct 2019 11:00:12 +0200 Subject: [PATCH 4/6] Only log when really needed and not on shutdown scenarios --- .../DelayedDelivery/DelayedMessageHandler.cs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs index 13437fd2c..fcc055ad7 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs @@ -62,7 +62,10 @@ async Task MoveMaturedDelayedMessages() } finally { - Logger.DebugFormat("Scheduling next attempt to move matured delayed messages to input queue in {0}", interval); + if (!cancellationToken.IsCancellationRequested && Logger.IsDebugEnabled) + { + Logger.DebugFormat("Scheduling next attempt to move matured delayed messages to input queue in {0}", interval); + } await Task.Delay(interval, cancellationToken).IgnoreCancellation() .ConfigureAwait(false); } From 06dd6205ee282334b3243e67c02cc469594b36e9 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 16 Oct 2019 11:12:58 +0200 Subject: [PATCH 5/6] Cancel move on shutdown. This is fine because we operate in a transaction --- .../DelayedDelivery/DelayedMessageHandler.cs | 4 +++- .../DelayedDelivery/DelayedMessageTable.cs | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs index fcc055ad7..005344669 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs @@ -43,7 +43,7 @@ async Task MoveMaturedDelayedMessages() { using (var transaction = connection.BeginTransaction()) { - await table.MoveMaturedMessages(batchSize, connection, transaction).ConfigureAwait(false); + await table.MoveMaturedMessages(batchSize, connection, transaction, cancellationToken).ConfigureAwait(false); transaction.Commit(); } } @@ -51,10 +51,12 @@ async Task MoveMaturedDelayedMessages() catch (OperationCanceledException) { // Graceful shutdown + return; } catch (SqlException e) when (cancellationToken.IsCancellationRequested) { Logger.Debug("Exception thrown while performing cancellation", e); + return; } catch (Exception e) { diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageTable.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageTable.cs index 7e9745d44..ef8112804 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageTable.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageTable.cs @@ -2,6 +2,7 @@ namespace NServiceBus.Transport.SQLServer { using System; using System.Data.SqlClient; + using System.Threading; using System.Threading.Tasks; using Transport; @@ -25,12 +26,12 @@ public async Task Store(OutgoingMessage message, TimeSpan dueAfter, string desti } } - public async Task MoveMaturedMessages(int batchSize, SqlConnection connection, SqlTransaction transaction) + public async Task MoveMaturedMessages(int batchSize, SqlConnection connection, SqlTransaction transaction, CancellationToken cancellationToken) { using (var command = new SqlCommand(moveMaturedCommand, connection, transaction)) { command.Parameters.AddWithValue("BatchSize", batchSize); - await command.ExecuteNonQueryAsync().ConfigureAwait(false); + await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false); } } From 51d708e8ebe76acd40324765cc1e7ce40b31fe47 Mon Sep 17 00:00:00 2001 From: danielmarbach Date: Wed, 16 Oct 2019 11:13:40 +0200 Subject: [PATCH 6/6] Don't compute format every time given the interval is fixed --- .../DelayedDelivery/DelayedMessageHandler.cs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs index 005344669..192e71260 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs @@ -14,6 +14,7 @@ public DelayedMessageHandler(DelayedMessageTable table, SqlConnectionFactory con this.connectionFactory = connectionFactory; this.interval = interval; this.batchSize = batchSize; + message = $"Scheduling next attempt to move matured delayed messages to input queue in {interval}"; } public void Start() @@ -64,16 +65,14 @@ async Task MoveMaturedDelayedMessages() } finally { - if (!cancellationToken.IsCancellationRequested && Logger.IsDebugEnabled) - { - Logger.DebugFormat("Scheduling next attempt to move matured delayed messages to input queue in {0}", interval); - } + Logger.DebugFormat(message); await Task.Delay(interval, cancellationToken).IgnoreCancellation() .ConfigureAwait(false); } } } + string message; DelayedMessageTable table; SqlConnectionFactory connectionFactory; TimeSpan interval;