From 85524a94b8314d708fa9d9a0b2459eafdc88dc4f Mon Sep 17 00:00:00 2001 From: Daniel Marbach Date: Thu, 23 Mar 2023 14:44:50 +0100 Subject: [PATCH] Processing a control message causes the outbox to throw a null reference exception (#549) (#550) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Processing a control message causes the outbox to throw a null reference exception * Update src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxBehavior.cs --------- Co-authored-by: Andreas Öhlund --- .../When_using_outbox_control_message.cs | 82 +++++++++++++++++++ .../Outbox/OutboxBehavior.cs | 4 +- .../Outbox/OutboxPersister.cs | 16 +++- .../Outbox/SetAsDispatchedHolderExtensions.cs | 19 +++++ ..._no_container_information_is_configured.cs | 59 +++++++++++++ .../When_no_partition_key_is_configured.cs | 59 +++++++++++++ 6 files changed, 236 insertions(+), 3 deletions(-) create mode 100644 src/NServiceBus.Persistence.CosmosDB.PhysicalOutbox.AcceptanceTests/When_using_outbox_control_message.cs create mode 100644 src/NServiceBus.Persistence.CosmosDB/Outbox/SetAsDispatchedHolderExtensions.cs create mode 100644 src/SharedAcceptanceTests.RequirePartitionKey/When_no_container_information_is_configured.cs create mode 100644 src/SharedAcceptanceTests.RequirePartitionKey/When_no_partition_key_is_configured.cs diff --git a/src/NServiceBus.Persistence.CosmosDB.PhysicalOutbox.AcceptanceTests/When_using_outbox_control_message.cs b/src/NServiceBus.Persistence.CosmosDB.PhysicalOutbox.AcceptanceTests/When_using_outbox_control_message.cs new file mode 100644 index 00000000..099384dd --- /dev/null +++ b/src/NServiceBus.Persistence.CosmosDB.PhysicalOutbox.AcceptanceTests/When_using_outbox_control_message.cs @@ -0,0 +1,82 @@ +namespace NServiceBus.AcceptanceTests +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using AcceptanceTesting; + using EndpointTemplates; + using NServiceBus.AcceptanceTesting.Support; + using NServiceBus.Features; + using NServiceBus.Pipeline; + using NServiceBus.Routing; + using NServiceBus.Transport; + using NServiceBus.Unicast.Transport; + using NUnit.Framework; + + [TestFixture] + public class When_using_outbox_control_message : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_work() + { + var runSettings = new RunSettings(); + runSettings.DoNotRegisterDefaultPartitionKeyProvider(); + + var context = await Scenario.Define() + .WithEndpoint() + .Done(c => c.ProcessedControlMessage) + .Run(runSettings) + .ConfigureAwait(false); + + Assert.True(context.ProcessedControlMessage); + } + + public class Context : ScenarioContext + { + public bool ProcessedControlMessage { get; set; } + } + + public class Endpoint : EndpointConfigurationBuilder + { + public Endpoint() => + EndpointSetup((config, runDescriptor) => + { + config.EnableOutbox(); + config.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + config.RegisterStartupTask(); + config.Pipeline.Register(new ControlMessageBehavior(runDescriptor.ScenarioContext as Context), "Checks that the control message was processed successfully"); + }); + + class ControlMessageSender : FeatureStartupTask + { + public ControlMessageSender(IMessageDispatcher dispatcher) => this.dispatcher = dispatcher; + + protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken = default) + { + var controlMessage = ControlMessageFactory.Create(MessageIntent.Subscribe); + var messageOperation = new TransportOperation(controlMessage, new UnicastAddressTag(AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(Endpoint)))); + + return dispatcher.Dispatch(new TransportOperations(messageOperation), new TransportTransaction(), cancellationToken); + } + + protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default) => Task.CompletedTask; + + readonly IMessageDispatcher dispatcher; + } + + class ControlMessageBehavior : Behavior + { + public ControlMessageBehavior(Context testContext) => this.testContext = testContext; + + public override async Task Invoke(IIncomingPhysicalMessageContext context, Func next) + { + await next(); + + testContext.ProcessedControlMessage = true; + } + + readonly Context testContext; + } + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxBehavior.cs b/src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxBehavior.cs index a24916ca..66d3dfcb 100644 --- a/src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxBehavior.cs +++ b/src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxBehavior.cs @@ -62,7 +62,7 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func(out var partitionKey)) { - throw new Exception("For the outbox to work the following information must be provided at latest up to the incoming physical or logical message stage. A partition key via `context.Extensions.Set(yourPartitionKey)`."); + throw new Exception("For the outbox to work a partition key must be provided at latest up to the incoming physical or logical message stage. Set one via '{nameof(CosmosPersistenceConfig.TransactionInformation)}'."); } var containerHolder = containerHolderResolver.ResolveAndSetIfAvailable(context.Extensions); @@ -74,6 +74,8 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func Get(string messageId, ContextBag context, Cance if (!context.TryGet(out var partitionKey)) { - // we return null here to enable outbox work at logical stage - return null; + // because of the transactional session we cannot assume the incoming message is always present + if (!context.TryGet(out var incomingMessage) || + !incomingMessage.Headers.ContainsKey(Headers.ControlMessageHeader)) + { + // we return null here to enable outbox work at logical stage + return null; + } + + partitionKey = new PartitionKey(messageId); + context.Set(partitionKey); } + setAsDispatchedHolder.ThrowIfContainerIsNotSet(); setAsDispatchedHolder.PartitionKey = partitionKey; var outboxRecord = await setAsDispatchedHolder.ContainerHolder.Container.ReadOutboxRecord(messageId, partitionKey, serializer, context, cancellationToken: cancellationToken) @@ -74,6 +85,7 @@ public Task Store(OutboxMessage message, IOutboxTransaction transaction, Context public async Task SetAsDispatched(string messageId, ContextBag context, CancellationToken cancellationToken = default) { var setAsDispatchedHolder = context.Get(); + setAsDispatchedHolder.ThrowIfContainerIsNotSet(); var partitionKey = setAsDispatchedHolder.PartitionKey; var containerHolder = setAsDispatchedHolder.ContainerHolder; diff --git a/src/NServiceBus.Persistence.CosmosDB/Outbox/SetAsDispatchedHolderExtensions.cs b/src/NServiceBus.Persistence.CosmosDB/Outbox/SetAsDispatchedHolderExtensions.cs new file mode 100644 index 00000000..e1f404ea --- /dev/null +++ b/src/NServiceBus.Persistence.CosmosDB/Outbox/SetAsDispatchedHolderExtensions.cs @@ -0,0 +1,19 @@ +#nullable enable + +namespace NServiceBus.Persistence.CosmosDB +{ + using System; + + static class SetAsDispatchedHolderExtensions + { + public static void ThrowIfContainerIsNotSet(this SetAsDispatchedHolder setAsDispatchedHolder) + { + if (setAsDispatchedHolder.ContainerHolder is { Container: not null }) + { + return; + } + + throw new Exception($"For the outbox to work a container must be configured. Either configure a default one using '{nameof(CosmosPersistenceConfig.DefaultContainer)}' or set one via '{nameof(CosmosPersistenceConfig.TransactionInformation)}'."); + } + } +} \ No newline at end of file diff --git a/src/SharedAcceptanceTests.RequirePartitionKey/When_no_container_information_is_configured.cs b/src/SharedAcceptanceTests.RequirePartitionKey/When_no_container_information_is_configured.cs new file mode 100644 index 00000000..3b7b753c --- /dev/null +++ b/src/SharedAcceptanceTests.RequirePartitionKey/When_no_container_information_is_configured.cs @@ -0,0 +1,59 @@ +namespace NServiceBus.AcceptanceTests +{ + using System.Linq; + using System.Threading.Tasks; + using AcceptanceTesting; + using EndpointTemplates; + using NServiceBus.AcceptanceTesting.Support; + using NUnit.Framework; + + [TestFixture] + public class When_no_container_information_is_configured : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_throw_meaningful_exception() + { + var runSettings = new RunSettings(); + runSettings.DoNotRegisterDefaultContainerInformationProvider(); + + var context = await Scenario.Define() + .WithEndpoint(b => + { + b.DoNotFailOnErrorMessages(); + b.When(s => s.SendLocal(new MyMessage())); + }) + .Done(c => c.FailedMessages.Any()) + .Run(runSettings); + + var failure = context.FailedMessages.FirstOrDefault() + .Value.First(); + + Assert.That(failure.Exception.Message, Does.Contain("container")); + } + + class Context : ScenarioContext + { + } + + class Endpoint : EndpointConfigurationBuilder + { + public Endpoint() => + EndpointSetup((config, runDescriptor) => + { + config.EnableOutbox(); + config.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + }); + + class MyMessageHandler : IHandleMessages + { + public Task Handle(MyMessage message, IMessageHandlerContext context) + { + Assert.Fail("Should not be called"); + return Task.CompletedTask; + } + } + } + + class MyMessage : IMessage { } + } +} \ No newline at end of file diff --git a/src/SharedAcceptanceTests.RequirePartitionKey/When_no_partition_key_is_configured.cs b/src/SharedAcceptanceTests.RequirePartitionKey/When_no_partition_key_is_configured.cs new file mode 100644 index 00000000..29e9b86d --- /dev/null +++ b/src/SharedAcceptanceTests.RequirePartitionKey/When_no_partition_key_is_configured.cs @@ -0,0 +1,59 @@ +namespace NServiceBus.AcceptanceTests +{ + using System.Linq; + using System.Threading.Tasks; + using AcceptanceTesting; + using EndpointTemplates; + using NServiceBus.AcceptanceTesting.Support; + using NUnit.Framework; + + [TestFixture] + public class When_no_partition_key_is_configured : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_throw_meaningful_exception() + { + var runSettings = new RunSettings(); + runSettings.DoNotRegisterDefaultPartitionKeyProvider(); + + var context = await Scenario.Define() + .WithEndpoint(b => + { + b.DoNotFailOnErrorMessages(); + b.When(s => s.SendLocal(new MyMessage())); + }) + .Done(c => c.FailedMessages.Any()) + .Run(runSettings); + + var failure = context.FailedMessages.FirstOrDefault() + .Value.First(); + + Assert.That(failure.Exception.Message, Does.Contain("partition key")); + } + + class Context : ScenarioContext + { + } + + class Endpoint : EndpointConfigurationBuilder + { + public Endpoint() => + EndpointSetup((config, runDescriptor) => + { + config.EnableOutbox(); + config.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly; + }); + + class MyMessageHandler : IHandleMessages + { + public Task Handle(MyMessage message, IMessageHandlerContext context) + { + Assert.Fail("Should not be called"); + return Task.CompletedTask; + } + } + } + + class MyMessage : IMessage { } + } +} \ No newline at end of file