Skip to content

Commit

Permalink
Improve Akka.Persistence.TestKit (#7324)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus authored Aug 20, 2024
1 parent 355439e commit 19162bd
Show file tree
Hide file tree
Showing 21 changed files with 1,046 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
// </copyright>
//-----------------------------------------------------------------------

using System.Threading;
using FluentAssertions.Extensions;

namespace Akka.Persistence.TestKit.Tests
{
using System;
Expand Down Expand Up @@ -58,6 +61,28 @@ public async Task delay_must_call_next_interceptor_after_specified_delay()
probe.CalledAt.Should().BeOnOrAfter(startedAt + duration - epsilon);
}

[Fact]
public async Task cancelable_delay_must_call_next_interceptor_immediately_after_cancellation()
{
var totalDuration = 400.Milliseconds();
var delayDuration = 200.Milliseconds();
var epsilon = TimeSpan.FromMilliseconds(50);
using var cts = new CancellationTokenSource();
var probe = new InterceptorProbe();
var delay = new JournalInterceptors.CancelableDelay(totalDuration, probe, cts.Token);

var startedAt = DateTime.Now;
var task = delay.InterceptAsync(null);
await Task.Delay(delayDuration - epsilon);

probe.WasCalled.Should().BeFalse();
cts.Cancel();
await task;

probe.WasCalled.Should().BeTrue();
probe.CalledAt.Should().BeOnOrAfter(startedAt + delayDuration - epsilon);
}

[Fact]
public async Task on_type_must_call_next_interceptor_when_message_is_exactly_awaited_type()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
// </copyright>
//-----------------------------------------------------------------------

using System.Threading;
using FluentAssertions.Extensions;

namespace Akka.Persistence.TestKit.Tests
{
using System;
Expand Down Expand Up @@ -48,6 +51,28 @@ public async Task delay_must_call_next_interceptor_after_specified_delay()
probe.CalledAt.Should().BeOnOrAfter(startedAt + duration - epsilon);
}

[Fact]
public async Task cancelable_delay_must_call_next_interceptor_immediately_after_cancellation()
{
var totalDuration = 400.Milliseconds();
var delayDuration = 200.Milliseconds();
var epsilon = TimeSpan.FromMilliseconds(50);
using var cts = new CancellationTokenSource();
var probe = new InterceptorProbe();
var delay = new SnapshotStoreInterceptors.CancelableDelay(totalDuration, probe, cts.Token);

var startedAt = DateTime.Now;
var task = delay.InterceptAsync(null, null);
await Task.Delay(delayDuration - epsilon);

probe.WasCalled.Should().BeFalse();
cts.Cancel();
await task;

probe.WasCalled.Should().BeTrue();
probe.CalledAt.Should().BeOnOrAfter(startedAt + delayDuration - epsilon);
}

[Fact]
public async Task on_condition_must_accept_sync_lambda()
{
Expand Down
20 changes: 16 additions & 4 deletions src/core/Akka.Persistence.TestKit.Xunit2/PersistenceTestKit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace Akka.Persistence.TestKit
/// This class represents an Akka.NET Persistence TestKit that uses <a href="https://xunit.github.io/">xUnit</a>
/// as its testing framework.
/// </summary>
public abstract class PersistenceTestKit : TestKit
public class PersistenceTestKit : TestKit
{
/// <summary>
/// Create a new instance of the <see cref="PersistenceTestKit"/> class.
Expand All @@ -30,7 +30,7 @@ public abstract class PersistenceTestKit : TestKit
/// <param name="setup">Test ActorSystem configuration</param>
/// <param name="actorSystemName">Optional: The name of the actor system</param>
/// <param name="output">TBD</param>
protected PersistenceTestKit(ActorSystemSetup setup, string actorSystemName = null, ITestOutputHelper output = null)
public PersistenceTestKit(ActorSystemSetup setup, string actorSystemName = null, ITestOutputHelper output = null)
: base(GetConfig(setup), actorSystemName, output)
{
var persistenceExtension = Persistence.Instance.Apply(Sys);
Expand All @@ -49,7 +49,7 @@ protected PersistenceTestKit(ActorSystemSetup setup, string actorSystemName = nu
/// <param name="config">Test ActorSystem configuration</param>
/// <param name="actorSystemName">Optional: The name of the actor system</param>
/// <param name="output">TBD</param>
protected PersistenceTestKit(Config config, string actorSystemName = null, ITestOutputHelper output = null)
public PersistenceTestKit(Config config, string actorSystemName = null, ITestOutputHelper output = null)
: base(GetConfig(config), actorSystemName, output)
{
var persistenceExtension = Persistence.Instance.Apply(Sys);
Expand All @@ -61,13 +61,25 @@ protected PersistenceTestKit(Config config, string actorSystemName = null, ITest
Snapshots = TestSnapshotStore.FromRef(SnapshotsActorRef);
}

public PersistenceTestKit(ActorSystem actorSystem, ITestOutputHelper output = null)
: base(actorSystem, output)
{
var persistenceExtension = Persistence.Instance.Apply(Sys);

JournalActorRef = persistenceExtension.JournalFor(null);
Journal = TestJournal.FromRef(JournalActorRef);

SnapshotsActorRef = persistenceExtension.SnapshotStoreFor(null);
Snapshots = TestSnapshotStore.FromRef(SnapshotsActorRef);
}

/// <summary>
/// Create a new instance of the <see cref="PersistenceTestKit"/> class.
/// A new system with the default configuration will be created.
/// </summary>
/// <param name="actorSystemName">Optional: The name of the actor system</param>
/// <param name="output">TBD</param>
protected PersistenceTestKit(string actorSystemName = null, ITestOutputHelper output = null)
public PersistenceTestKit(string actorSystemName = null, ITestOutputHelper output = null)
: this(Config.Empty, actorSystemName, output)
{
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=connection/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=journal/@EntryIndexedValue">True</s:Boolean>
<s:Boolean x:Key="/Default/CodeInspection/NamespaceProvider/NamespaceFoldersToSkip/=snapshotstore/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
107 changes: 107 additions & 0 deletions src/core/Akka.Persistence.TestKit/ConnectionInterceptors.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// -----------------------------------------------------------------------
// <copyright file="ConnectionInterceptors.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Akka.Persistence.TestKit;

public static class ConnectionInterceptors
{
public sealed class Noop : IConnectionInterceptor
{
public static readonly IConnectionInterceptor Instance = new Noop();

public Task InterceptAsync() => Task.FromResult(true);
}

public sealed class Failure : IConnectionInterceptor
{
public static readonly IConnectionInterceptor Instance = new Failure();

public Task InterceptAsync() => throw new TestConnectionException();
}

public sealed class Delay : IConnectionInterceptor
{
public Delay(TimeSpan delay, IConnectionInterceptor next)
{
_delay = delay;
_next = next;
}

private readonly TimeSpan _delay;
private readonly IConnectionInterceptor _next;

public async Task InterceptAsync()
{
await Task.Delay(_delay);
await _next.InterceptAsync();
}
}

public sealed class OnCondition : IConnectionInterceptor
{
public OnCondition(Func<Task<bool>> predicate, IConnectionInterceptor next, bool negate = false)
{
_predicate = predicate;
_next = next;
_negate = negate;
}

public OnCondition(Func<bool> predicate, IConnectionInterceptor next, bool negate = false)
{
_predicate = () => Task.FromResult(predicate());
_next = next;
_negate = negate;
}

private readonly Func<Task<bool>> _predicate;
private readonly IConnectionInterceptor _next;
private readonly bool _negate;

public async Task InterceptAsync()
{
var result = await _predicate();
if ((_negate && !result) || (!_negate && result))
{
await _next.InterceptAsync();
}
}
}

public sealed class CancelableDelay: IConnectionInterceptor
{
public CancelableDelay(TimeSpan delay, IConnectionInterceptor next, CancellationToken cancellationToken)
{
_delay = delay;
_next = next;
_cancellationToken = cancellationToken;
}

private readonly TimeSpan _delay;
private readonly IConnectionInterceptor _next;
private readonly CancellationToken _cancellationToken;

public async Task InterceptAsync()
{
try
{
await Task.Delay(_delay, _cancellationToken);
}
catch (OperationCanceledException)
{
// no-op
}
catch (TimeoutException)
{
// no-op
}
await _next.InterceptAsync();
}
}
}
14 changes: 14 additions & 0 deletions src/core/Akka.Persistence.TestKit/IConnectionInterceptor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// -----------------------------------------------------------------------
// <copyright file="IConnectionInterceptor.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System.Threading.Tasks;

namespace Akka.Persistence.TestKit;

public interface IConnectionInterceptor
{
Task InterceptAsync();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// -----------------------------------------------------------------------
// <copyright file="IConnectionBehaviorSetter.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2024 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------

using System.Threading.Tasks;

namespace Akka.Persistence.TestKit;

public interface IJournalConnectionBehaviorSetter
{
Task SetInterceptorAsync(IConnectionInterceptor interceptor);
}
2 changes: 2 additions & 0 deletions src/core/Akka.Persistence.TestKit/Journal/ITestJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,7 @@ public interface ITestJournal
/// List of interceptors to alter recovery behavior of proxied journal.
/// </summary>
JournalRecoveryBehavior OnRecovery { get; }

JournalConnectionBehavior OnConnect { get; }
}
}
Loading

0 comments on commit 19162bd

Please sign in to comment.