Skip to content

Commit

Permalink
TestKit and Akka.Remote.TestKit: diagnostic improvements and code mod…
Browse files Browse the repository at this point in the history
…ernization (#7321)

* working on cleaning up the MNTR

* added better logging to barrier entry

* more type cleanup

* fixed issues with encoding `RoleName` into `EnterBarrier` and `FailBarrier` messages

* added better pretty-printing for `EnterBarrier` and `FailBarrier`

* cleaning up some mutability warnings

* more cleanup

* final MNTR fixes
  • Loading branch information
Aaronontheweb authored Aug 19, 2024
1 parent 801ce60 commit 355439e
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 304 deletions.
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<FsCheckVersion>2.16.6</FsCheckVersion>
<HoconVersion>2.0.3</HoconVersion>
<ConfigurationManagerVersion>6.0.1</ConfigurationManagerVersion>
<MultiNodeAdapterVersion>1.5.19</MultiNodeAdapterVersion>
<MultiNodeAdapterVersion>1.5.25</MultiNodeAdapterVersion>
<MicrosoftLibVersion>[6.0.*,)</MicrosoftLibVersion>
<MsExtVersion>[6.0.*,)</MsExtVersion>
<AkkaAnalyzerVersion>0.2.5</AkkaAnalyzerVersion>
Expand Down
108 changes: 56 additions & 52 deletions src/core/Akka.Remote.TestKit.Tests/BarrierSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

using System;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.TestKit;
using Xunit;
Expand Down Expand Up @@ -110,55 +111,55 @@ public void A_BarrierCoordinator_must_register_clients_and_disconnect_them()
}

[Fact]
public void A_BarrierCoordinator_must_fail_entering_barrier_when_nobody_registered()
public async Task A_BarrierCoordinator_must_fail_entering_barrier_when_nobody_registered()
{
var b = GetBarrier();
b.Tell(new EnterBarrier("bar1", null), TestActor);
ExpectMsg(new ToClient<BarrierResult>(new BarrierResult("bar1", false)), TimeSpan.FromSeconds(300));
b.Tell(new EnterBarrier("bar1", null, new RoleName("b")), TestActor);
await ExpectMsgAsync(new ToClient<BarrierResult>(new BarrierResult("bar1", false)), TimeSpan.FromSeconds(300));
}

