From 3346ffc1cfc2d7625521a5602960290822272d8f Mon Sep 17 00:00:00 2001 From: Anthony Lloyd Date: Mon, 26 Aug 2024 22:47:52 +0100 Subject: [PATCH] improve parallel Run and RunReplay --- CsCheck/Utils.cs | 95 ++++++++++++++++++++++++++++-------------------- 1 file changed, 55 insertions(+), 40 deletions(-) diff --git a/CsCheck/Utils.cs b/CsCheck/Utils.cs index 0b575d0..2636991 100644 --- a/CsCheck/Utils.cs +++ b/CsCheck/Utils.cs @@ -21,6 +21,8 @@ namespace CsCheck; using System.Collections.Concurrent; using System.Diagnostics; using System.Runtime.InteropServices; +using System.Threading; +using System; public sealed class CsCheckException : Exception { @@ -475,69 +477,82 @@ public static bool ModelEqual(T actual, M model) return actual.Equals(model); } - internal static void Run(T initialState, (string, Action)[] sequencialOperations, (string, Action)[] parallelOperations, int threads, int[]? threadIds = null) + sealed class RunWorker(T state, (string, Action)[] parallelOperations, int[]? threadIds) : IThreadPoolWorkItem { - for (int i = 0; i < sequencialOperations.Length; i++) - sequencialOperations[i].Item2(initialState); - Exception? exception = null; - var opId = -1; - var runners = new Thread[threads]; - while (--threads >= 0) + int opId = -1, threadId = -1; + public volatile bool Hold = true; + public Exception? Exception; + public void Execute() { - runners[threads] = new Thread(threadId => + int i, tid = Interlocked.Increment(ref threadId); + while (Hold) { } + while ((i = Interlocked.Increment(ref opId)) < parallelOperations.Length) { - int i, tid = (int)threadId!; - while ((i = Interlocked.Increment(ref opId)) < parallelOperations.Length) + if (threadIds is not null) threadIds[i] = tid; + try { parallelOperations[i].Item2(state); } + catch (Exception e) { - if (threadIds is not null) threadIds[i] = tid; - try { parallelOperations[i].Item2(initialState); } - catch (Exception e) + if (Exception is null) { - if (exception is null) - { - exception = e; - Interlocked.Exchange(ref opId, parallelOperations.Length); - } + Exception = e; + opId = 1_000_000; } } - }); + } } - for (int i = 0; i < runners.Length; i++) runners[i].Start(i); - for (int i = 0; i < runners.Length; i++) runners[i].Join(); - if (exception is not null) throw exception; } - internal static void RunReplay(T initialState, (string, Action)[] sequencialOperations, (string, Action)[] parallelOperations, int threads, int[] threadIds) + internal static void Run(T state, (string, Action)[] sequencialOperations, (string, Action)[] parallelOperations, int threads, int[]? threadIds = null) { for (int i = 0; i < sequencialOperations.Length; i++) - sequencialOperations[i].Item2(initialState); - Exception? exception = null; + sequencialOperations[i].Item2(state); + var worker = new RunWorker(state, parallelOperations, threadIds); var runners = new Thread[threads]; - while (--threads >= 0) + for (int i = 0; i < runners.Length; i++) runners[i] = new Thread(worker.Execute); + for (int i = 0; i < runners.Length; i++) runners[i].Start(); + worker.Hold = false; + for (int i = 0; i < runners.Length; i++) runners[i].Join(); + if (worker.Exception is not null) throw worker.Exception; + } + + sealed class RunReplayWorker(T state, (string, Action)[] parallelOperations, int[] threadIds) : IThreadPoolWorkItem + { + int threadId = -1; + public volatile bool Hold = true; + public Exception? Exception; + public void Execute() { - runners[threads] = new Thread(threadId => + int i, opId = -1, tid = Interlocked.Increment(ref threadId); + while (Hold) { } + while ((i = Interlocked.Increment(ref opId)) < parallelOperations.Length) { - int opId = -1, i = -1, tid = (int)threadId!; - while ((i = Interlocked.Increment(ref opId)) < parallelOperations.Length) + if (threadIds[i] == tid) { - if (threadIds[i] == tid) + try { parallelOperations[i].Item2(state); } + catch (Exception e) { - try { parallelOperations[i].Item2(initialState); } - catch (Exception e) + if (Exception is null) { - if (exception is null) - { - exception = e; - Interlocked.Exchange(ref opId, parallelOperations.Length); - } + Exception = e; + opId = 1_000_000; } } } - }); + } } - for (int i = 0; i < runners.Length; i++) runners[i].Start(i); + } + + internal static void RunReplay(T state, (string, Action)[] sequencialOperations, (string, Action)[] parallelOperations, int threads, int[] threadIds) + { + for (int i = 0; i < sequencialOperations.Length; i++) + sequencialOperations[i].Item2(state); + var worker = new RunReplayWorker(state, parallelOperations, threadIds); + var runners = new Thread[threads]; + for (int i = 0; i < runners.Length; i++) runners[i] = new Thread(worker.Execute); + for (int i = 0; i < runners.Length; i++) runners[i].Start(); + worker.Hold = false; for (int i = 0; i < runners.Length; i++) runners[i].Join(); - if (exception is not null) throw exception; + if (worker.Exception is not null) throw worker.Exception; } internal static IEnumerable Permutations(int[] threadIds, T[] sequence)