Skip to content

Commit

Permalink
improve parallel Run and RunReplay
Browse files Browse the repository at this point in the history
  • Loading branch information
AnthonyLloyd committed Aug 26, 2024
1 parent 2e187fb commit 3346ffc
Showing 1 changed file with 55 additions and 40 deletions.
95 changes: 55 additions & 40 deletions CsCheck/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ namespace CsCheck;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System;

public sealed class CsCheckException : Exception
{
Expand Down Expand Up @@ -475,69 +477,82 @@ public static bool ModelEqual<T, M>(T actual, M model)
return actual.Equals(model);
}

internal static void Run<T>(T initialState, (string, Action<T>)[] sequencialOperations, (string, Action<T>)[] parallelOperations, int threads, int[]? threadIds = null)
sealed class RunWorker<T>(T state, (string, Action<T>)[] parallelOperations, int[]? threadIds) : IThreadPoolWorkItem
{
for (int i = 0; i < sequencialOperations.Length; i++)
sequencialOperations[i].Item2(initialState);
Exception? exception = null;
var opId = -1;
var runners = new Thread[threads];
while (--threads >= 0)
int opId = -1, threadId = -1;
public volatile bool Hold = true;
public Exception? Exception;
public void Execute()
{
runners[threads] = new Thread(threadId =>
int i, tid = Interlocked.Increment(ref threadId);
while (Hold) { }
while ((i = Interlocked.Increment(ref opId)) < parallelOperations.Length)
{
int i, tid = (int)threadId!;
while ((i = Interlocked.Increment(ref opId)) < parallelOperations.Length)
if (threadIds is not null) threadIds[i] = tid;
try { parallelOperations[i].Item2(state); }
catch (Exception e)
{
if (threadIds is not null) threadIds[i] = tid;
try { parallelOperations[i].Item2(initialState); }
catch (Exception e)
if (Exception is null)
{
if (exception is null)
{
exception = e;
Interlocked.Exchange(ref opId, parallelOperations.Length);
}
Exception = e;
opId = 1_000_000;
}
}
});
}
}
for (int i = 0; i < runners.Length; i++) runners[i].Start(i);
for (int i = 0; i < runners.Length; i++) runners[i].Join();
if (exception is not null) throw exception;
}

internal static void RunReplay<T>(T initialState, (string, Action<T>)[] sequencialOperations, (string, Action<T>)[] parallelOperations, int threads, int[] threadIds)
internal static void Run<T>(T state, (string, Action<T>)[] sequencialOperations, (string, Action<T>)[] parallelOperations, int threads, int[]? threadIds = null)
{
for (int i = 0; i < sequencialOperations.Length; i++)
sequencialOperations[i].Item2(initialState);
Exception? exception = null;
sequencialOperations[i].Item2(state);
var worker = new RunWorker<T>(state, parallelOperations, threadIds);
var runners = new Thread[threads];
while (--threads >= 0)
for (int i = 0; i < runners.Length; i++) runners[i] = new Thread(worker.Execute);
for (int i = 0; i < runners.Length; i++) runners[i].Start();
worker.Hold = false;
for (int i = 0; i < runners.Length; i++) runners[i].Join();
if (worker.Exception is not null) throw worker.Exception;
}

sealed class RunReplayWorker<T>(T state, (string, Action<T>)[] parallelOperations, int[] threadIds) : IThreadPoolWorkItem
{
int threadId = -1;
public volatile bool Hold = true;
public Exception? Exception;
public void Execute()
{
runners[threads] = new Thread(threadId =>
int i, opId = -1, tid = Interlocked.Increment(ref threadId);
while (Hold) { }
while ((i = Interlocked.Increment(ref opId)) < parallelOperations.Length)
{
int opId = -1, i = -1, tid = (int)threadId!;
while ((i = Interlocked.Increment(ref opId)) < parallelOperations.Length)
if (threadIds[i] == tid)
{
if (threadIds[i] == tid)
try { parallelOperations[i].Item2(state); }
catch (Exception e)
{
try { parallelOperations[i].Item2(initialState); }
catch (Exception e)
if (Exception is null)
{
if (exception is null)
{
exception = e;
Interlocked.Exchange(ref opId, parallelOperations.Length);
}
Exception = e;
opId = 1_000_000;
}
}
}
});
}
}
for (int i = 0; i < runners.Length; i++) runners[i].Start(i);
}

internal static void RunReplay<T>(T state, (string, Action<T>)[] sequencialOperations, (string, Action<T>)[] parallelOperations, int threads, int[] threadIds)
{
for (int i = 0; i < sequencialOperations.Length; i++)
sequencialOperations[i].Item2(state);
var worker = new RunReplayWorker<T>(state, parallelOperations, threadIds);
var runners = new Thread[threads];
for (int i = 0; i < runners.Length; i++) runners[i] = new Thread(worker.Execute);
for (int i = 0; i < runners.Length; i++) runners[i].Start();
worker.Hold = false;
for (int i = 0; i < runners.Length; i++) runners[i].Join();
if (exception is not null) throw exception;
if (worker.Exception is not null) throw worker.Exception;
}

internal static IEnumerable<T[]> Permutations<T>(int[] threadIds, T[] sequence)
Expand Down

0 comments on commit 3346ffc

Please sign in to comment.