From 755da5d84c4bd9a92e1958bcfaf7836c1b9ec33e Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Wed, 9 Oct 2024 16:53:26 +0700 Subject: [PATCH] Improve JournalPerfSpec (#7357) --- .../Akka.Persistence.TCK.csproj | 1 + .../Performance/JournalPerfSpec.cs | 101 ++++++++++++++---- 2 files changed, 83 insertions(+), 19 deletions(-) diff --git a/src/core/Akka.Persistence.TCK/Akka.Persistence.TCK.csproj b/src/core/Akka.Persistence.TCK/Akka.Persistence.TCK.csproj index b8dd14c05cb..95c6d001e5e 100644 --- a/src/core/Akka.Persistence.TCK/Akka.Persistence.TCK.csproj +++ b/src/core/Akka.Persistence.TCK/Akka.Persistence.TCK.csproj @@ -16,6 +16,7 @@ + \ No newline at end of file diff --git a/src/core/Akka.Persistence.TCK/Performance/JournalPerfSpec.cs b/src/core/Akka.Persistence.TCK/Performance/JournalPerfSpec.cs index e3285c4d6b2..f3c4d92fcd1 100644 --- a/src/core/Akka.Persistence.TCK/Performance/JournalPerfSpec.cs +++ b/src/core/Akka.Persistence.TCK/Performance/JournalPerfSpec.cs @@ -17,6 +17,7 @@ using Akka.TestKit; using Akka.Util; using Akka.Util.Internal; +using MathNet.Numerics.Statistics; using Xunit; using Xunit.Abstractions; @@ -33,7 +34,7 @@ namespace Akka.Persistence.TestKit.Performance /// public abstract class JournalPerfSpec : Akka.TestKit.Xunit2.TestKit { - private readonly TestProbe testProbe; + private readonly TestProbe _testProbe; /// /// Number of messages sent to the PersistentActor under test for each test iteration @@ -44,6 +45,11 @@ public abstract class JournalPerfSpec : Akka.TestKit.Xunit2.TestKit /// Number of measurement iterations each test will be run. /// protected int MeasurementIterations = 10; + + /// + /// Sigma value for the z-score outlier rejection algorithm + /// + protected double OutlierRejectionSigma = 2.0; /// /// Override in order to customize timeouts used for ExpectMsg, in order to tune the awaits to your journal's perf @@ -55,12 +61,12 @@ public abstract class JournalPerfSpec : Akka.TestKit.Xunit2.TestKit protected JournalPerfSpec(Config config, string actorSystem, ITestOutputHelper output) : base(config ?? Config.Empty, actorSystem, output) { - testProbe = CreateTestProbe(); + _testProbe = CreateTestProbe(); } internal IActorRef BenchActor(string pid, int replyAfter) { - return Sys.ActorOf(Props.Create(() => new BenchActor(pid, testProbe, EventsCount, false)));; + return Sys.ActorOf(Props.Create(() => new BenchActor(pid, _testProbe, EventsCount, false)));; } internal (IActorRef aut,TestProbe probe) BenchActorNewProbe(string pid, int replyAfter) @@ -81,27 +87,39 @@ internal IActorRef BenchActor(string pid, int replyAfter) internal void FeedAndExpectLast(IActorRef actor, string mode, IReadOnlyList commands) { - commands.ForEach(c => actor.Tell(new Cmd(mode, c))); - testProbe.ExpectMsg(commands.Last(), ExpectDuration); + for (var i = 0; i < commands.Count; i++) + { + actor.Tell(new Cmd(mode, commands[i])); + } + + _testProbe.ExpectMsg(commands[commands.Count - 1], ExpectDuration); } internal void FeedAndExpectLastSpecific((IActorRef actor, TestProbe probe) aut, string mode, IReadOnlyList commands) { - commands.ForEach(c => aut.actor.Tell(new Cmd(mode, c))); + var (actor, probe) = aut; + for (var i = 0; i < commands.Count; i++) + { + actor.Tell(new Cmd(mode, commands[i])); + } - aut.probe.ExpectMsg(commands.Last(), ExpectDuration); + probe.ExpectMsg(commands[commands.Count - 1], ExpectDuration); } internal void FeedAndExpectLastRouterSet( (IActorRef actor, TestProbe probe) autSet, string mode, IReadOnlyList commands, int numExpect) { - - commands.ForEach(c => autSet.actor.Tell(new Broadcast(new Cmd(mode, c)))); + var (actor, probe) = autSet; + for (var i = 0; i < commands.Count; i++) + { + actor.Tell(new Broadcast(new Cmd(mode, commands[i]))); + } - for (int i = 0; i < numExpect; i++) + var expected = commands[commands.Count - 1]; + for (var i = 0; i < numExpect; i++) { - autSet.probe.ExpectMsg(commands.Last(), ExpectDuration); + probe.ExpectMsg(expected, ExpectDuration); } } @@ -126,10 +144,25 @@ internal void Measure(Func msg, Action block) i++; } - double avgTime = measurements.Select(c => c.TotalMilliseconds).Sum() / MeasurementIterations; - double msgPerSec = (EventsCount / avgTime) * 1000; + var (rejected, times) = RejectOutliers(measurements.Select(c => c.TotalMilliseconds).ToArray(), OutlierRejectionSigma); + + var mean = times.Average(); + var stdDev = times.PopulationStandardDeviation(); + var min = times.Minimum(); + var q1 = times.LowerQuartile(); + var median = times.Median(); + var q3 = times.UpperQuartile(); + var max = times.Maximum(); + + Output.WriteLine($"Mean: {mean:F2} ms, Standard Deviation: {stdDev:F2} ms, Min: {min:F2} ms, Q1: {q1:F2} ms, Median: {median:F2} ms, Q3: {q3:F2} ms, Max: {max:F2} ms"); - Output.WriteLine($"Average time: {avgTime} ms, {msgPerSec} msg/sec"); + var msgPerSec = EventsCount / mean * 1000; + Output.WriteLine($"Mean throughput: {msgPerSec:F2} msg/s"); + + var medianMsgPerSec = EventsCount / median * 1000; + Output.WriteLine($"Median throughput: {medianMsgPerSec:F2} msg/s"); + + Output.WriteLine($"Rejected outlier (sigma: {OutlierRejectionSigma}): {string.Join(", ", rejected)}"); } /// @@ -156,10 +189,28 @@ internal void MeasureGroup(Func msg, Action block, int numMsg, i++; } - double avgTime = measurements.Select(c => c.TotalMilliseconds).Sum() / MeasurementIterations; - double msgPerSec = (numMsg / avgTime) * 1000; - double msgPerSecTotal = (numMsg*numGroup / avgTime) * 1000; - Output.WriteLine($"Workers: {numGroup} , Average time: {avgTime} ms, {msgPerSec} msg/sec/actor, {msgPerSecTotal} total msg/sec."); + var (rejected, times) = RejectOutliers(measurements.Select(c => c.TotalMilliseconds).ToArray(), OutlierRejectionSigma); + + var mean = times.Average(); + var stdDev = times.PopulationStandardDeviation(); + var min = times.Minimum(); + var q1 = times.LowerQuartile(); + var median = times.Median(); + var q3 = times.UpperQuartile(); + var max = times.Maximum(); + + Output.WriteLine($"Workers: {numGroup}, Mean: {mean:F2} ms, Standard Deviation: {stdDev:F2} ms, Min: {min:F2} ms, Q1: {q1:F2} ms, Median: {median:F2} ms, Q3: {q3:F2} ms, Max: {max:F2} ms"); + + var msgPerSec = numMsg / mean * 1000; + var msgPerSecTotal = numMsg * numGroup / mean * 1000; + + Output.WriteLine($"Mean throughput: {msgPerSec:F2} msg/s/actor, Mean total throughput: {msgPerSecTotal:F2} msg/s"); + + var medianMsgPerSec = numMsg / median * 1000; + var medianMsgPerSecTotal = numMsg * numGroup / median * 1000; + Output.WriteLine($"Median throughput: {medianMsgPerSec:F2} msg/s/actor, Median total throughput: {medianMsgPerSecTotal:F2} msg/s"); + + Output.WriteLine($"Rejected outlier (sigma: {OutlierRejectionSigma}): {string.Join(", ", rejected)}"); } private void RunPersistGroupBenchmark(int numGroup, int numCommands) @@ -179,6 +230,18 @@ private void RunPersistGroupBenchmark(int numGroup, int numCommands) ); } + private static (IReadOnlyList Rejected, IReadOnlyList Measurements) RejectOutliers(IReadOnlyList measurements, double sigma) + { + var mean = measurements.Average(); + var stdDev = measurements.PopulationStandardDeviation(); + var threshold = sigma * stdDev; + var minThreshold = mean - threshold; + var maxThreshold = mean + threshold; + var rejected = measurements.Where(m => m < minThreshold || m > maxThreshold); + var accepted = measurements.Where(m => m >= minThreshold && m <= maxThreshold); + return (rejected.ToArray(), accepted.ToArray()); + } + [Fact] public void PersistenceActor_performance_must_measure_Persist() { @@ -272,7 +335,7 @@ public void PersistenceActor_performance_must_measure_Recovering() Measure(d => $"Recovering {EventsCount} took {d.TotalMilliseconds} ms", () => { BenchActor("PersistRecoverPid", EventsCount); - testProbe.ExpectMsg(Commands.Last(), ExpectDuration); + _testProbe.ExpectMsg(Commands.Last(), ExpectDuration); }); }