Skip to content

Commit

Permalink
fix(withLock): improve queue efficiency
Browse files Browse the repository at this point in the history
  • Loading branch information
giladgd committed Dec 19, 2024
1 parent 002d6c9 commit 79a98d2
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 95 deletions.
2 changes: 1 addition & 1 deletion src/DisposedError.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* This error is throw when an object is disposed and a method is called on it.
* This error is thrown when an object is disposed and a method is called on it.
* You can use this error to check if an error is caused by a disposed object.
* You can also use this error to throw when an object is disposed and a method is called on it.
*/
Expand Down
145 changes: 51 additions & 94 deletions src/withLock.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const locks = new Map<any, Map<string, Promise<any>>>();
const locks = new Map<any, Map<string, [queue: (() => void)[], onDelete: (() => void)[]]>>();

/**
* Only allow one instance of the callback to run at a time for a given `scope` and `key`.
Expand All @@ -25,44 +25,38 @@ export async function withLock<ReturnType, const ScopeType = any>(
if (callback == null)
throw new Error("callback is required");

while (locks.get(scope)?.has(key)) {
if (acquireLockSignal?.aborted)
throw acquireLockSignal.reason;
if (acquireLockSignal?.aborted)
throw acquireLockSignal.reason;

try {
if (acquireLockSignal != null) {
const acquireLockPromise = createAbortSignalAbortPromise(acquireLockSignal);

await Promise.race([
acquireLockPromise.promise,
locks.get(scope)?.get(key)
]);

acquireLockPromise.dispose();
} else
await locks.get(scope)?.get(key);
} catch (err) {
// we only need to wait here for the promise to resolve, we don't care about the result
}

if (acquireLockSignal?.aborted)
throw acquireLockSignal.reason;
let keyMap = locks.get(scope);
if (keyMap == null) {
keyMap = new Map();
locks.set(scope, keyMap);
}

const promise = callback.call(scope);

if (!locks.has(scope))
locks.set(scope, new Map());

locks.get(scope)!.set(key, promise);
let [queue, onDelete] = keyMap.get(key) || [];
if (queue != null && onDelete != null)
await createQueuePromise(queue, acquireLockSignal);
else {
queue = [];
onDelete = [];
keyMap.set(key, [queue, onDelete]);
}

try {
return await promise;
return await callback.call(scope);
} finally {
locks.get(scope)?.delete(key);
if (queue.length > 0)
queue.shift()!();
else {
locks.get(scope)?.delete(key);

if (locks.get(scope)?.size === 0)
locks.delete(scope);

if (locks.get(scope)?.size === 0)
locks.delete(scope);
while (onDelete.length > 0)
onDelete.shift()!();
}
}
}

Expand Down Expand Up @@ -108,43 +102,14 @@ export async function acquireLock<S = any, K extends string = string>(
* Wait for a lock to be released for a given `scope` and `key`.
*/
export async function waitForLockRelease(scope: any, key: string, signal?: AbortSignal): Promise<void> {
// eslint-disable-next-line no-constant-condition
while (true) {
if (signal?.aborted)
throw signal.reason;

try {
if (signal != null) {
const signalPromise = createAbortSignalAbortPromise(signal);

await Promise.race([
signalPromise.promise,
locks.get(scope)?.get(key)
]);

signalPromise.dispose();
} else
await locks.get(scope)?.get(key);
} catch (err) {
// we only need to wait here for the promise to resolve, we don't care about the result
}

if (signal?.aborted)
throw signal.reason;

if (locks.get(scope)?.has(key))
continue;

await Promise.resolve(); // wait for a microtask to run, so other pending locks can be registered

if (signal?.aborted)
throw signal.reason;

if (locks.get(scope)?.has(key))
continue;
if (signal?.aborted)
throw signal.reason;

const [queue, onDelete] = locks.get(scope)?.get(key) ?? [];
if (queue == null || onDelete == null)
return;
}

await createQueuePromise(onDelete, signal);
}

export type Lock<S = any, K extends string = string> = {
Expand All @@ -154,36 +119,28 @@ export type Lock<S = any, K extends string = string> = {
[Symbol.dispose](): void
};

function createControlledPromise<T = any>() {
let resolve: (value: T | Promise<T>) => void;
let reject: (reason?: any) => void;
function createQueuePromise(queue: (() => void)[], signal?: AbortSignal) {
if (signal == null)
return new Promise<void>((accept) => void queue.push(accept));

const promise = new Promise<T>((accept, fail) => {
resolve = accept;
reject = fail;
});
return new Promise<void>((accept, reject) => {
function onAcquireLock() {
signal!.removeEventListener("abort", onAbort);
accept();
}

return {
promise,
resolve: resolve!,
reject: reject!
};
}
const queueLength = queue.length;

function createAbortSignalAbortPromise(signal: AbortSignal) {
const acquireLockPromise = createControlledPromise<void>();
function onAbort() {
const itemIndex = queue.lastIndexOf(onAcquireLock, queueLength);
if (itemIndex >= 0)
queue.splice(itemIndex, 1);

const onAbort = () => {
acquireLockPromise.resolve();
signal.removeEventListener("abort", onAbort);
};
signal.addEventListener("abort", onAbort);

return {
promise: acquireLockPromise.promise,
dispose() {
signal.removeEventListener("abort", onAbort);
signal!.removeEventListener("abort", onAbort);
reject(signal!.reason);
}
};
}

queue.push(onAcquireLock);
signal.addEventListener("abort", onAbort);
});
}
9 changes: 9 additions & 0 deletions test/withLock.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,21 @@ describe("withLock", () => {
const lock2Controller = new AbortController();
const lock2Promise = acquireLock(scope1, key1, lock2Controller.signal);

const lock3Controller = new AbortController();
const lock3Promise = acquireLock(scope1, key1, lock3Controller.signal);

lock2Controller.abort(new TestError());
await expect(lock2Promise).rejects.toBeInstanceOf(TestError);

lock1.dispose();
await new Promise((accept) => setTimeout(accept, 0));

const lock3 = await lock3Promise;
expect(isLockActive(scope1, key1)).toBe(true);

lock3.dispose();
await new Promise((accept) => setTimeout(accept, 0));

expect(isLockActive(scope1, key1)).toBe(false);
});

Expand Down

0 comments on commit 79a98d2

Please sign in to comment.