Skip to content

Commit

Permalink
Processing a control message causes the outbox to throw a null refere…
Browse files Browse the repository at this point in the history
…nce exception (#549) (#550)

* Processing a control message causes the outbox to throw a null reference exception

* Update src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxBehavior.cs



---------

Co-authored-by: Andreas Öhlund <andreas.ohlund@particular.net>
  • Loading branch information
danielmarbach and andreasohlund authored Mar 23, 2023
1 parent 64775ee commit 85524a9
Show file tree
Hide file tree
Showing 6 changed files with 236 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
namespace NServiceBus.AcceptanceTests
{
using System;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NServiceBus.AcceptanceTesting.Support;
using NServiceBus.Features;
using NServiceBus.Pipeline;
using NServiceBus.Routing;
using NServiceBus.Transport;
using NServiceBus.Unicast.Transport;
using NUnit.Framework;

[TestFixture]
public class When_using_outbox_control_message : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_work()
{
var runSettings = new RunSettings();
runSettings.DoNotRegisterDefaultPartitionKeyProvider();

var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>()
.Done(c => c.ProcessedControlMessage)
.Run(runSettings)
.ConfigureAwait(false);

Assert.True(context.ProcessedControlMessage);
}

public class Context : ScenarioContext
{
public bool ProcessedControlMessage { get; set; }
}

public class Endpoint : EndpointConfigurationBuilder
{
public Endpoint() =>
EndpointSetup<DefaultServer>((config, runDescriptor) =>
{
config.EnableOutbox();
config.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
config.RegisterStartupTask<ControlMessageSender>();
config.Pipeline.Register(new ControlMessageBehavior(runDescriptor.ScenarioContext as Context), "Checks that the control message was processed successfully");
});

class ControlMessageSender : FeatureStartupTask
{
public ControlMessageSender(IMessageDispatcher dispatcher) => this.dispatcher = dispatcher;

protected override Task OnStart(IMessageSession session, CancellationToken cancellationToken = default)
{
var controlMessage = ControlMessageFactory.Create(MessageIntent.Subscribe);
var messageOperation = new TransportOperation(controlMessage, new UnicastAddressTag(AcceptanceTesting.Customization.Conventions.EndpointNamingConvention(typeof(Endpoint))));

return dispatcher.Dispatch(new TransportOperations(messageOperation), new TransportTransaction(), cancellationToken);
}

protected override Task OnStop(IMessageSession session, CancellationToken cancellationToken = default) => Task.CompletedTask;

readonly IMessageDispatcher dispatcher;
}

class ControlMessageBehavior : Behavior<IIncomingPhysicalMessageContext>
{
public ControlMessageBehavior(Context testContext) => this.testContext = testContext;

public override async Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
await next();

testContext.ProcessedControlMessage = true;
}

readonly Context testContext;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingL
// Outbox operating at the logical stage
if (!context.Extensions.TryGet<PartitionKey>(out var partitionKey))
{
throw new Exception("For the outbox to work the following information must be provided at latest up to the incoming physical or logical message stage. A partition key via `context.Extensions.Set<PartitionKey>(yourPartitionKey)`.");
throw new Exception("For the outbox to work a partition key must be provided at latest up to the incoming physical or logical message stage. Set one via '{nameof(CosmosPersistenceConfig.TransactionInformation)}'.");
}

var containerHolder = containerHolderResolver.ResolveAndSetIfAvailable(context.Extensions);
Expand All @@ -74,6 +74,8 @@ public async Task Invoke(IIncomingLogicalMessageContext context, Func<IIncomingL
outboxTransaction.PartitionKey = partitionKey;
outboxTransaction.StorageSession.ContainerHolder = containerHolder;

setAsDispatchedHolder.ThrowIfContainerIsNotSet();

var outboxRecord = await containerHolder.Container.ReadOutboxRecord(context.MessageId, outboxTransaction.PartitionKey.Value, serializer, context.Extensions, context.CancellationToken)
.ConfigureAwait(false);

Expand Down
16 changes: 14 additions & 2 deletions src/NServiceBus.Persistence.CosmosDB/Outbox/OutboxPersister.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
using Microsoft.Azure.Cosmos;
using Newtonsoft.Json;
using Outbox;
using Transport;
using Headers = NServiceBus.Headers;

class OutboxPersister : IOutboxStorage
{
Expand Down Expand Up @@ -39,10 +41,19 @@ public async Task<OutboxMessage> Get(string messageId, ContextBag context, Cance

if (!context.TryGet<PartitionKey>(out var partitionKey))
{
// we return null here to enable outbox work at logical stage
return null;
// because of the transactional session we cannot assume the incoming message is always present
if (!context.TryGet<IncomingMessage>(out var incomingMessage) ||
!incomingMessage.Headers.ContainsKey(Headers.ControlMessageHeader))
{
// we return null here to enable outbox work at logical stage
return null;
}

partitionKey = new PartitionKey(messageId);
context.Set(partitionKey);
}

setAsDispatchedHolder.ThrowIfContainerIsNotSet();
setAsDispatchedHolder.PartitionKey = partitionKey;

var outboxRecord = await setAsDispatchedHolder.ContainerHolder.Container.ReadOutboxRecord(messageId, partitionKey, serializer, context, cancellationToken: cancellationToken)
Expand Down Expand Up @@ -74,6 +85,7 @@ public Task Store(OutboxMessage message, IOutboxTransaction transaction, Context
public async Task SetAsDispatched(string messageId, ContextBag context, CancellationToken cancellationToken = default)
{
var setAsDispatchedHolder = context.Get<SetAsDispatchedHolder>();
setAsDispatchedHolder.ThrowIfContainerIsNotSet();

var partitionKey = setAsDispatchedHolder.PartitionKey;
var containerHolder = setAsDispatchedHolder.ContainerHolder;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#nullable enable

namespace NServiceBus.Persistence.CosmosDB
{
using System;

static class SetAsDispatchedHolderExtensions
{
public static void ThrowIfContainerIsNotSet(this SetAsDispatchedHolder setAsDispatchedHolder)
{
if (setAsDispatchedHolder.ContainerHolder is { Container: not null })
{
return;
}

throw new Exception($"For the outbox to work a container must be configured. Either configure a default one using '{nameof(CosmosPersistenceConfig.DefaultContainer)}' or set one via '{nameof(CosmosPersistenceConfig.TransactionInformation)}'.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
namespace NServiceBus.AcceptanceTests
{
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NServiceBus.AcceptanceTesting.Support;
using NUnit.Framework;

[TestFixture]
public class When_no_container_information_is_configured : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_throw_meaningful_exception()
{
var runSettings = new RunSettings();
runSettings.DoNotRegisterDefaultContainerInformationProvider();

var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>(b =>
{
b.DoNotFailOnErrorMessages();
b.When(s => s.SendLocal(new MyMessage()));
})
.Done(c => c.FailedMessages.Any())
.Run(runSettings);

var failure = context.FailedMessages.FirstOrDefault()
.Value.First();

Assert.That(failure.Exception.Message, Does.Contain("container"));
}

class Context : ScenarioContext
{
}

class Endpoint : EndpointConfigurationBuilder
{
public Endpoint() =>
EndpointSetup<DefaultServer>((config, runDescriptor) =>
{
config.EnableOutbox();
config.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
});

class MyMessageHandler : IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
Assert.Fail("Should not be called");
return Task.CompletedTask;
}
}
}

class MyMessage : IMessage { }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
namespace NServiceBus.AcceptanceTests
{
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using NServiceBus.AcceptanceTesting.Support;
using NUnit.Framework;

[TestFixture]
public class When_no_partition_key_is_configured : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_throw_meaningful_exception()
{
var runSettings = new RunSettings();
runSettings.DoNotRegisterDefaultPartitionKeyProvider();

var context = await Scenario.Define<Context>()
.WithEndpoint<Endpoint>(b =>
{
b.DoNotFailOnErrorMessages();
b.When(s => s.SendLocal(new MyMessage()));
})
.Done(c => c.FailedMessages.Any())
.Run(runSettings);

var failure = context.FailedMessages.FirstOrDefault()
.Value.First();

Assert.That(failure.Exception.Message, Does.Contain("partition key"));
}

class Context : ScenarioContext
{
}

class Endpoint : EndpointConfigurationBuilder
{
public Endpoint() =>
EndpointSetup<DefaultServer>((config, runDescriptor) =>
{
config.EnableOutbox();
config.ConfigureTransport().TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
});

class MyMessageHandler : IHandleMessages<MyMessage>
{
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
Assert.Fail("Should not be called");
return Task.CompletedTask;
}
}
}

class MyMessage : IMessage { }
}
}

0 comments on commit 85524a9

Please sign in to comment.