[Fact]
public void A_BarrierCoordinator_must_enter_barrier()
public async Task A_BarrierCoordinator_must_enter_barrier()
{
var barrier = GetBarrier();
var a = CreateTestProbe();
var b = CreateTestProbe();
barrier.Tell(new Controller.NodeInfo(A, Address.Parse("akka://sys"), a.Ref));
barrier.Tell(new Controller.NodeInfo(B, Address.Parse("akka://sys"), b.Ref));
a.Send(barrier, new EnterBarrier("bar2", null));
a.Send(barrier, new EnterBarrier("bar2", null, new RoleName("a")));
NoMsg(a, b);
Within(TimeSpan.FromSeconds(2), () =>
await WithinAsync(TimeSpan.FromSeconds(2), async () =>
{
b.Send(barrier, new EnterBarrier("bar2", null));
a.ExpectMsg(new ToClient<BarrierResult>(new BarrierResult("bar2", true)));
b.ExpectMsg(new ToClient<BarrierResult>(new BarrierResult("bar2", true)));
b.Send(barrier, new EnterBarrier("bar2", null, new RoleName("b")));
await a.ExpectMsgAsync(new ToClient<BarrierResult>(new BarrierResult("bar2", true)));
await b.ExpectMsgAsync(new ToClient<BarrierResult>(new BarrierResult("bar2", true)));
});
}

[Fact]
public void A_BarrierCoordinator_must_enter_barrier_with_joining_node()
public async Task A_BarrierCoordinator_must_enter_barrier_with_joining_node()
{
var barrier = GetBarrier();
var a = CreateTestProbe();
var b = CreateTestProbe();
var c = CreateTestProbe();
barrier.Tell(new Controller.NodeInfo(A, Address.Parse("akka://sys"), a.Ref));
barrier.Tell(new Controller.NodeInfo(B, Address.Parse("akka://sys"), b.Ref));
a.Send(barrier, new EnterBarrier("bar3", null));
a.Send(barrier, new EnterBarrier("bar3", null, new RoleName("a")));
barrier.Tell(new Controller.NodeInfo(C, Address.Parse("akka://sys"), c.Ref));
b.Send(barrier, new EnterBarrier("bar3", null));
b.Send(barrier, new EnterBarrier("bar3", null, new RoleName("b")));
NoMsg(a, b, c);
Within(TimeSpan.FromSeconds(2), () =>
await WithinAsync(TimeSpan.FromSeconds(2), async () =>
{
c.Send(barrier, new EnterBarrier("bar3", null));
a.ExpectMsg(new ToClient<BarrierResult>(new BarrierResult("bar3", true)));
b.ExpectMsg(new ToClient<BarrierResult>(new BarrierResult("bar3", true)));
c.ExpectMsg(new ToClient<BarrierResult>(new BarrierResult("bar3", true)));
c.Send(barrier, new EnterBarrier("bar3", null, new RoleName("c")));
await a.ExpectMsgAsync(new ToClient<BarrierResult>(new BarrierResult("bar3", true)));
await b.ExpectMsgAsync(new ToClient<BarrierResult>(new BarrierResult("bar3", true)));
await c.ExpectMsgAsync(new ToClient<BarrierResult>(new BarrierResult("bar3", true)));
});
}

[Fact]
public void A_BarrierCoordinator_must_enter_barrier_with_leaving_node()
public async Task A_BarrierCoordinator_must_enter_barrier_with_leaving_node()
{
var barrier = GetBarrier();
var a = CreateTestProbe();
Expand All @@ -167,47 +168,49 @@ public void A_BarrierCoordinator_must_enter_barrier_with_leaving_node()
barrier.Tell(new Controller.NodeInfo(A, Address.Parse("akka://sys"), a.Ref));
barrier.Tell(new Controller.NodeInfo(B, Address.Parse("akka://sys"), b.Ref));
barrier.Tell(new Controller.NodeInfo(C, Address.Parse("akka://sys"), c.Ref));
a.Send(barrier, new EnterBarrier("bar4", null));
b.Send(barrier, new EnterBarrier("bar4", null));
a.Send(barrier, new EnterBarrier("bar4", null, new RoleName("a")));
b.Send(barrier, new EnterBarrier("bar4", null, new RoleName("b")));
barrier.Tell(new BarrierCoordinator.RemoveClient(A));
barrier.Tell(new Controller.ClientDisconnected(A));
NoMsg(a, b, c);
Within(TimeSpan.FromSeconds(2), () =>
await WithinAsync(TimeSpan.FromSeconds(2), async () =>
{
barrier.Tell(new BarrierCoordinator.RemoveClient(C));
b.ExpectMsg(new ToClient<BarrierResult>(new BarrierResult("bar4", true)));
await b.ExpectMsgAsync(new ToClient<BarrierResult>(new BarrierResult("bar4", true)));
});
barrier.Tell(new Controller.ClientDisconnected(C));
ExpectNoMsg(TimeSpan.FromSeconds(1));
await ExpectNoMsgAsync(TimeSpan.FromSeconds(1));
}

[Fact]
public void A_BarrierCoordinator_must_enter_leave_barrier_when_last_arrived_is_removed()
public async Task A_BarrierCoordinator_must_enter_leave_barrier_when_last_arrived_is_removed()
{
var barrier = GetBarrier();
var roleName = new RoleName("normal");
var a = CreateTestProbe();
var b = CreateTestProbe();
barrier.Tell(new Controller.NodeInfo(A, Address.Parse("akka://sys"), a.Ref));
barrier.Tell(new Controller.NodeInfo(B, Address.Parse("akka://sys"), b.Ref));
a.Send(barrier, new EnterBarrier("bar5", null));
a.Send(barrier, new EnterBarrier("bar5", null, roleName));
barrier.Tell(new BarrierCoordinator.RemoveClient(A));
b.Send(barrier, new EnterBarrier("foo", null));
b.ExpectMsg(new ToClient<BarrierResult>(new BarrierResult("foo", true)));
b.Send(barrier, new EnterBarrier("foo", null, roleName));
await b.ExpectMsgAsync(new ToClient<BarrierResult>(new BarrierResult("foo", true)));
}

[Fact]
public void A_BarrierCoordinator_must_fail_barrier_with_disconnecting_node()
public async Task A_BarrierCoordinator_must_fail_barrier_with_disconnecting_node()
{
var barrier = GetBarrier();
var roleName = new RoleName("normal");
var a = CreateTestProbe();
var b = CreateTestProbe();
var nodeA = new Controller.NodeInfo(A, Address.Parse("akka://sys"), a.Ref);
barrier.Tell(nodeA);
barrier.Tell(new Controller.NodeInfo(B, Address.Parse("akka://sys"), b.Ref));
a.Send(barrier, new EnterBarrier("bar6", null));
a.Send(barrier, new EnterBarrier("bar6", null, roleName));
//TODO: EventFilter?
barrier.Tell(new Controller.ClientDisconnected(B));
var msg = ExpectMsg<Failed>();
var msg = await ExpectMsgAsync<Failed>();
Assert.Equal(new BarrierCoordinator.ClientLostException(
new BarrierCoordinator.Data(
ImmutableHashSet.Create(nodeA),
Expand All @@ -218,9 +221,11 @@ public void A_BarrierCoordinator_must_fail_barrier_with_disconnecting_node()
}

[Fact]
public void A_BarrierCoordinator_must_fail_barrier_when_disconnecting_node_who_already_arrived()
public async Task A_BarrierCoordinator_must_fail_barrier_when_disconnecting_node_who_already_arrived()
{
var barrier = GetBarrier();
var roleNameA = new RoleName("a");
var roleNameB = new RoleName("b");
var a = CreateTestProbe();
var b = CreateTestProbe();
var c = CreateTestProbe();
Expand All @@ -229,11 +234,11 @@ public void A_BarrierCoordinator_must_fail_barrier_when_disconnecting_node_who_a
barrier.Tell(nodeA);
barrier.Tell(new Controller.NodeInfo(B, Address.Parse("akka://sys"), b.Ref));
barrier.Tell(nodeC);
a.Send(barrier, new EnterBarrier("bar7", null));
b.Send(barrier, new EnterBarrier("bar7", null));
a.Send(barrier, new EnterBarrier("bar7", null, roleNameA));
b.Send(barrier, new EnterBarrier("bar7", null, roleNameB));
//TODO: Event filter?
barrier.Tell(new Controller.ClientDisconnected(B));
var msg = ExpectMsg<Failed>();
var msg = await ExpectMsgAsync<Failed>();
Assert.Equal(new BarrierCoordinator.ClientLostException(
new BarrierCoordinator.Data(
ImmutableHashSet.Create(nodeA, nodeC),
Expand All @@ -245,22 +250,24 @@ public void A_BarrierCoordinator_must_fail_barrier_when_disconnecting_node_who_a
}

[Fact]
public void A_BarrierCoordinator_must_fail_when_entering_wrong_barrier()
public async Task A_BarrierCoordinator_must_fail_when_entering_wrong_barrier()
{
var barrier = GetBarrier();
var roleName = new RoleName("failer");
var a = CreateTestProbe();
var b = CreateTestProbe();
var nodeA = (new Controller.NodeInfo(A, Address.Parse("akka://sys"), a.Ref));
barrier.Tell(nodeA);
var nodeB = (new Controller.NodeInfo(B, Address.Parse("akka://sys"), b.Ref));
barrier.Tell(nodeB);
a.Send(barrier, new EnterBarrier("bar8", null));
a.Send(barrier, new EnterBarrier("bar8", null, roleName));
//TODO: Event filter
b.Send(barrier, new EnterBarrier("foo", null));
var msg = ExpectMsg<Failed>();
b.Send(barrier, new EnterBarrier("foo", null, roleName));
var msg = await ExpectMsgAsync<Failed>();
Assert.Equal(new BarrierCoordinator.WrongBarrierException(
"foo",
b.Ref,
roleName,
new BarrierCoordinator.Data(
ImmutableHashSet.Create(nodeA, nodeB),
"bar8",
Expand All @@ -270,13 +277,14 @@ public void A_BarrierCoordinator_must_fail_when_entering_wrong_barrier()
}

[Fact]
public void A_BarrierCoordinator_must_fail_barrier_after_first_failure()
public async Task A_BarrierCoordinator_must_fail_barrier_after_first_failure()
{
var barrier = GetBarrier();
var a = CreateTestProbe();
var roleName = new RoleName("failer");
//TODO: EventFilter
barrier.Tell(new BarrierCoordinator.RemoveClient(A));
var msg = ExpectMsg<Failed>();
var msg = await ExpectMsgAsync<Failed>();
Assert.Equal(new BarrierCoordinator.BarrierEmptyException(
new BarrierCoordinator.Data(
ImmutableHashSet.Create<Controller.NodeInfo>(),
Expand All @@ -285,24 +293,25 @@ public void A_BarrierCoordinator_must_fail_barrier_after_first_failure()
((BarrierCoordinator.BarrierEmptyException)msg.Exception).BarrierData.Deadline)
, "cannot remove RoleName(a): no client to remove"), msg.Exception);
barrier.Tell(new Controller.NodeInfo(A, Address.Parse("akka://sys"), a.Ref));
a.Send(barrier, new EnterBarrier("bar9", null));
a.Send(barrier, new EnterBarrier("bar9", null, roleName));
a.ExpectMsg(new ToClient<BarrierResult>(new BarrierResult("bar9", false)));
}

[Fact]
public void A_BarrierCoordinator_must_fail_after_barrier_timeout()
public async Task A_BarrierCoordinator_must_fail_after_barrier_timeout()
{
var barrier = GetBarrier();
var roleName = new RoleName("failer");
var a = CreateTestProbe();
var b = CreateTestProbe();
var nodeA = new Controller.NodeInfo(A, Address.Parse("akka://sys"), a.Ref);
var nodeB = new Controller.NodeInfo(B, Address.Parse("akka://sys"), b.Ref);
barrier.Tell(nodeA);
barrier.Tell(nodeB);
a.Send(barrier, new EnterBarrier("bar10", null));
EventFilter.Exception<BarrierCoordinator.BarrierTimeoutException>().ExpectOne(() =>
a.Send(barrier, new EnterBarrier("bar10", null, roleName));
await EventFilter.Exception<BarrierCoordinator.BarrierTimeoutException>().ExpectOneAsync(async () =>
{
var msg = ExpectMsg<Failed>(TimeSpan.FromSeconds(7));
var msg = await ExpectMsgAsync<Failed>(TimeSpan.FromSeconds(7));
Assert.Equal(new BarrierCoordinator.BarrierTimeoutException(
new BarrierCoordinator.Data(
ImmutableHashSet.Create(nodeA, nodeB),
Expand Down Expand Up @@ -347,8 +356,8 @@ private IActorRef GetBarrier()

private class BarrierCoordinatorSupervisor : UntypedActor
{
readonly IActorRef _testActor;
readonly IActorRef _barrier;
private readonly IActorRef _testActor;
private readonly IActorRef _barrier;

public BarrierCoordinatorSupervisor(IActorRef testActor)
{
Expand Down Expand Up @@ -376,11 +385,6 @@ private void NoMsg(params TestProbe[] probes)
ExpectNoMsg(TimeSpan.FromSeconds(1));
foreach (var probe in probes) Assert.False(probe.HasMessages);
}

public IActorRef Self
{
get { return TestActor; }
}
}
}

Loading

0 comments on commit 355439e

Please sign in to comment.