diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs b/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs new file mode 100644 index 000000000..70d95cfa0 --- /dev/null +++ b/src/NServiceBus.SqlServer.AcceptanceTests/When_passing_custom_transaction_via_sendoptions.cs @@ -0,0 +1,109 @@ +namespace NServiceBus.SqlServer.AcceptanceTests +{ + using System; + using System.Data.SqlClient; + using System.Threading.Tasks; + using AcceptanceTesting; + using NServiceBus.AcceptanceTests; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NUnit.Framework; + using Transport.SQLServer; + + public class When_passing_custom_transaction_via_sendoptions : NServiceBusAcceptanceTest + { + static string ConnectionString = Environment.GetEnvironmentVariable("SqlServerTransportConnectionString"); + + [Test] + public async Task Should_be_used_by_send_operations() + { + var context = await Scenario.Define() + .WithEndpoint(c => c.When(async bus => + { + using (var connection = new SqlConnection(ConnectionString)) + { + connection.Open(); + + using (var rolledbackTransaction = connection.BeginTransaction()) + { + var options = new SendOptions(); + + options.UseCustomSqlConnectionAndTransaction(connection, rolledbackTransaction); + + await bus.Send(new FromRolledbackTransaction(), options); + + rolledbackTransaction.Rollback(); + } + + using (var commitedTransaction = connection.BeginTransaction()) + { + var options = new SendOptions(); + + options.UseCustomSqlConnectionAndTransaction(connection, commitedTransaction); + + await bus.Send(new FromCommitedTransaction(), options); + + commitedTransaction.Commit(); + } + } + + })) + .Done(c => c.ReceivedFromCommitedTransaction) + .Run(TimeSpan.FromMinutes(1)); + + Assert.IsFalse(context.ReceivedFromRolledbackTransaction); + } + + class FromCommitedTransaction : IMessage + { + } + + class FromRolledbackTransaction : IMessage + { + } + + class MyContext : ScenarioContext + { + public bool ReceivedFromCommitedTransaction { get; set; } + public bool ReceivedFromRolledbackTransaction { get; set; } + } + + class AnEndpoint : EndpointConfigurationBuilder + { + public AnEndpoint() + { + EndpointSetup(c => + { + c.LimitMessageProcessingConcurrencyTo(1); + + var routing = c.ConfigureTransport().Routing(); + var anEndpointName = AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(AnEndpoint)); + + routing.RouteToEndpoint(typeof(FromCommitedTransaction), anEndpointName); + routing.RouteToEndpoint(typeof(FromRolledbackTransaction), anEndpointName); + }); + } + + class ReplyHandler : IHandleMessages, + IHandleMessages + { + public MyContext Context { get; set; } + + public Task Handle(FromRolledbackTransaction message, IMessageHandlerContext context) + { + Context.ReceivedFromRolledbackTransaction = true; + + return Task.FromResult(0); + } + + public Task Handle(FromCommitedTransaction message, IMessageHandlerContext context) + { + Context.ReceivedFromCommitedTransaction = true; + + return Task.FromResult(0); + } + } + } + + + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer.AcceptanceTests/When_using_computed_message_body_column.cs b/src/NServiceBus.SqlServer.AcceptanceTests/When_using_computed_message_body_column.cs new file mode 100644 index 000000000..a99342560 --- /dev/null +++ b/src/NServiceBus.SqlServer.AcceptanceTests/When_using_computed_message_body_column.cs @@ -0,0 +1,54 @@ +namespace NServiceBus.SqlServer.AcceptanceTests +{ + using System.Threading.Tasks; + using AcceptanceTesting; + using NServiceBus.AcceptanceTests; + using NServiceBus.AcceptanceTests.EndpointTemplates; + using NUnit.Framework; + using Transport.SQLServer; + + public class When_using_computed_message_body_column : NServiceBusAcceptanceTest + { + [Test] + public async Task Simple_send_is_received() + { + var context = await Scenario.Define() + .WithEndpoint(b => b.When((session, c) => session.SendLocal(new MyMessage()))) + .Done(c => c.WasCalled) + .Run(); + + Assert.IsTrue(context.WasCalled); + } + class Context : ScenarioContext + { + public bool WasCalled { get; set; } + } + + class Endpoint : EndpointConfigurationBuilder + { + public Endpoint() + { + EndpointSetup(config => + { + var transportConfig = config.UseTransport(); + transportConfig.CreateMessageBodyComputedColumn(); + }); + } + + public class MyMessageHandler : IHandleMessages + { + public Context Context { get; set; } + + public Task Handle(MyMessage message, IMessageHandlerContext context) + { + Context.WasCalled = true; + return Task.FromResult(0); + } + } + } + + public class MyMessage : IMessage + { + } + } +} diff --git a/src/NServiceBus.SqlServer.UnitTests/APIApprovals.Approve.approved.txt b/src/NServiceBus.SqlServer.UnitTests/APIApprovals.Approve.approved.txt index 89729e5c3..029e28793 100644 --- a/src/NServiceBus.SqlServer.UnitTests/APIApprovals.Approve.approved.txt +++ b/src/NServiceBus.SqlServer.UnitTests/APIApprovals.Approve.approved.txt @@ -22,11 +22,44 @@ namespace NServiceBus.Transport.SQLServer public void ProcessingInterval(System.TimeSpan interval) { } public void TableSuffix(string suffix) { } } + public class static SendOptionsExtensions + { + public static void UseCustomSqlConnectionAndTransaction(this NServiceBus.SendOptions options, System.Data.SqlClient.SqlConnection connection, System.Data.SqlClient.SqlTransaction transaction) { } + } [System.ObsoleteAttribute("Not for public use.")] public class static SqlConstants { + public static readonly string AddMessageBodyStringColumn; public static readonly string CheckHeadersColumnType; public static readonly string CheckIfExpiresIndexIsPresent; + public const string CreateDelayedMessageStoreText = @" +IF EXISTS ( + SELECT * + FROM {1}.sys.objects + WHERE object_id = OBJECT_ID(N'{0}') + AND type in (N'U')) +RETURN +EXEC sp_getapplock @Resource = '{0}_lock', @LockMode = 'Exclusive' +IF EXISTS ( + SELECT * + FROM {1}.sys.objects + WHERE object_id = OBJECT_ID(N'{0}') + AND type in (N'U')) +BEGIN + EXEC sp_releaseapplock @Resource = '{0}_lock' + RETURN +END +CREATE TABLE {0} ( + Headers nvarchar(max) NOT NULL, + Body varbinary(max), + Due datetime NOT NULL, + RowVersion bigint IDENTITY(1,1) NOT NULL +); +CREATE NONCLUSTERED INDEX [Index_Due] ON {0} +( + [Due] +) +EXEC sp_releaseapplock @Resource = '{0}_lock'"; public static readonly string CreateQueueText; public static readonly string PeekText; public static readonly string PurgeBatchOfExpiredMessagesText; @@ -36,6 +69,7 @@ namespace NServiceBus.Transport.SQLServer } public class static SqlServerTransportSettingsExtensions { + public static NServiceBus.TransportExtensions CreateMessageBodyComputedColumn(this NServiceBus.TransportExtensions transportExtensions) { } public static NServiceBus.TransportExtensions DefaultSchema(this NServiceBus.TransportExtensions transportExtensions, string schemaName) { } [System.ObsoleteAttribute("Multi-instance mode has been deprecated. Use Transport Bridge and/or multi-catalo" + "g addressing instead. The member currently throws a NotImplementedException. Wil" + diff --git a/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs b/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs index eeccc0ef3..a796c6b39 100644 --- a/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs +++ b/src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs @@ -12,6 +12,8 @@ class SettingsKeys public const string PurgeBatchSizeKey = "SqlServer.PurgeBatchSize"; public const string PurgeEnableKey = "SqlServer.PurgeExpiredOnStartup"; + public const string CreateMessageBodyComputedColumn = "SqlServer.CreateMessageBodyComputedColumn"; + public const string SchemaPropertyKey = "Schema"; public const string CatalogPropertyKey = "Catalog"; } diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryQueueCreator.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryQueueCreator.cs index e5ba7ce14..2ec9ae79c 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryQueueCreator.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedDeliveryQueueCreator.cs @@ -6,11 +6,12 @@ namespace NServiceBus.Transport.SQLServer class DelayedDeliveryQueueCreator : ICreateQueues { - public DelayedDeliveryQueueCreator(SqlConnectionFactory connectionFactory, ICreateQueues queueCreator, CanonicalQueueAddress delayedMessageTable) + public DelayedDeliveryQueueCreator(SqlConnectionFactory connectionFactory, ICreateQueues queueCreator, CanonicalQueueAddress delayedMessageTable, bool createMessageBodyComputedColumn = false) { this.connectionFactory = connectionFactory; this.queueCreator = queueCreator; this.delayedMessageTable = delayedMessageTable; + this.createMessageBodyComputedColumn = createMessageBodyComputedColumn; } public async Task CreateQueueIfNecessary(QueueBindings queueBindings, string identity) @@ -19,12 +20,13 @@ public async Task CreateQueueIfNecessary(QueueBindings queueBindings, string ide using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false)) using (var transaction = connection.BeginTransaction()) { - await CreateDelayedMessageQueue(delayedMessageTable, connection, transaction).ConfigureAwait(false); + await CreateDelayedMessageQueue(delayedMessageTable, connection, transaction, createMessageBodyComputedColumn).ConfigureAwait(false); + transaction.Commit(); } } - static async Task CreateDelayedMessageQueue(CanonicalQueueAddress canonicalQueueAddress, SqlConnection connection, SqlTransaction transaction) + static async Task CreateDelayedMessageQueue(CanonicalQueueAddress canonicalQueueAddress, SqlConnection connection, SqlTransaction transaction, bool createMessageBodyComputedColumn) { #pragma warning disable 618 var sql = string.Format(SqlConstants.CreateDelayedMessageStoreText, canonicalQueueAddress.QualifiedTableName, canonicalQueueAddress.QuotedCatalogName); @@ -36,10 +38,26 @@ static async Task CreateDelayedMessageQueue(CanonicalQueueAddress canonicalQueue { await command.ExecuteNonQueryAsync().ConfigureAwait(false); } + if (createMessageBodyComputedColumn) + { +#pragma warning disable 618 + var bodyStringSql = string.Format(SqlConstants.AddMessageBodyStringColumn, canonicalQueueAddress.QualifiedTableName, canonicalQueueAddress.QuotedCatalogName); +#pragma warning restore 618 + using (var command = new SqlCommand(bodyStringSql, connection, transaction) + { + CommandType = CommandType.Text + }) + { + await command.ExecuteNonQueryAsync().ConfigureAwait(false); + } + + } } + SqlConnectionFactory connectionFactory; ICreateQueues queueCreator; CanonicalQueueAddress delayedMessageTable; + bool createMessageBodyComputedColumn; } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs index a9d3ba23a..3d07e8f07 100644 --- a/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs +++ b/src/NServiceBus.SqlServer/DelayedDelivery/DelayedMessageHandler.cs @@ -56,11 +56,11 @@ async Task MoveMaturedDelayedMessages() } catch (SqlException e) when (cancellationToken.IsCancellationRequested) { - Logger.Debug("Exception thown while performing cancellation", e); + Logger.Debug("Exception thrown while performing cancellation", e); } catch (Exception e) { - Logger.Fatal("Exception thown while performing cancellation", e); + Logger.Fatal("Exception thrown while performing cancellation", e); } } } @@ -75,4 +75,4 @@ async Task MoveMaturedDelayedMessages() static ILog Logger = LogManager.GetLogger(); } -} \ No newline at end of file +} diff --git a/src/NServiceBus.SqlServer/Queuing/MessageRow.cs b/src/NServiceBus.SqlServer/Queuing/MessageRow.cs index d9337f426..844b7de2f 100644 --- a/src/NServiceBus.SqlServer/Queuing/MessageRow.cs +++ b/src/NServiceBus.SqlServer/Queuing/MessageRow.cs @@ -26,7 +26,6 @@ public static MessageRow From(Dictionary headers, byte[] body, T id = Guid.NewGuid(), correlationId = TryGetHeaderValue(headers, Headers.CorrelationId, s => s), replyToAddress = TryGetHeaderValue(headers, Headers.ReplyToAddress, s => s), - recoverable = true, timeToBeReceived = toBeReceived == TimeSpan.MaxValue ? null : (int?)toBeReceived.TotalMilliseconds, headers = DictionarySerializer.Serialize(headers), bodyBytes = body @@ -39,7 +38,6 @@ public void PrepareSendCommand(SqlCommand command) AddParameter(command, "Id", SqlDbType.UniqueIdentifier, id); AddParameter(command, "CorrelationId", SqlDbType.VarChar, correlationId); AddParameter(command, "ReplyToAddress", SqlDbType.VarChar, replyToAddress); - AddParameter(command, "Recoverable", SqlDbType.Bit, recoverable); AddParameter(command, "TimeToBeReceivedMs", SqlDbType.Int, timeToBeReceived); AddParameter(command, "Headers", SqlDbType.NVarChar, headers); AddParameter(command, "Body", SqlDbType.VarBinary, bodyBytes, -1); @@ -53,10 +51,9 @@ static async Task ReadRow(SqlDataReader dataReader) id = await dataReader.GetFieldValueAsync(0).ConfigureAwait(false), correlationId = await GetNullableAsync(dataReader, 1).ConfigureAwait(false), replyToAddress = await GetNullableAsync(dataReader, 2).ConfigureAwait(false), - recoverable = await dataReader.GetFieldValueAsync(3).ConfigureAwait(false), - expired = await dataReader.GetFieldValueAsync(4).ConfigureAwait(false) == 1, - headers = await GetHeaders(dataReader, 5).ConfigureAwait(false), - bodyBytes = await GetBody(dataReader, 6).ConfigureAwait(false) + expired = await dataReader.GetFieldValueAsync(3).ConfigureAwait(false) == 1, + headers = await GetHeaders(dataReader, 4).ConfigureAwait(false), + bodyBytes = await GetBody(dataReader, 5).ConfigureAwait(false) }; } @@ -139,7 +136,6 @@ void AddParameter(SqlCommand command, string name, SqlDbType type, object value, Guid id; string correlationId; string replyToAddress; - bool recoverable; bool expired; int? timeToBeReceived; string headers; diff --git a/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs b/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs index f8d5d612d..90a41afe9 100644 --- a/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs +++ b/src/NServiceBus.SqlServer/Queuing/SqlConstants.cs @@ -29,7 +29,7 @@ public static class SqlConstants @Id, @CorrelationId, @ReplyToAddress, - @Recoverable, + 1, CASE WHEN @TimeToBeReceivedMs IS NOT NULL THEN DATEADD(ms, @TimeToBeReceivedMs, GETUTCDATE()) END, @Headers, @@ -70,7 +70,6 @@ DELETE FROM message deleted.Id, deleted.CorrelationId, deleted.ReplyToAddress, - deleted.Recoverable, CASE WHEN deleted.Expires IS NULL THEN 0 ELSE CASE WHEN deleted.Expires > GETUTCDATE() @@ -111,6 +110,40 @@ DELETE FROM message SELECT count(*) Id FROM {0} WITH (READPAST);"; + public static readonly string AddMessageBodyStringColumn = @" +IF NOT EXISTS ( + SELECT * + FROM {1}.sys.objects + WHERE object_id = OBJECT_ID(N'{0}') + AND type in (N'U')) +RETURN + +IF EXISTS ( + SELECT * + FROM {1}.sys.columns + WHERE object_id = OBJECT_ID(N'{0}') + AND name = 'BodyString' +) +RETURN + +EXEC sp_getapplock @Resource = '{0}_lock', @LockMode = 'Exclusive' + +IF EXISTS ( + SELECT * + FROM {1}.sys.columns + WHERE object_id = OBJECT_ID(N'{0}') + AND name = 'BodyString' +) +BEGIN + EXEC sp_releaseapplock @Resource = '{0}_lock' + RETURN +END + +ALTER TABLE {0} +ADD BodyString as cast(Body as nvarchar(max)); + +EXEC sp_releaseapplock @Resource = '{0}_lock'"; + public static readonly string CreateQueueText = @" IF EXISTS ( SELECT * @@ -161,7 +194,7 @@ Expires IS NOT NULL EXEC sp_releaseapplock @Resource = '{0}_lock'"; - internal const string CreateDelayedMessageStoreText = @" + public const string CreateDelayedMessageStoreText = @" IF EXISTS ( SELECT * FROM {1}.sys.objects diff --git a/src/NServiceBus.SqlServer/Receiving/MessagePump.cs b/src/NServiceBus.SqlServer/Receiving/MessagePump.cs index 66fea5a0b..4aafbe619 100644 --- a/src/NServiceBus.SqlServer/Receiving/MessagePump.cs +++ b/src/NServiceBus.SqlServer/Receiving/MessagePump.cs @@ -130,7 +130,10 @@ async Task InnerProcessMessages() // We cannot dispose this token source because of potential race conditions of concurrent receives var loopCancellationTokenSource = new CancellationTokenSource(); - for (var i = 0; i < messageCount; i++) + // If the receive circuit breaker is triggered start only one message processing task at a time. + var maximumConcurrentReceives = receiveCircuitBreaker.Triggered ? 1 : messageCount; + + for (var i = 0; i < maximumConcurrentReceives; i++) { if (loopCancellationTokenSource.Token.IsCancellationRequested) { diff --git a/src/NServiceBus.SqlServer/Receiving/QueueCreator.cs b/src/NServiceBus.SqlServer/Receiving/QueueCreator.cs index ae15fe074..eddc57eb6 100644 --- a/src/NServiceBus.SqlServer/Receiving/QueueCreator.cs +++ b/src/NServiceBus.SqlServer/Receiving/QueueCreator.cs @@ -8,10 +8,11 @@ namespace NServiceBus.Transport.SQLServer class QueueCreator : ICreateQueues { - public QueueCreator(SqlConnectionFactory connectionFactory, QueueAddressTranslator addressTranslator) + public QueueCreator(SqlConnectionFactory connectionFactory, QueueAddressTranslator addressTranslator, bool createMessageBodyColumn = false) { this.connectionFactory = connectionFactory; this.addressTranslator = addressTranslator; + this.createMessageBodyColumn = createMessageBodyColumn; } public async Task CreateQueueIfNecessary(QueueBindings queueBindings, string identity) @@ -21,18 +22,18 @@ public async Task CreateQueueIfNecessary(QueueBindings queueBindings, string ide { foreach (var receivingAddress in queueBindings.ReceivingAddresses) { - await CreateQueue(addressTranslator.Parse(receivingAddress), connection, transaction).ConfigureAwait(false); + await CreateQueue(addressTranslator.Parse(receivingAddress), connection, transaction, createMessageBodyColumn).ConfigureAwait(false); } foreach (var sendingAddress in queueBindings.SendingAddresses) { - await CreateQueue(addressTranslator.Parse(sendingAddress), connection, transaction).ConfigureAwait(false); + await CreateQueue(addressTranslator.Parse(sendingAddress), connection, transaction, createMessageBodyColumn).ConfigureAwait(false); } transaction.Commit(); } } - static async Task CreateQueue(CanonicalQueueAddress canonicalQueueAddress, SqlConnection connection, SqlTransaction transaction) + static async Task CreateQueue(CanonicalQueueAddress canonicalQueueAddress, SqlConnection connection, SqlTransaction transaction, bool createMessageBodyColumn) { var sql = string.Format(SqlConstants.CreateQueueText, canonicalQueueAddress.QualifiedTableName, canonicalQueueAddress.QuotedCatalogName); using (var command = new SqlCommand(sql, connection, transaction) @@ -42,10 +43,23 @@ static async Task CreateQueue(CanonicalQueueAddress canonicalQueueAddress, SqlCo { await command.ExecuteNonQueryAsync().ConfigureAwait(false); } + + if (createMessageBodyColumn) + { + var bodyStringSql = string.Format(SqlConstants.AddMessageBodyStringColumn, canonicalQueueAddress.QualifiedTableName, canonicalQueueAddress.QuotedCatalogName); + using (var command = new SqlCommand(bodyStringSql, connection, transaction) + { + CommandType = CommandType.Text + }) + { + await command.ExecuteNonQueryAsync().ConfigureAwait(false); + } + } } SqlConnectionFactory connectionFactory; QueueAddressTranslator addressTranslator; + bool createMessageBodyColumn; } } #pragma warning restore 618 \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/Receiving/RepeatedFailuresOvertimeCurcuitBreaker.cs b/src/NServiceBus.SqlServer/Receiving/RepeatedFailuresOvertimeCurcuitBreaker.cs index 0899d0f2b..af1a03705 100644 --- a/src/NServiceBus.SqlServer/Receiving/RepeatedFailuresOvertimeCurcuitBreaker.cs +++ b/src/NServiceBus.SqlServer/Receiving/RepeatedFailuresOvertimeCurcuitBreaker.cs @@ -16,6 +16,8 @@ public RepeatedFailuresOverTimeCircuitBreaker(string name, TimeSpan timeToWaitBe timer = new Timer(CircuitBreakerTriggered); } + public bool Triggered => triggered; + public void Dispose() { //Injected @@ -31,6 +33,7 @@ public void Success() } timer.Change(Timeout.Infinite, Timeout.Infinite); + triggered = false; Logger.InfoFormat("The circuit breaker for {0} is now disarmed", name); } @@ -45,26 +48,32 @@ public Task Failure(Exception exception) Logger.WarnFormat("The circuit breaker for {0} is now in the armed state", name); } - return Task.Delay(TimeSpan.FromSeconds(1)); + var delay = Triggered ? ThrottledDelay : NonThrottledDelay; + return Task.Delay(delay); } void CircuitBreakerTriggered(object state) { if (Interlocked.Read(ref failureCount) > 0) { + triggered = true; Logger.WarnFormat("The circuit breaker for {0} will now be triggered", name); triggerAction(lastException); } } + string name; TimeSpan timeToWaitBeforeTriggering; Timer timer; Action triggerAction; long failureCount; Exception lastException; + volatile bool triggered; static TimeSpan NoPeriodicTriggering = TimeSpan.FromMilliseconds(-1); static ILog Logger = LogManager.GetLogger(); + static TimeSpan NonThrottledDelay = TimeSpan.FromSeconds(1); + static TimeSpan ThrottledDelay = TimeSpan.FromSeconds(10); } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SendOptionsExtensions.cs b/src/NServiceBus.SqlServer/SendOptionsExtensions.cs new file mode 100644 index 000000000..678f922cb --- /dev/null +++ b/src/NServiceBus.SqlServer/SendOptionsExtensions.cs @@ -0,0 +1,27 @@ +namespace NServiceBus.Transport.SQLServer +{ + using System.Data.SqlClient; + using Extensibility; + + /// + /// Adds transport specific settings to SendOptions + /// + public static class SendOptionsExtensions + { + /// + /// Enables providing SqlConnection and SqlTransaction instances that will be used by send operations. The same connection and transaction + /// can be used in more than one send operation. + /// + /// The to extend. + /// Open SqlConnection instance to be used by send operations. + /// SqlTransaction instance that will be used by any operations perfromed by the transport. + public static void UseCustomSqlConnectionAndTransaction(this SendOptions options, SqlConnection connection, SqlTransaction transaction) + { + var transportTransaction = new TransportTransaction(); + transportTransaction.Set(connection); + transportTransaction.Set(transaction); + + options.GetExtensions().Set(transportTransaction); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs index 9fe643ee0..0a7764a00 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportInfrastructure.cs @@ -23,6 +23,12 @@ internal SqlServerTransportInfrastructure(QueueAddressTranslator addressTranslat schemaAndCatalogSettings = settings.GetOrCreate(); delayedDeliverySettings = settings.GetOrDefault(); var timeoutManagerFeatureDisabled = settings.GetOrDefault(typeof(TimeoutManager).FullName) == FeatureState.Disabled; + + diagnostics.Add("NServiceBus.Transport.SqlServer.TimeoutManager", new + { + FeatureEnabled = !timeoutManagerFeatureDisabled + }); + if (delayedDeliverySettings != null && timeoutManagerFeatureDisabled) { delayedDeliverySettings.DisableTimeoutManagerCompatibility(); @@ -53,16 +59,30 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() scopeOptions = new SqlScopeOptions(); } + settings.TryGet(out TransportTransactionMode transactionMode); + diagnostics.Add("NServiceBus.Transport.SqlServer.Transactions", new + { + TransactionMode = transactionMode, + scopeOptions.TransactionOptions.IsolationLevel, + scopeOptions.TransactionOptions.Timeout + }); + if (!settings.TryGet(SettingsKeys.TimeToWaitBeforeTriggering, out TimeSpan waitTimeCircuitBreaker)) { waitTimeCircuitBreaker = TimeSpan.FromSeconds(30); } + diagnostics.Add("NServiceBus.Transport.SqlServer.CircuitBreaker", new + { + TimeToWaitBeforeTriggering = waitTimeCircuitBreaker + }); if (!settings.TryGet(out QueuePeekerOptions queuePeekerOptions)) { queuePeekerOptions = new QueuePeekerOptions(); } + var createMessageBodyComputedColumn = settings.GetOrDefault(SettingsKeys.CreateMessageBodyComputedColumn); + var connectionFactory = CreateConnectionFactory(); Func receiveStrategyFactory = @@ -93,12 +113,12 @@ public override TransportReceiveInfrastructure ConfigureReceiveInfrastructure() }, () => { - var creator = new QueueCreator(connectionFactory, addressTranslator); + var creator = new QueueCreator(connectionFactory, addressTranslator, createMessageBodyComputedColumn); if (delayedDeliverySettings == null) { return creator; } - return new DelayedDeliveryQueueCreator(connectionFactory, creator, delayedMessageStore); + return new DelayedDeliveryQueueCreator(connectionFactory, creator, delayedMessageStore, createMessageBodyComputedColumn); }, () => CheckForAmbientTransactionEnlistmentSupport(connectionFactory, scopeOptions.TransactionOptions)); } @@ -138,6 +158,12 @@ ExpiredMessagesPurger CreateExpiredMessagesPurger(SqlConnectionFactory connectio var purgeBatchSize = settings.HasSetting(SettingsKeys.PurgeBatchSizeKey) ? settings.Get(SettingsKeys.PurgeBatchSizeKey) : null; var enable = settings.GetOrDefault(SettingsKeys.PurgeEnableKey); + diagnostics.Add("NServiceBus.Transport.SqlServer.ExpiredMessagesPurger", new + { + FeatureEnabled = enable, + BatchSize = purgeBatchSize + }); + return new ExpiredMessagesPurger(_ => connectionFactory.OpenNewConnection(), purgeBatchSize, enable); } @@ -235,10 +261,29 @@ CanonicalQueueAddress GetDelayedQueueTableName() public override Task Start() { + foreach (var diagnosticSection in diagnostics) + { + settings.AddStartupDiagnosticsSection(diagnosticSection.Key, diagnosticSection.Value); + } + if (delayedDeliverySettings == null) { + settings.AddStartupDiagnosticsSection("NServiceBus.Transport.SqlServer.DelayedDelivery", new + { + Native = false + }); return Task.FromResult(0); } + + settings.AddStartupDiagnosticsSection("NServiceBus.Transport.SqlServer.DelayedDelivery", new + { + Native = true, + delayedDeliverySettings.Suffix, + delayedDeliverySettings.Interval, + BatchSize = delayedDeliverySettings.MatureBatchSize, + TimoutManager = delayedDeliverySettings.TimeoutManagerDisabled ? "disabled" : "enabled" + }); + var delayedMessageTable = CreateDelayedMessageTable(); delayedMessageHandler = new DelayedMessageHandler(delayedMessageTable, CreateConnectionFactory(), delayedDeliverySettings.Interval, delayedDeliverySettings.MatureBatchSize); delayedMessageHandler.Start(); @@ -287,5 +332,6 @@ public override string MakeCanonicalForm(string transportAddress) EndpointSchemaAndCatalogSettings schemaAndCatalogSettings; DelayedMessageHandler delayedMessageHandler; DelayedDeliverySettings delayedDeliverySettings; + Dictionary diagnostics = new Dictionary(); } } \ No newline at end of file diff --git a/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs b/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs index be30b1ace..aae6367e7 100644 --- a/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs +++ b/src/NServiceBus.SqlServer/SqlServerTransportSettingsExtensions.cs @@ -6,6 +6,7 @@ using System.Transactions; using Configuration.AdvancedExtensibility; using Features; + using Logging; /// /// Adds extra configuration for the Sql Server transport. @@ -79,6 +80,7 @@ public static TransportExtensions UseCatalogForEndpoint(this var settings = transportExtensions.GetSettings(); settings.Set(SettingsKeys.MultiCatalogEnabled, true); + var schemasConfiguration = settings.GetOrCreate(); schemasConfiguration.SpecifyCatalog(endpointName, catalog); @@ -102,6 +104,7 @@ public static TransportExtensions UseCatalogForQueue(this Tr var settings = transportExtensions.GetSettings(); settings.Set(SettingsKeys.MultiCatalogEnabled, true); + var schemasConfiguration = settings.GetOrCreate(); schemasConfiguration.SpecifyCatalog(queueName, catalog); @@ -122,6 +125,7 @@ public static TransportExtensions TimeToWaitBeforeTriggering Guard.AgainstNegativeAndZero(nameof(waitTime), waitTime); transportExtensions.GetSettings().Set(SettingsKeys.TimeToWaitBeforeTriggering, waitTime); + return transportExtensions; } @@ -151,7 +155,13 @@ public static TransportExtensions TransactionScopeOptions(th { Guard.AgainstNull(nameof(transportExtensions), transportExtensions); + if (isolationLevel != IsolationLevel.ReadCommitted && isolationLevel != IsolationLevel.RepeatableRead) + { + Logger.Warn("TransactionScope should be only used with either the ReadCommited or the RepeatableRead isolation level."); + } + transportExtensions.GetSettings().Set(new SqlScopeOptions(timeout, isolationLevel)); + return transportExtensions; } @@ -199,6 +209,17 @@ public static TransportExtensions PurgeExpiredMessagesOnStar return transportExtensions; } + /// + /// Instructs the transport to create a computed column for inspecting message body contents. + /// + /// The to extend. + public static TransportExtensions CreateMessageBodyComputedColumn(this TransportExtensions transportExtensions) + { + transportExtensions.GetSettings().Set(SettingsKeys.CreateMessageBodyComputedColumn, true); + + return transportExtensions; + } + /// /// Enables multi-instance mode. /// @@ -212,5 +233,7 @@ public static TransportExtensions EnableLegacyMultiInstanceM { throw new NotImplementedException(); } + + static ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/Scripts/Reset-Database.sql b/src/Scripts/Reset-Database.sql index 37f104dc3..57b754dc7 100644 --- a/src/Scripts/Reset-Database.sql +++ b/src/Scripts/Reset-Database.sql @@ -27,7 +27,7 @@ BEGIN EXEC (@SQL) PRINT 'Dropped View: ' + @schema + '.' + @name SELECT @name = NULL - SELECT TOP 1 @name = sys.objects.name, @schema = sys.schemas.name FROM sys.objects INNER JOIN sys.schemas ON sys.objects.schema_id = sys.schemas.schema_id WHERE [type] = 'V' ORDER BY sys.objects.name + SELECT TOP 1 @name = sys.objects.name, @schema = sys.schemas.name FROM sys.objects INNER JOIN sys.schemas ON sys.objects.schema_id = sys.schemas.schema_id WHERE [type] = 'V' ORDER BY sys.objects.name END GO @@ -55,7 +55,7 @@ DECLARE @SQL NVARCHAR(254) SELECT TOP 1 @name = TABLE_NAME, @schema = CONSTRAINT_SCHEMA, @constraint = CONSTRAINT_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE constraint_catalog=DB_NAME() AND CONSTRAINT_TYPE = 'FOREIGN KEY' ORDER BY TABLE_NAME WHILE @name is not null -BEGIN +BEGIN SELECT @SQL = 'ALTER TABLE ' + QUOTENAME(@schema) + '.' + QUOTENAME(RTRIM(@name)) +' DROP CONSTRAINT ' + QUOTENAME(RTRIM(@constraint)) EXEC (@SQL) PRINT 'Dropped FK Constraint: ' + @constraint + ' on ' + @schema + '.' + @name @@ -73,12 +73,12 @@ DECLARE @SQL NVARCHAR(254) SELECT TOP 1 @name = TABLE_NAME, @schema = CONSTRAINT_SCHEMA, @constraint = CONSTRAINT_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE constraint_catalog=DB_NAME() AND CONSTRAINT_TYPE = 'PRIMARY KEY' ORDER BY TABLE_NAME WHILE @name IS NOT NULL -BEGIN +BEGIN SELECT @SQL = 'ALTER TABLE ' + QUOTENAME(@schema) + '.' + QUOTENAME(RTRIM(@name)) +' DROP CONSTRAINT ' + QUOTENAME(RTRIM(@constraint)) EXEC (@SQL) PRINT 'Dropped PK Constraint: ' + @constraint + ' on ' + @schema + '.' + @name SELECT @name = NULL - SELECT TOP 1 @name = TABLE_NAME, @schema = CONSTRAINT_SCHEMA, @constraint = CONSTRAINT_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE constraint_catalog=DB_NAME() AND CONSTRAINT_TYPE = 'PRIMARY KEY' ORDER BY TABLE_NAME + SELECT TOP 1 @name = TABLE_NAME, @schema = CONSTRAINT_SCHEMA, @constraint = CONSTRAINT_NAME FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS WHERE constraint_catalog=DB_NAME() AND CONSTRAINT_TYPE = 'PRIMARY KEY' ORDER BY TABLE_NAME END GO