Skip to content

Commit

Permalink
fix(withLock): revert improvements due to behavioral change
Browse files Browse the repository at this point in the history
  • Loading branch information
giladgd committed Dec 25, 2024
1 parent 3f3b33f commit bdc3b77
Showing 1 changed file with 113 additions and 69 deletions.
182 changes: 113 additions & 69 deletions src/withLock.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const locks = new Map<any, Map<string, [queue: (() => void)[], onDelete: (() => void)[]]>>();
const locks = new Map<any, Map<string, Promise<any>>>();

/**
* Only allow one instance of the callback to run at a time for a given `scope` and `key`.
Expand All @@ -25,38 +25,44 @@ export async function withLock<ReturnType, const ScopeType = any>(
if (callback == null)
throw new Error("callback is required");

if (acquireLockSignal?.aborted)
throw acquireLockSignal.reason;
while (locks.get(scope)?.has(key)) {
if (acquireLockSignal?.aborted)
throw acquireLockSignal.reason;

let keyMap = locks.get(scope);
if (keyMap == null) {
keyMap = new Map();
locks.set(scope, keyMap);
}
try {
if (acquireLockSignal != null) {
const acquireLockPromise = createAbortSignalAbortPromise(acquireLockSignal);

await Promise.race([
acquireLockPromise.promise,
locks.get(scope)?.get(key)
]);

acquireLockPromise.dispose();
} else
await locks.get(scope)?.get(key);
} catch (err) {
// we only need to wait here for the promise to resolve, we don't care about the result
}

let [queue, onDelete] = keyMap.get(key) || [];
if (queue != null && onDelete != null)
await createQueuePromise(queue, acquireLockSignal);
else {
queue = [];
onDelete = [];
keyMap.set(key, [queue, onDelete]);
if (acquireLockSignal?.aborted)
throw acquireLockSignal.reason;
}

const promise = callback.call(scope);

if (!locks.has(scope))
locks.set(scope, new Map());

locks.get(scope)!.set(key, promise);

try {
return await callback.call(scope);
return await promise;
} finally {
if (queue.length > 0)
queue.shift()!();
else {
locks.get(scope)?.delete(key);

if (locks.get(scope)?.size === 0)
locks.delete(scope);
locks.get(scope)?.delete(key);

while (onDelete.length > 0)
onDelete.shift()!();
}
if (locks.get(scope)?.size === 0)
locks.delete(scope);
}
}

Expand All @@ -70,45 +76,75 @@ export function isLockActive(scope: any, key: string): boolean {
/**
* Acquire a lock for a given `scope` and `key`.
*/
export function acquireLock<S = any, K extends string = string>(
export async function acquireLock<S = any, K extends string = string>(
scope: S, key: K, acquireLockSignal?: AbortSignal
): Promise<Lock<S, K>> {
return new Promise<Lock<S, K>>((accept, reject) => {
void withLock(scope, key, acquireLockSignal, () => {
let releaseLock: () => void;
const promise = new Promise<void>((accept) => {
releaseLock = accept;
});
let releaseLock: (param: null) => void;

accept({
scope,
key,
dispose() {
releaseLock();
},
[Symbol.dispose]() {
releaseLock();
}
});
await new Promise((accept, reject) => {
void withLock(scope, key, acquireLockSignal, async () => {
accept(null);

return promise;
await new Promise((accept) => {
releaseLock = accept;
});
})
.catch(reject);
});

return {
scope,
key,
dispose() {
releaseLock(null);
},
[Symbol.dispose]() {
releaseLock(null);
}
};
}

/**
* Wait for a lock to be released for a given `scope` and `key`.
*/
export async function waitForLockRelease(scope: any, key: string, signal?: AbortSignal): Promise<void> {
if (signal?.aborted)
throw signal.reason;
// eslint-disable-next-line no-constant-condition
while (true) {
if (signal?.aborted)
throw signal.reason;

try {
if (signal != null) {
const signalPromise = createAbortSignalAbortPromise(signal);

await Promise.race([
signalPromise.promise,
locks.get(scope)?.get(key)
]);

signalPromise.dispose();
} else
await locks.get(scope)?.get(key);
} catch (err) {
// we only need to wait here for the promise to resolve, we don't care about the result
}

const [queue, onDelete] = locks.get(scope)?.get(key) ?? [];
if (queue == null || onDelete == null)
return;
if (signal?.aborted)
throw signal.reason;

if (locks.get(scope)?.has(key))
continue;

await Promise.resolve(); // wait for a microtask to run, so other pending locks can be registered

await createQueuePromise(onDelete, signal);
if (signal?.aborted)
throw signal.reason;

if (locks.get(scope)?.has(key))
continue;

return;
}
}

export type Lock<S = any, K extends string = string> = {
Expand All @@ -118,28 +154,36 @@ export type Lock<S = any, K extends string = string> = {
[Symbol.dispose](): void
};

function createQueuePromise(queue: (() => void)[], signal?: AbortSignal) {
if (signal == null)
return new Promise<void>((accept) => void queue.push(accept));
function createControlledPromise<T = any>() {
let resolve: (value: T | Promise<T>) => void;
let reject: (reason?: any) => void;

return new Promise<void>((accept, reject) => {
function onAcquireLock() {
signal!.removeEventListener("abort", onAbort);
accept();
}
const promise = new Promise<T>((accept, fail) => {
resolve = accept;
reject = fail;
});

const queueLength = queue.length;
return {
promise,
resolve: resolve!,
reject: reject!
};
}

function onAbort() {
const itemIndex = queue.lastIndexOf(onAcquireLock, queueLength);
if (itemIndex >= 0)
queue.splice(itemIndex, 1);
function createAbortSignalAbortPromise(signal: AbortSignal) {
const acquireLockPromise = createControlledPromise<void>();

signal!.removeEventListener("abort", onAbort);
reject(signal!.reason);
}
const onAbort = () => {
acquireLockPromise.resolve();
signal.removeEventListener("abort", onAbort);
};
signal.addEventListener("abort", onAbort);

queue.push(onAcquireLock);
signal.addEventListener("abort", onAbort);
});
return {
promise: acquireLockPromise.promise,
dispose() {
signal.removeEventListener("abort", onAbort);
}
};
}

0 comments on commit bdc3b77

Please sign in to comment.