Improve JournalPerfSpec (#7357)
Arkatufus authored Oct 9, 2024
1 parent: 3bdb6c0 · commit: 755da5d
Showing 2 changed files with 83 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/core/Akka.Persistence.TCK/Akka.Persistence.TCK.csproj
@@ -16,6 +16,7 @@

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)"/>
<PackageReference Include="MathNet.Numerics" Version="5.0.0" />
</ItemGroup>

</Project>
101 changes: 82 additions & 19 deletions src/core/Akka.Persistence.TCK/Performance/JournalPerfSpec.cs
@@ -17,6 +17,7 @@
using Akka.TestKit;
using Akka.Util;
using Akka.Util.Internal;
using MathNet.Numerics.Statistics;
using Xunit;
using Xunit.Abstractions;

@@ -33,7 +34,7 @@ namespace Akka.Persistence.TestKit.Performance
/// </summary>
public abstract class JournalPerfSpec : Akka.TestKit.Xunit2.TestKit
{
private readonly TestProbe testProbe;
private readonly TestProbe _testProbe;

/// <summary>
/// Number of messages sent to the PersistentActor under test for each test iteration
@@ -44,6 +45,11 @@ public abstract class JournalPerfSpec : Akka.TestKit.Xunit2.TestKit
/// Number of measurement iterations each test will be run.
/// </summary>
protected int MeasurementIterations = 10;

/// <summary>
/// Sigma value for the z-score outlier rejection algorithm
/// </summary>
protected double OutlierRejectionSigma = 2.0;

/// <summary>
/// Override in order to customize timeouts used for ExpectMsg, in order to tune the awaits to your journal's perf
@@ -55,12 +61,12 @@ public abstract class JournalPerfSpec : Akka.TestKit.Xunit2.TestKit
protected JournalPerfSpec(Config config, string actorSystem, ITestOutputHelper output)
: base(config ?? Config.Empty, actorSystem, output)
{
testProbe = CreateTestProbe();
_testProbe = CreateTestProbe();
}

internal IActorRef BenchActor(string pid, int replyAfter)
{
return Sys.ActorOf(Props.Create(() => new BenchActor(pid, testProbe, EventsCount, false)));;
return Sys.ActorOf(Props.Create(() => new BenchActor(pid, _testProbe, EventsCount, false)));;
}

internal (IActorRef aut,TestProbe probe) BenchActorNewProbe(string pid, int replyAfter)
@@ -81,27 +87,39 @@ internal IActorRef BenchActor(string pid, int replyAfter)

internal void FeedAndExpectLast(IActorRef actor, string mode, IReadOnlyList<int> commands)
{
commands.ForEach(c => actor.Tell(new Cmd(mode, c)));
testProbe.ExpectMsg(commands.Last(), ExpectDuration);
for (var i = 0; i < commands.Count; i++)
{
actor.Tell(new Cmd(mode, commands[i]));
}

_testProbe.ExpectMsg(commands[commands.Count - 1], ExpectDuration);
}

internal void FeedAndExpectLastSpecific((IActorRef actor, TestProbe probe) aut, string mode, IReadOnlyList<int> commands)
{
commands.ForEach(c => aut.actor.Tell(new Cmd(mode, c)));
var (actor, probe) = aut;
for (var i = 0; i < commands.Count; i++)
{
actor.Tell(new Cmd(mode, commands[i]));
}

aut.probe.ExpectMsg(commands.Last(), ExpectDuration);
probe.ExpectMsg(commands[commands.Count - 1], ExpectDuration);
}

internal void FeedAndExpectLastRouterSet(
(IActorRef actor, TestProbe probe) autSet, string mode,
IReadOnlyList<int> commands, int numExpect)
{

commands.ForEach(c => autSet.actor.Tell(new Broadcast(new Cmd(mode, c))));
var (actor, probe) = autSet;
for (var i = 0; i < commands.Count; i++)
{
actor.Tell(new Broadcast(new Cmd(mode, commands[i])));
}

for (int i = 0; i < numExpect; i++)
var expected = commands[commands.Count - 1];
for (var i = 0; i < numExpect; i++)
{
autSet.probe.ExpectMsg(commands.Last(), ExpectDuration);
probe.ExpectMsg(expected, ExpectDuration);
}

}
@@ -126,10 +144,25 @@ internal void Measure(Func<TimeSpan, string> msg, Action block)
i++;
}

double avgTime = measurements.Select(c => c.TotalMilliseconds).Sum() / MeasurementIterations;
double msgPerSec = (EventsCount / avgTime) * 1000;
var (rejected, times) = RejectOutliers(measurements.Select(c => c.TotalMilliseconds).ToArray(), OutlierRejectionSigma);

var mean = times.Average();
var stdDev = times.PopulationStandardDeviation();
var min = times.Minimum();
var q1 = times.LowerQuartile();
var median = times.Median();
var q3 = times.UpperQuartile();
var max = times.Maximum();

Output.WriteLine($"Mean: {mean:F2} ms, Standard Deviation: {stdDev:F2} ms, Min: {min:F2} ms, Q1: {q1:F2} ms, Median: {median:F2} ms, Q3: {q3:F2} ms, Max: {max:F2} ms");

Output.WriteLine($"Average time: {avgTime} ms, {msgPerSec} msg/sec");
var msgPerSec = EventsCount / mean * 1000;
Output.WriteLine($"Mean throughput: {msgPerSec:F2} msg/s");

var medianMsgPerSec = EventsCount / median * 1000;
Output.WriteLine($"Median throughput: {medianMsgPerSec:F2} msg/s");

Output.WriteLine($"Rejected outlier (sigma: {OutlierRejectionSigma}): {string.Join(", ", rejected)}");
}
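
Side note: the summary statistics above rely on extension methods from the MathNet.Numerics.Statistics namespace, provided by the MathNet.Numerics package added to the .csproj in this commit. A minimal standalone sketch of the same calls, using made-up sample timings for illustration:

using System;
using System.Linq;
using MathNet.Numerics.Statistics;

// Hypothetical per-iteration timings in milliseconds, for illustration only.
var times = new[] { 95.0, 102.0, 98.0, 110.0, 97.0, 101.0, 99.0, 104.0, 96.0, 100.0 };

Console.WriteLine($"Mean: {times.Average():F2} ms");
Console.WriteLine($"StdDev (population): {times.PopulationStandardDeviation():F2} ms");
Console.WriteLine($"Min/Q1/Median/Q3/Max: {times.Minimum():F2} / {times.LowerQuartile():F2} / " +
                  $"{times.Median():F2} / {times.UpperQuartile():F2} / {times.Maximum():F2} ms");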

/// <summary>
@@ -156,10 +189,28 @@ internal void MeasureGroup(Func<TimeSpan, string> msg, Action block, int numMsg,
i++;
}

double avgTime = measurements.Select(c => c.TotalMilliseconds).Sum() / MeasurementIterations;
double msgPerSec = (numMsg / avgTime) * 1000;
double msgPerSecTotal = (numMsg*numGroup / avgTime) * 1000;
Output.WriteLine($"Workers: {numGroup} , Average time: {avgTime} ms, {msgPerSec} msg/sec/actor, {msgPerSecTotal} total msg/sec.");
var (rejected, times) = RejectOutliers(measurements.Select(c => c.TotalMilliseconds).ToArray(), OutlierRejectionSigma);

var mean = times.Average();
var stdDev = times.PopulationStandardDeviation();
var min = times.Minimum();
var q1 = times.LowerQuartile();
var median = times.Median();
var q3 = times.UpperQuartile();
var max = times.Maximum();

Output.WriteLine($"Workers: {numGroup}, Mean: {mean:F2} ms, Standard Deviation: {stdDev:F2} ms, Min: {min:F2} ms, Q1: {q1:F2} ms, Median: {median:F2} ms, Q3: {q3:F2} ms, Max: {max:F2} ms");

var msgPerSec = numMsg / mean * 1000;
var msgPerSecTotal = numMsg * numGroup / mean * 1000;

Output.WriteLine($"Mean throughput: {msgPerSec:F2} msg/s/actor, Mean total throughput: {msgPerSecTotal:F2} msg/s");

var medianMsgPerSec = numMsg / median * 1000;
var medianMsgPerSecTotal = numMsg * numGroup / median * 1000;
Output.WriteLine($"Median throughput: {medianMsgPerSec:F2} msg/s/actor, Median total throughput: {medianMsgPerSecTotal:F2} msg/s");

Output.WriteLine($"Rejected outlier (sigma: {OutlierRejectionSigma}): {string.Join(", ", rejected)}");
}

private void RunPersistGroupBenchmark(int numGroup, int numCommands)
@@ -179,6 +230,18 @@ private void RunPersistGroupBenchmark(int numGroup, int numCommands)
);
}

private static (IReadOnlyList<double> Rejected, IReadOnlyList<double> Measurements) RejectOutliers(IReadOnlyList<double> measurements, double sigma)
{
var mean = measurements.Average();
var stdDev = measurements.PopulationStandardDeviation();
var threshold = sigma * stdDev;
var minThreshold = mean - threshold;
var maxThreshold = mean + threshold;
var rejected = measurements.Where(m => m < minThreshold || m > maxThreshold);
var accepted = measurements.Where(m => m >= minThreshold && m <= maxThreshold);
return (rejected.ToArray(), accepted.ToArray());
}
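
To make the rejection rule concrete: with the default OutlierRejectionSigma of 2.0, an iteration is dropped when its duration lies more than two population standard deviations from the mean of all iterations. A rough standalone sketch of the same logic follows, with invented timings; note that with only a few samples a single outlier can inflate the standard deviation enough to escape rejection, which is one reason to keep MeasurementIterations at ten or more.

using System;
using System.Linq;
using MathNet.Numerics.Statistics;

// Nine "normal" iterations plus one slow outlier, in milliseconds (invented numbers).
var measurements = new[] { 100.0, 102.0, 98.0, 101.0, 99.0, 103.0, 97.0, 100.0, 100.0, 300.0 };

var mean = measurements.Average();                       // 120.0
var stdDev = measurements.PopulationStandardDeviation(); // ~60.0
var sigma = 2.0;

var accepted = measurements.Where(m => Math.Abs(m - mean) <= sigma * stdDev).ToArray();
var rejected = measurements.Where(m => Math.Abs(m - mean) > sigma * stdDev).ToArray();

Console.WriteLine($"Rejected: {string.Join(", ", rejected)}"); // 300 (|300 - 120| = 180 > 2 * 60)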

[Fact]
public void PersistenceActor_performance_must_measure_Persist()
{
@@ -272,7 +335,7 @@ public void PersistenceActor_performance_must_measure_Recovering()
Measure(d => $"Recovering {EventsCount} took {d.TotalMilliseconds} ms", () =>
{
BenchActor("PersistRecoverPid", EventsCount);
testProbe.ExpectMsg(Commands.Last(), ExpectDuration);
_testProbe.ExpectMsg(Commands.Last(), ExpectDuration);
});
}
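
For context on how these knobs are consumed by concrete journal TCK specs, here is a hypothetical subclass (class name and HOCON config invented for illustration) that tunes the protected members shown in this diff from its constructor; EventsCount and the virtual ExpectDuration documented above can be customized in a similar fashion.

using Akka.Configuration;
using Akka.Persistence.TestKit.Performance;
using Xunit.Abstractions;

// Hypothetical perf spec targeting the built-in in-memory journal; a real
// implementation would point the config at the journal plugin under test.
public class InMemoryJournalPerfSpec : JournalPerfSpec
{
    private static readonly Config SpecConfig =
        ConfigurationFactory.ParseString(
            @"akka.persistence.journal.plugin = ""akka.persistence.journal.inmem""");

    public InMemoryJournalPerfSpec(ITestOutputHelper output)
        : base(SpecConfig, nameof(InMemoryJournalPerfSpec), output)
    {
        MeasurementIterations = 20;   // more samples for the summary statistics
        OutlierRejectionSigma = 3.0;  // only reject extreme outliers
    }
}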
