Skip to content

Commit

Permalink
Merge pull request #451 from Particular/release-4.1.0
Browse files Browse the repository at this point in the history
Release 4.1.0
  • Loading branch information
tmasternak authored Jun 20, 2018
2 parents 1aa1d48 + f52a303 commit b210908
Show file tree
Hide file tree
Showing 15 changed files with 396 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
namespace NServiceBus.SqlServer.AcceptanceTests
{
using System;
using System.Data.SqlClient;
using System.Threading.Tasks;
using AcceptanceTesting;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;
using Transport.SQLServer;

public class When_passing_custom_transaction_via_sendoptions : NServiceBusAcceptanceTest
{
static string ConnectionString = Environment.GetEnvironmentVariable("SqlServerTransportConnectionString");

[Test]
public async Task Should_be_used_by_send_operations()
{
var context = await Scenario.Define<MyContext>()
.WithEndpoint<AnEndpoint>(c => c.When(async bus =>
{
using (var connection = new SqlConnection(ConnectionString))
{
connection.Open();
using (var rolledbackTransaction = connection.BeginTransaction())
{
var options = new SendOptions();
options.UseCustomSqlConnectionAndTransaction(connection, rolledbackTransaction);
await bus.Send(new FromRolledbackTransaction(), options);
rolledbackTransaction.Rollback();
}
using (var commitedTransaction = connection.BeginTransaction())
{
var options = new SendOptions();
options.UseCustomSqlConnectionAndTransaction(connection, commitedTransaction);
await bus.Send(new FromCommitedTransaction(), options);
commitedTransaction.Commit();
}
}
}))
.Done(c => c.ReceivedFromCommitedTransaction)
.Run(TimeSpan.FromMinutes(1));

Assert.IsFalse(context.ReceivedFromRolledbackTransaction);
}

class FromCommitedTransaction : IMessage
{
}

class FromRolledbackTransaction : IMessage
{
}

class MyContext : ScenarioContext
{
public bool ReceivedFromCommitedTransaction { get; set; }
public bool ReceivedFromRolledbackTransaction { get; set; }
}

class AnEndpoint : EndpointConfigurationBuilder
{
public AnEndpoint()
{
EndpointSetup<DefaultServer>(c =>
{
c.LimitMessageProcessingConcurrencyTo(1);
var routing = c.ConfigureTransport().Routing();
var anEndpointName = AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(AnEndpoint));
routing.RouteToEndpoint(typeof(FromCommitedTransaction), anEndpointName);
routing.RouteToEndpoint(typeof(FromRolledbackTransaction), anEndpointName);
});
}

class ReplyHandler : IHandleMessages<FromRolledbackTransaction>,
IHandleMessages<FromCommitedTransaction>
{
public MyContext Context { get; set; }

public Task Handle(FromRolledbackTransaction message, IMessageHandlerContext context)
{
Context.ReceivedFromRolledbackTransaction = true;

return Task.FromResult(0);
}

public Task Handle(FromCommitedTransaction message, IMessageHandlerContext context)
{
Context.ReceivedFromCommitedTransaction = true;

return Task.FromResult(0);
}
}
}


}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
namespace NServiceBus.SqlServer.AcceptanceTests
{
using System.Threading.Tasks;
using AcceptanceTesting;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;
using Transport.SQLServer;

public class When_using_computed_message_body_column : NServiceBusAcceptanceTest
{
[Test]
public async Task Simple_send_is_received()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>(b => b.When((session, c) => session.SendLocal(new MyMessage())))
.Done(c => c.WasCalled)
.Run();

Assert.IsTrue(context.WasCalled);
}
class Context : ScenarioContext
{
public bool WasCalled { get; set; }
}

class Endpoint : EndpointConfigurationBuilder
{
public Endpoint()
{
EndpointSetup<DefaultServer>(config =>
{
var transportConfig = config.UseTransport<SqlServerTransport>();
transportConfig.CreateMessageBodyComputedColumn();
});
}

public class MyMessageHandler : IHandleMessages<MyMessage>
{
public Context Context { get; set; }

public Task Handle(MyMessage message, IMessageHandlerContext context)
{
Context.WasCalled = true;
return Task.FromResult(0);
}
}
}

public class MyMessage : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,44 @@ namespace NServiceBus.Transport.SQLServer
public void ProcessingInterval(System.TimeSpan interval) { }
public void TableSuffix(string suffix) { }
}
public class static SendOptionsExtensions
{
public static void UseCustomSqlConnectionAndTransaction(this NServiceBus.SendOptions options, System.Data.SqlClient.SqlConnection connection, System.Data.SqlClient.SqlTransaction transaction) { }
}
[System.ObsoleteAttribute("Not for public use.")]
public class static SqlConstants
{
public static readonly string AddMessageBodyStringColumn;
public static readonly string CheckHeadersColumnType;
public static readonly string CheckIfExpiresIndexIsPresent;
public const string CreateDelayedMessageStoreText = @"
IF EXISTS (
SELECT *
FROM {1}.sys.objects
WHERE object_id = OBJECT_ID(N'{0}')
AND type in (N'U'))
RETURN
EXEC sp_getapplock @Resource = '{0}_lock', @LockMode = 'Exclusive'
IF EXISTS (
SELECT *
FROM {1}.sys.objects
WHERE object_id = OBJECT_ID(N'{0}')
AND type in (N'U'))
BEGIN
EXEC sp_releaseapplock @Resource = '{0}_lock'
RETURN
END
CREATE TABLE {0} (
Headers nvarchar(max) NOT NULL,
Body varbinary(max),
Due datetime NOT NULL,
RowVersion bigint IDENTITY(1,1) NOT NULL
);
CREATE NONCLUSTERED INDEX [Index_Due] ON {0}
(
[Due]
)
EXEC sp_releaseapplock @Resource = '{0}_lock'";
public static readonly string CreateQueueText;
public static readonly string PeekText;
public static readonly string PurgeBatchOfExpiredMessagesText;
Expand All @@ -36,6 +69,7 @@ namespace NServiceBus.Transport.SQLServer
}
public class static SqlServerTransportSettingsExtensions
{
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> CreateMessageBodyComputedColumn(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions) { }
public static NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> DefaultSchema(this NServiceBus.TransportExtensions<NServiceBus.SqlServerTransport> transportExtensions, string schemaName) { }
[System.ObsoleteAttribute("Multi-instance mode has been deprecated. Use Transport Bridge and/or multi-catalo" +
"g addressing instead. The member currently throws a NotImplementedException. Wil" +
Expand Down
2 changes: 2 additions & 0 deletions src/NServiceBus.SqlServer/Configuration/SettingsKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ class SettingsKeys
public const string PurgeBatchSizeKey = "SqlServer.PurgeBatchSize";
public const string PurgeEnableKey = "SqlServer.PurgeExpiredOnStartup";

public const string CreateMessageBodyComputedColumn = "SqlServer.CreateMessageBodyComputedColumn";

public const string SchemaPropertyKey = "Schema";
public const string CatalogPropertyKey = "Catalog";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ namespace NServiceBus.Transport.SQLServer

class DelayedDeliveryQueueCreator : ICreateQueues
{
public DelayedDeliveryQueueCreator(SqlConnectionFactory connectionFactory, ICreateQueues queueCreator, CanonicalQueueAddress delayedMessageTable)
public DelayedDeliveryQueueCreator(SqlConnectionFactory connectionFactory, ICreateQueues queueCreator, CanonicalQueueAddress delayedMessageTable, bool createMessageBodyComputedColumn = false)
{
this.connectionFactory = connectionFactory;
this.queueCreator = queueCreator;
this.delayedMessageTable = delayedMessageTable;
this.createMessageBodyComputedColumn = createMessageBodyComputedColumn;
}

public async Task CreateQueueIfNecessary(QueueBindings queueBindings, string identity)
Expand All @@ -19,12 +20,13 @@ public async Task CreateQueueIfNecessary(QueueBindings queueBindings, string ide
using (var connection = await connectionFactory.OpenNewConnection().ConfigureAwait(false))
using (var transaction = connection.BeginTransaction())
{
await CreateDelayedMessageQueue(delayedMessageTable, connection, transaction).ConfigureAwait(false);
await CreateDelayedMessageQueue(delayedMessageTable, connection, transaction, createMessageBodyComputedColumn).ConfigureAwait(false);

transaction.Commit();
}
}

static async Task CreateDelayedMessageQueue(CanonicalQueueAddress canonicalQueueAddress, SqlConnection connection, SqlTransaction transaction)
static async Task CreateDelayedMessageQueue(CanonicalQueueAddress canonicalQueueAddress, SqlConnection connection, SqlTransaction transaction, bool createMessageBodyComputedColumn)
{
#pragma warning disable 618
var sql = string.Format(SqlConstants.CreateDelayedMessageStoreText, canonicalQueueAddress.QualifiedTableName, canonicalQueueAddress.QuotedCatalogName);
Expand All @@ -36,10 +38,26 @@ static async Task CreateDelayedMessageQueue(CanonicalQueueAddress canonicalQueue
{
await command.ExecuteNonQueryAsync().ConfigureAwait(false);
}
if (createMessageBodyComputedColumn)
{
#pragma warning disable 618
var bodyStringSql = string.Format(SqlConstants.AddMessageBodyStringColumn, canonicalQueueAddress.QualifiedTableName, canonicalQueueAddress.QuotedCatalogName);
#pragma warning restore 618
using (var command = new SqlCommand(bodyStringSql, connection, transaction)
{
CommandType = CommandType.Text
})
{
await command.ExecuteNonQueryAsync().ConfigureAwait(false);
}

}
}


SqlConnectionFactory connectionFactory;
ICreateQueues queueCreator;
CanonicalQueueAddress delayedMessageTable;
bool createMessageBodyComputedColumn;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ async Task MoveMaturedDelayedMessages()
}
catch (SqlException e) when (cancellationToken.IsCancellationRequested)
{
Logger.Debug("Exception thown while performing cancellation", e);
Logger.Debug("Exception thrown while performing cancellation", e);
}
catch (Exception e)
{
Logger.Fatal("Exception thown while performing cancellation", e);
Logger.Fatal("Exception thrown while performing cancellation", e);
}
}
}
Expand All @@ -75,4 +75,4 @@ async Task MoveMaturedDelayedMessages()

static ILog Logger = LogManager.GetLogger<DelayedMessageHandler>();
}
}
}
10 changes: 3 additions & 7 deletions src/NServiceBus.SqlServer/Queuing/MessageRow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public static MessageRow From(Dictionary<string, string> headers, byte[] body, T
id = Guid.NewGuid(),
correlationId = TryGetHeaderValue(headers, Headers.CorrelationId, s => s),
replyToAddress = TryGetHeaderValue(headers, Headers.ReplyToAddress, s => s),
recoverable = true,
timeToBeReceived = toBeReceived == TimeSpan.MaxValue ? null : (int?)toBeReceived.TotalMilliseconds,
headers = DictionarySerializer.Serialize(headers),
bodyBytes = body
Expand All @@ -39,7 +38,6 @@ public void PrepareSendCommand(SqlCommand command)
AddParameter(command, "Id", SqlDbType.UniqueIdentifier, id);
AddParameter(command, "CorrelationId", SqlDbType.VarChar, correlationId);
AddParameter(command, "ReplyToAddress", SqlDbType.VarChar, replyToAddress);
AddParameter(command, "Recoverable", SqlDbType.Bit, recoverable);
AddParameter(command, "TimeToBeReceivedMs", SqlDbType.Int, timeToBeReceived);
AddParameter(command, "Headers", SqlDbType.NVarChar, headers);
AddParameter(command, "Body", SqlDbType.VarBinary, bodyBytes, -1);
Expand All @@ -53,10 +51,9 @@ static async Task<MessageRow> ReadRow(SqlDataReader dataReader)
id = await dataReader.GetFieldValueAsync<Guid>(0).ConfigureAwait(false),
correlationId = await GetNullableAsync<string>(dataReader, 1).ConfigureAwait(false),
replyToAddress = await GetNullableAsync<string>(dataReader, 2).ConfigureAwait(false),
recoverable = await dataReader.GetFieldValueAsync<bool>(3).ConfigureAwait(false),
expired = await dataReader.GetFieldValueAsync<int>(4).ConfigureAwait(false) == 1,
headers = await GetHeaders(dataReader, 5).ConfigureAwait(false),
bodyBytes = await GetBody(dataReader, 6).ConfigureAwait(false)
expired = await dataReader.GetFieldValueAsync<int>(3).ConfigureAwait(false) == 1,
headers = await GetHeaders(dataReader, 4).ConfigureAwait(false),
bodyBytes = await GetBody(dataReader, 5).ConfigureAwait(false)
};
}

Expand Down Expand Up @@ -139,7 +136,6 @@ void AddParameter(SqlCommand command, string name, SqlDbType type, object value,
Guid id;
string correlationId;
string replyToAddress;
bool recoverable;
bool expired;
int? timeToBeReceived;
string headers;
Expand Down
Loading

0 comments on commit b210908

Please sign in to comment.