diff --git a/src/withLock.ts b/src/withLock.ts index 27dda13..c338b4d 100644 --- a/src/withLock.ts +++ b/src/withLock.ts @@ -1,4 +1,4 @@ -const locks = new Map void)[], onDelete: (() => void)[]]>>(); +const locks = new Map>>(); /** * Only allow one instance of the callback to run at a time for a given `scope` and `key`. @@ -25,38 +25,44 @@ export async function withLock( if (callback == null) throw new Error("callback is required"); - if (acquireLockSignal?.aborted) - throw acquireLockSignal.reason; + while (locks.get(scope)?.has(key)) { + if (acquireLockSignal?.aborted) + throw acquireLockSignal.reason; - let keyMap = locks.get(scope); - if (keyMap == null) { - keyMap = new Map(); - locks.set(scope, keyMap); - } + try { + if (acquireLockSignal != null) { + const acquireLockPromise = createAbortSignalAbortPromise(acquireLockSignal); + + await Promise.race([ + acquireLockPromise.promise, + locks.get(scope)?.get(key) + ]); + + acquireLockPromise.dispose(); + } else + await locks.get(scope)?.get(key); + } catch (err) { + // we only need to wait here for the promise to resolve, we don't care about the result + } - let [queue, onDelete] = keyMap.get(key) || []; - if (queue != null && onDelete != null) - await createQueuePromise(queue, acquireLockSignal); - else { - queue = []; - onDelete = []; - keyMap.set(key, [queue, onDelete]); + if (acquireLockSignal?.aborted) + throw acquireLockSignal.reason; } + const promise = callback.call(scope); + + if (!locks.has(scope)) + locks.set(scope, new Map()); + + locks.get(scope)!.set(key, promise); + try { - return await callback.call(scope); + return await promise; } finally { - if (queue.length > 0) - queue.shift()!(); - else { - locks.get(scope)?.delete(key); - - if (locks.get(scope)?.size === 0) - locks.delete(scope); + locks.get(scope)?.delete(key); - while (onDelete.length > 0) - onDelete.shift()!(); - } + if (locks.get(scope)?.size === 0) + locks.delete(scope); } } @@ -70,45 +76,75 @@ export function isLockActive(scope: any, key: string): boolean { /** * Acquire a lock for a given `scope` and `key`. */ -export function acquireLock( +export async function acquireLock( scope: S, key: K, acquireLockSignal?: AbortSignal ): Promise> { - return new Promise>((accept, reject) => { - void withLock(scope, key, acquireLockSignal, () => { - let releaseLock: () => void; - const promise = new Promise((accept) => { - releaseLock = accept; - }); + let releaseLock: (param: null) => void; - accept({ - scope, - key, - dispose() { - releaseLock(); - }, - [Symbol.dispose]() { - releaseLock(); - } - }); + await new Promise((accept, reject) => { + void withLock(scope, key, acquireLockSignal, async () => { + accept(null); - return promise; + await new Promise((accept) => { + releaseLock = accept; + }); }) .catch(reject); }); + + return { + scope, + key, + dispose() { + releaseLock(null); + }, + [Symbol.dispose]() { + releaseLock(null); + } + }; } /** * Wait for a lock to be released for a given `scope` and `key`. */ export async function waitForLockRelease(scope: any, key: string, signal?: AbortSignal): Promise { - if (signal?.aborted) - throw signal.reason; + // eslint-disable-next-line no-constant-condition + while (true) { + if (signal?.aborted) + throw signal.reason; + + try { + if (signal != null) { + const signalPromise = createAbortSignalAbortPromise(signal); + + await Promise.race([ + signalPromise.promise, + locks.get(scope)?.get(key) + ]); + + signalPromise.dispose(); + } else + await locks.get(scope)?.get(key); + } catch (err) { + // we only need to wait here for the promise to resolve, we don't care about the result + } - const [queue, onDelete] = locks.get(scope)?.get(key) ?? []; - if (queue == null || onDelete == null) - return; + if (signal?.aborted) + throw signal.reason; + + if (locks.get(scope)?.has(key)) + continue; + + await Promise.resolve(); // wait for a microtask to run, so other pending locks can be registered - await createQueuePromise(onDelete, signal); + if (signal?.aborted) + throw signal.reason; + + if (locks.get(scope)?.has(key)) + continue; + + return; + } } export type Lock = { @@ -118,28 +154,36 @@ export type Lock = { [Symbol.dispose](): void }; -function createQueuePromise(queue: (() => void)[], signal?: AbortSignal) { - if (signal == null) - return new Promise((accept) => void queue.push(accept)); +function createControlledPromise() { + let resolve: (value: T | Promise) => void; + let reject: (reason?: any) => void; - return new Promise((accept, reject) => { - function onAcquireLock() { - signal!.removeEventListener("abort", onAbort); - accept(); - } + const promise = new Promise((accept, fail) => { + resolve = accept; + reject = fail; + }); - const queueLength = queue.length; + return { + promise, + resolve: resolve!, + reject: reject! + }; +} - function onAbort() { - const itemIndex = queue.lastIndexOf(onAcquireLock, queueLength); - if (itemIndex >= 0) - queue.splice(itemIndex, 1); +function createAbortSignalAbortPromise(signal: AbortSignal) { + const acquireLockPromise = createControlledPromise(); - signal!.removeEventListener("abort", onAbort); - reject(signal!.reason); - } + const onAbort = () => { + acquireLockPromise.resolve(); + signal.removeEventListener("abort", onAbort); + }; + signal.addEventListener("abort", onAbort); - queue.push(onAcquireLock); - signal.addEventListener("abort", onAbort); - }); + return { + promise: acquireLockPromise.promise, + dispose() { + signal.removeEventListener("abort", onAbort); + } + }; } +