Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wasm-mt] Support async JS interop on threadpool threads #84494

Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
8c3fef3
[wasm-mt] Full JSInterop on threadpool workers
lambdageek Apr 7, 2023
a3db886
[wasm-mt] Add background interop to smoketest
lambdageek Apr 7, 2023
847ecd5
update to use the LowLevelLifoAsyncWaitSemaphore
lambdageek Apr 8, 2023
dd69c8f
adjust to renamed PortableThreadPool helper methods
lambdageek Apr 18, 2023
451d21b
adjust to renamed WebWorkerEventLoop.HasJavaScriptInteropDependents
lambdageek Apr 19, 2023
2889a1f
extend and rationalize the smoke test a bit
lambdageek Apr 19, 2023
92d80a5
remove old-Emscripten workaround hack
lambdageek Apr 19, 2023
5f4acc8
hide some debug output
lambdageek Apr 20, 2023
806b770
smoke test: dispose of the ImportAsync result after the task is done
lambdageek Apr 20, 2023
d2811f7
[wasm-mt] make JSHostImplementation.s_csOwnedObjects ThreadStatic
lambdageek Apr 20, 2023
48d22ae
remove locking on JSHostImplementation.CsOwnedObjects
lambdageek Apr 21, 2023
c813701
Merge remote-tracking branch 'origin/main' into pieces-wasm-threadpoo…
lambdageek Apr 24, 2023
0a4dd8c
Merge remote-tracking branch 'origin/main' into pieces-wasm-threadpoo…
lambdageek Apr 27, 2023
77a54ff
[threads] make the "external eventloop" platform independent
lambdageek Apr 27, 2023
fd99953
fix wasi and singlethreaded browser-wasm
lambdageek Apr 27, 2023
4671e03
Add a Thread.HasExternalEventLoop managed property
lambdageek Apr 27, 2023
e22ca46
rename JSHostImplementation.ThreadCsOwnedObjects
lambdageek Apr 27, 2023
c591501
[checked] assert GC Safe mode, when returning to external eventloop
lambdageek Apr 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2529,9 +2529,9 @@
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.ThreadCounts.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WaitThread.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.NonBrowser.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerThread.NonBrowser.cs" Condition="'$(TargetsBrowser)' != 'true'"/>
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.WorkerTracking.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Unix.cs" Condition="'$(TargetsUnix)' == 'true' or '$(TargetsBrowser)' == 'true' or '$(TargetsWasi)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Unix.cs" Condition="'$(TargetsUnix)' == 'true' or ('$(TargetsBrowser)' == 'true' and '$(FeatureWasmThreads)' != 'true') or '$(TargetsWasi)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\PortableThreadPool.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\LowLevelLifoSemaphore.cs" />
<Compile Include="$(MSBuildThisFileDirectory)System\Threading\LowLevelLifoSemaphore.Windows.cs" Condition="'$(TargetsWindows)' == 'true'" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public static partial class ThreadPool
{
// Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that
// the runtime may use the thread for processing other work
#if !(TARGET_BROWSER && FEATURE_WASM_THREADS)
internal static bool YieldFromDispatchLoop => false;
#endif

#if NATIVEAOT
private const bool IsWorkerTrackingEnabledInConfig = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,15 @@ internal static void PreventTrimming()

public static void GetCSOwnedObjectByJSHandleRef(nint jsHandle, int shouldAddInflight, out JSObject? result)
{
lock (JSHostImplementation.s_csOwnedObjects)
if (JSHostImplementation.CsOwnedObjects.TryGetValue((int)jsHandle, out WeakReference<JSObject>? reference))
{
if (JSHostImplementation.s_csOwnedObjects.TryGetValue((int)jsHandle, out WeakReference<JSObject>? reference))
reference.TryGetTarget(out JSObject? jsObject);
if (shouldAddInflight != 0)
{
reference.TryGetTarget(out JSObject? jsObject);
if (shouldAddInflight != 0)
{
jsObject?.AddInFlight();
}
result = jsObject;
return;
jsObject?.AddInFlight();
}
result = jsObject;
return;
}
result = null;
}
Expand Down Expand Up @@ -77,14 +74,12 @@ public static void CreateCSOwnedProxyRef(nint jsHandle, LegacyHostImplementation

JSObject? res = null;

lock (JSHostImplementation.s_csOwnedObjects)
if (!JSHostImplementation.CsOwnedObjects.TryGetValue((int)jsHandle, out WeakReference<JSObject>? reference) ||
!reference.TryGetTarget(out res) ||
res.IsDisposed)
{
if (!JSHostImplementation.s_csOwnedObjects.TryGetValue((int)jsHandle, out WeakReference<JSObject>? reference) ||
!reference.TryGetTarget(out res) ||
res.IsDisposed)
{
#pragma warning disable CS0612 // Type or member is obsolete
res = mappedType switch
res = mappedType switch
{
LegacyHostImplementation.MappedType.JSObject => new JSObject(jsHandle),
LegacyHostImplementation.MappedType.Array => new Array(jsHandle),
Expand All @@ -95,8 +90,7 @@ public static void CreateCSOwnedProxyRef(nint jsHandle, LegacyHostImplementation
_ => throw new ArgumentOutOfRangeException(nameof(mappedType))
};
#pragma warning restore CS0612 // Type or member is obsolete
JSHostImplementation.s_csOwnedObjects[(int)jsHandle] = new WeakReference<JSObject>(res, trackResurrection: true);
}
JSHostImplementation.CsOwnedObjects[(int)jsHandle] = new WeakReference<JSObject>(res, trackResurrection: true);
}
if (shouldAddInflight != 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,20 @@ internal static partial class JSHostImplementation
private const string TaskGetResultName = "get_Result";
private static MethodInfo? s_taskGetResultMethodInfo;
// we use this to maintain identity of JSHandle for a JSObject proxy
public static readonly Dictionary<int, WeakReference<JSObject>> s_csOwnedObjects = new Dictionary<int, WeakReference<JSObject>>();
#if FEATURE_WASM_THREADS
[ThreadStatic]
#endif
private static Dictionary<int, WeakReference<JSObject>>? s_csOwnedObjects;

public static Dictionary<int, WeakReference<JSObject>> CsOwnedObjects
lambdageek marked this conversation as resolved.
Show resolved Hide resolved
{
get
{
s_csOwnedObjects ??= new ();
return s_csOwnedObjects;
}
}

// we use this to maintain identity of GCHandle for a managed object
public static Dictionary<object, IntPtr> s_gcHandleFromJSOwnedObject = new Dictionary<object, IntPtr>(ReferenceEqualityComparer.Instance);

Expand All @@ -24,10 +37,7 @@ public static void ReleaseCSOwnedObject(nint jsHandle)
{
if (jsHandle != IntPtr.Zero)
{
lock (s_csOwnedObjects)
{
s_csOwnedObjects.Remove((int)jsHandle);
}
CsOwnedObjects.Remove((int)jsHandle);
Interop.Runtime.ReleaseCSOwnedObject(jsHandle);
}
}
Expand Down Expand Up @@ -175,17 +185,14 @@ public static unsafe void FreeMethodSignatureBuffer(JSFunctionBinding signature)

public static JSObject CreateCSOwnedProxy(nint jsHandle)
{
JSObject? res = null;
JSObject? res;

lock (s_csOwnedObjects)
if (!CsOwnedObjects.TryGetValue((int)jsHandle, out WeakReference<JSObject>? reference) ||
!reference.TryGetTarget(out res) ||
res.IsDisposed)
{
if (!s_csOwnedObjects.TryGetValue((int)jsHandle, out WeakReference<JSObject>? reference) ||
!reference.TryGetTarget(out res) ||
res.IsDisposed)
{
res = new JSObject(jsHandle);
s_csOwnedObjects[(int)jsHandle] = new WeakReference<JSObject>(res, trackResurrection: true);
}
res = new JSObject(jsHandle);
CsOwnedObjects[(int)jsHandle] = new WeakReference<JSObject>(res, trackResurrection: true);
}
return res;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ public static void ReleaseInFlight(object obj)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void RegisterCSOwnedObject(JSObject proxy)
{
lock (JSHostImplementation.s_csOwnedObjects)
{
JSHostImplementation.s_csOwnedObjects[(int)proxy.JSHandle] = new WeakReference<JSObject>(proxy, trackResurrection: true);
}
JSHostImplementation.CsOwnedObjects[(int)proxy.JSHandle] = new WeakReference<JSObject>(proxy, trackResurrection: true);
}

public static MarshalType GetMarshalTypeFromType(Type type)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,9 @@
<ItemGroup Condition="('$(TargetsBrowser)' == 'true' or '$(TargetsWasi)' == 'true') and '$(FeatureWasmThreads)' == 'true'">
<Compile Include="$(BclSourcesRoot)\System\Threading\ThreadPoolBoundHandle.Browser.Threads.Mono.cs" />
<Compile Include="$(BclSourcesRoot)\System\Threading\LowLevelLifoAsyncWaitSemaphore.Browser.Threads.Mono.cs" />
<Compile Include="$(BclSourcesRoot)\System\Threading\PortableThreadPool.Browser.Threads.Mono.cs" />
<Compile Include="$(BclSourcesRoot)\System\Threading\PortableThreadPool.WorkerThread.Browser.Threads.Mono.cs" />
<Compile Include="$(BclSourcesRoot)\System\Threading\ThreadPool.Browser.Threads.Mono.cs" />
<Compile Include="$(BclSourcesRoot)\System\Threading\WebWorkerEventLoop.Browser.Threads.Mono.cs" />
</ItemGroup>
<ItemGroup Condition="('$(TargetsBrowser)' == 'true' or '$(TargetsWasi)' == 'true') and '$(FeatureWasmThreads)' != 'true'">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace System.Threading;

internal sealed partial class PortableThreadPool
{
private static partial class WorkerThread
{
private static bool IsIOPending => WebWorkerEventLoop.HasJavaScriptInteropDependents;
}

private struct CpuUtilizationReader
{
#pragma warning disable CA1822
public double CurrentUtilization => 0.0; // FIXME: can we do better
#pragma warning restore CA1822
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Diagnostics.Tracing;
using System.Runtime.CompilerServices;

namespace System.Threading
{
internal sealed partial class PortableThreadPool
{
/// <summary>
/// The worker thread infastructure for the CLR thread pool.
/// </summary>
private static partial class WorkerThread
{
/// <summary>
/// Semaphore for controlling how many threads are currently working.
/// </summary>
private static readonly LowLevelLifoAsyncWaitSemaphore s_semaphore =
new LowLevelLifoAsyncWaitSemaphore(
0,
MaxPossibleThreadCount,
AppContextConfigHelper.GetInt32Config(
"System.Threading.ThreadPool.UnfairSemaphoreSpinLimit",
SemaphoreSpinCountDefault,
false),
onWait: () =>
{
if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadWait(
(uint)ThreadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
}
});

private static readonly ThreadStart s_workerThreadStart = WorkerThreadStart;

private sealed record SemaphoreWaitState(PortableThreadPool ThreadPoolInstance, LowLevelLock ThreadAdjustmentLock, WebWorkerEventLoop.KeepaliveToken KeepaliveToken)
{
public bool SpinWait = true;

public void ResetIteration() {
SpinWait = true;
}
}

private static void WorkerThreadStart()
{
Thread.CurrentThread.SetThreadPoolWorkerThreadName();

PortableThreadPool threadPoolInstance = ThreadPoolInstance;

if (NativeRuntimeEventSource.Log.IsEnabled())
{
NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadStart(
(uint)threadPoolInstance._separated.counts.VolatileRead().NumExistingThreads);
}

LowLevelLock threadAdjustmentLock = threadPoolInstance._threadAdjustmentLock;
var keepaliveToken = WebWorkerEventLoop.KeepalivePush();
SemaphoreWaitState state = new(threadPoolInstance, threadAdjustmentLock, keepaliveToken) { SpinWait = true };
// set up the callbacks for semaphore waits, tell
// emscripten to keep the thread alive, and return to
// the JS event loop.
WaitForWorkLoop(s_semaphore, state);
// return from thread start with keepalive - the thread will stay alive in the JS event loop
}

private static readonly Action<LowLevelLifoAsyncWaitSemaphore, object?> s_WorkLoopSemaphoreSuccess = new(WorkLoopSemaphoreSuccess);
private static readonly Action<LowLevelLifoAsyncWaitSemaphore, object?> s_WorkLoopSemaphoreTimedOut = new(WorkLoopSemaphoreTimedOut);

private static void WaitForWorkLoop(LowLevelLifoAsyncWaitSemaphore semaphore, SemaphoreWaitState state)
{
semaphore.PrepareAsyncWait(ThreadPoolThreadTimeoutMs, s_WorkLoopSemaphoreSuccess, s_WorkLoopSemaphoreTimedOut, state);
// thread should still be kept alive
Debug.Assert(state.KeepaliveToken.Valid);
}

private static void WorkLoopSemaphoreSuccess(LowLevelLifoAsyncWaitSemaphore semaphore, object? stateObject)
{
SemaphoreWaitState state = (SemaphoreWaitState)stateObject!;
WorkerDoWork(state.ThreadPoolInstance, ref state.SpinWait);
// Go around the loop one more time, keeping existing mutated state
WaitForWorkLoop(semaphore, state);
}

private static void WorkLoopSemaphoreTimedOut(LowLevelLifoAsyncWaitSemaphore semaphore, object? stateObject)
{
SemaphoreWaitState state = (SemaphoreWaitState)stateObject!;
if (ShouldExitWorker(state.ThreadPoolInstance, state.ThreadAdjustmentLock)) {
// we're done, kill the thread.

// we're wrapped in an emscripten eventloop handler which will consult the
// keepalive count, destroy the thread and run the TLS dtor which will
// unregister the thread from Mono
state.KeepaliveToken.Pop();
return;
} else {
// more work showed up while we were shutting down, go around one more time
state.ResetIteration();
WaitForWorkLoop(semaphore, state);
}
}

private static void CreateWorkerThread()
{
// Thread pool threads must start in the default execution context without transferring the context, so
// using captureContext: false.
Thread workerThread = new Thread(s_workerThreadStart);
workerThread.IsThreadPoolThread = true;
workerThread.IsBackground = true;
// thread name will be set in thread proc

// This thread will return to the JS event loop - tell the runtime not to cleanup
// after the start function returns, if the Emscripten keepalive is non-zero.
WebWorkerEventLoop.StartExitable(workerThread, captureContext: false);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace System.Threading
{
public static partial class ThreadPool
{
// Indicates that the threadpool should yield the thread from the dispatch loop to the
// runtime periodically. We use this to return back to the JS event loop so that the JS
// event queue can be drained
internal static bool YieldFromDispatchLoop => true;
}
}
3 changes: 3 additions & 0 deletions src/mono/mono/metadata/threads-types.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ typedef enum {
MONO_THREAD_CREATE_FLAGS_DEBUGGER = 0x02,
MONO_THREAD_CREATE_FLAGS_FORCE_CREATE = 0x04,
MONO_THREAD_CREATE_FLAGS_SMALL_STACK = 0x08,
#if defined(HOST_BROWSER) && !defined(DISABLE_THREADS)
MONO_THREAD_CREATE_FLAGS_RETURNS_TO_JS_EVENT_LOOP = 0x10,
#endif
} MonoThreadCreateFlags;

MONO_COMPONENT_API MonoInternalThread*
Expand Down
Loading