Skip to content

Commit

Permalink
Support async generators
Browse files Browse the repository at this point in the history
  • Loading branch information
smart-massi committed Dec 1, 2023
1 parent 4753e5a commit 167eaa6
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 82 deletions.
31 changes: 8 additions & 23 deletions src/AsyncBatch/Queuer.ts
Original file line number Diff line number Diff line change
@@ -1,45 +1,30 @@
type TDataTypes<TDataType> = TDataType[] | Generator<TDataType>;
export type TQDataTypes<TDataType> = TDataType[] | Generator<TDataType> | AsyncGenerator<TDataType>;

/**
* Queuer is a simple class that allow you to push datas in a queue and pull them with a generator
*/
export default class Queuer<TDataType> {

/**
* It stores groups of pushed datas
* The original array datas are stored and never modified, it keeps the reference and avoid to copy the datas
*/
private stores: TDataTypes<TDataType>[] = [];
private _generator: Generator<TDataType> | null = null;

/**
* Push datas in the queue with one or more arguments
*/
public push(...datas: TDataType[]) {
this.stores.push(datas);
}

/**
* Push datas in the queue with one or more arguments at the beginning of the queue
*/
public unshift(...datas: TDataType[]) {
this.stores.unshift(datas);
}
private stores: TQDataTypes<TDataType>[] = [];
private _generator: Generator<TDataType> | AsyncGenerator<TDataType> | null = null;

/**
* Push many datas in the queue
* Push datas in the queue
*/
public pushMany(datas: TDataType[] | Generator<TDataType>) {
public push(datas: TQDataTypes<TDataType>) {
this.stores.push(datas);
}

/**
* Generator that pull datas from the queue
*/
private *generator() {
private async *generator() {
while (this.stores.length > 0) {
const store = this.stores.shift()!;
for (const data of store) {
const store = this.stores.shift() ?? [];
for await (const data of store) {
yield data;
}
}
Expand Down
91 changes: 47 additions & 44 deletions src/AsyncBatch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import EventPaused from "./Events/EventPaused";
import EventCleared from "./Events/EventCleared";
import EventBeforeCleare from "./Events/EventBeforeCleare";
import EventStart from "./Events/EventStart";
import Queuer from "./Queuer";
import Queuer, { TQDataTypes } from "./Queuer";

/**
* AsyncBatch is a Typescript library designed for performing batched asynchronous tasks while controlling concurrency, all without relying on external dependencies
Expand Down Expand Up @@ -47,7 +47,7 @@ export default class AsyncBatch<TDataType, TResponseType> {
* @description Create method give you more control, you can listen to events or handle start, pause, stop, add, clear, etc...
*/
public static create<TDataType, TResponseType>(
datas: TDataType[] | Generator<TDataType>,
datas: TQDataTypes<TDataType>,
action: (data: TDataType) => TResponseType,
options: Partial<ICreateOptions> = {},
): AsyncBatch<TDataType, TResponseType> {
Expand All @@ -60,30 +60,28 @@ export default class AsyncBatch<TDataType, TResponseType> {
* @description This is a lightwight version of create method
* @returns Promise<void> resolved when all jobs are done (empty queue)
*/
public static run<TDataType, TResponseType>(
datas: TDataType[] | Generator<TDataType>,
public static async run<TDataType, TResponseType>(
datas: TQDataTypes<TDataType>,
action: (data: TDataType) => TResponseType,
options: Omit<Partial<ICreateOptions>, "autoStart"> = {},
): Promise<void> {
const asyncBatch = new this(action, { ...options, autoStart: true }).addMany(datas);
setImmediate(() => asyncBatch.handleQueue());
return asyncBatch.events.onEmptyPromise().then(() => Promise.resolve());
await asyncBatch.events.onEmptyPromise();
}

/**
* @description Add data to the queue any time
*/
public add(...datas: TDataType[]): AsyncBatch<TDataType, TResponseType> {
this.queue.push(...datas);
this.unWaitNewDatas();
return this;
return this.addMany(datas);
}

/**
* @description Add many datas to the queue any time
*/
public addMany(datas: TDataType[] | Generator<TDataType>): AsyncBatch<TDataType, TResponseType> {
this.queue.pushMany(datas);
public addMany(datas: TQDataTypes<TDataType>): AsyncBatch<TDataType, TResponseType> {
this.queue.push(datas);
this.unWaitNewDatas();
return this;
}
Expand Down Expand Up @@ -201,57 +199,46 @@ export default class AsyncBatch<TDataType, TResponseType> {
let isAlreadyPaused = false;
const countdown = Countdown.new(this.options.rateLimit?.msTimeRange ?? 0);
let callNumber = 0;

/**
* @description terminate each loop step to let the next one start
* @description Terminate the process of the current data
*/
const endLoopStep = (data: TDataType, responseStored?: TResponseType, errorStored?: string | Error) => {
this.currentConcurrency--;
const processDataEnd = (data: TDataType, responseStored?: TResponseType, errorStored?: string | Error) => {
this.updateConcurrency(-1);
this.emit(new EventProcessingEnd(this, data, responseStored, errorStored));
deferredQueue.resolve(undefined);
deferredQueue = this.createDeferred();
this.mayEmitWaitingDatas(this.currentConcurrency);
};

/**
* @description Loop on the queue and call the action on each data
* @description Process the data
*/
const loopOnConcurrency = async (data: TDataType): Promise<void> => {
if (!(await this.shouldPreserveData(data))) return endLoopStep(data);
if (!this.emitProcessStarted(data)) return endLoopStep(data);
const processData = async (data: TDataType): Promise<{ data: TDataType; response?: Awaited<TResponseType>; error?: string | Error }> => {
if (!(await this.shouldPreserveData(data))) return { data };
if (!this.emitProcessStarted(data)) return { data };

try {
const responseStored = await this.callAction(data);
const eventObject = new EventProcessingSuccess(this, data, responseStored);

this.emit(eventObject);
endLoopStep(data, responseStored);
return;
} catch (error) {
const errorStored = error as string | Error;
const eventObject = new EventProcessingError(this, data, errorStored);

this.emit(eventObject);
endLoopStep(data, undefined, errorStored);
return;
const response = await this.callAction(data);
this.emit(new EventProcessingSuccess(this, data, response));
return { data, response };
} catch (e) {
const error = e as string | Error;
this.emit(new EventProcessingError(this, data, error));
return { data, error };
}
};

let storedValue: TDataType | null = null;
/**
* @description Loop on the queue and call the action on each data
*/
while (true) {
let isPreviouslyPaused = await this.forPause(isAlreadyPaused, (willPause) => {
isAlreadyPaused = willPause;
});

if (!storedValue) {
let nextData: IteratorResult<TDataType, void>;
do {
nextData = this.queue.pull().next();
} while (await this.mayWaitNewDatas(nextData, this.currentConcurrency));

if (nextData.done) throw new Error("Queue should not be empty at this point");

storedValue = nextData.value;
}
if (!storedValue) storedValue = await this.extractDataWhenReady();

if (isPreviouslyPaused || isPausedInit) {
isAlreadyPaused = false;
Expand All @@ -261,10 +248,10 @@ export default class AsyncBatch<TDataType, TResponseType> {
}

countdown.start();
this.currentConcurrency++;
let isMaxCalls = callNumber >= (this.options.rateLimit?.maxExecution ?? 0);
this.updateConcurrency(1);
const isMaxCalls = callNumber >= (this.options.rateLimit?.maxExecution ?? 0);
if (isMaxCalls && countdown.willWait()) {
this.currentConcurrency--;
this.updateConcurrency(-1);
await countdown.wait();
countdown.reload();
callNumber = 0;
Expand All @@ -273,7 +260,8 @@ export default class AsyncBatch<TDataType, TResponseType> {

callNumber++;

loopOnConcurrency(storedValue);
processData(storedValue).then(({ data, response, error }) => processDataEnd(data, response, error));

storedValue = null;

if (this.currentConcurrency === this.options.maxConcurrency) {
Expand All @@ -283,6 +271,10 @@ export default class AsyncBatch<TDataType, TResponseType> {
}
}

private updateConcurrency(value: 1 | -1): void {
this.currentConcurrency += value;
}

/**
* @description Handle the pause of the queue
*/
Expand Down Expand Up @@ -341,6 +333,17 @@ export default class AsyncBatch<TDataType, TResponseType> {
this.emit(eventStartedObject);
}

private async extractDataWhenReady(): Promise<TDataType> {
let nextData: IteratorResult<TDataType, void>;
do {
nextData = await this.queue.pull().next();
} while (await this.mayWaitNewDatas(nextData, this.currentConcurrency));

if (nextData.done) throw new Error("Queue should not be empty at this point");

return nextData.value;
}

/**
* @description Emit event for each started data
*/
Expand Down
2 changes: 1 addition & 1 deletion src/examples/basic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const datas = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
*/
const simpleAction = async (data: number) => {
// Create a timoute to simulate a long action
await new Promise((resolve) => {
return await new Promise((resolve) => {
setTimeout(() => {
resolve((data + Math.random() * 4).toString());
console.log("processed", data);
Expand Down
46 changes: 32 additions & 14 deletions src/examples/generators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,24 @@
*/
import AsyncBatch from "../AsyncBatch";

function* datas(n: number) {
// Set the datas to process
const datas = [0 + n, 1 + n, 2 + n, 3 + n, 4 + n, 5 + n, 6 + n, 7 + n, 8 + n, 9 + n];
/**
* Here we mock a database with a function that return a paginated list of datas
*/
async function getDataFromDatabase(page: number, limit: number = 10) {
// Create a timoute to simulate a long action like a database request
await new Promise((resolve) => setTimeout(resolve, 16));
return Array.from({ length: limit }, (_, i) => i + page * limit);
}

for (const data of datas) {
yield data;
/**
* Here we create a generator that will fetch datas from the database
*/
async function* generateDatas(fromPage: number = 0, toPage: number = 10) {
for (let i = fromPage; i < toPage; i++) {
const datas = await getDataFromDatabase(i, 20);
for (const data of datas) {
yield data;
}
}
}

Expand All @@ -18,21 +30,26 @@ function* datas(n: number) {
* For our example we just add a random number to the data and return it as a string
* Obviously in a real use case you will do something more useful and return what ever you want
*/
const simpleAction = (data: number) => {
return (data + Math.random() * 4).toString();
const simpleAction = async (data: number) => {
// Create a timoute to simulate a long action
return await new Promise((resolve) => {
setTimeout(() => {
resolve((data + Math.random() * 4).toString());
}, 20);
});
};

/**
* Here is the AsyncBatch instantiation, we used a create method to give us more control
* We set the max concurrency to 4
* We set the rate limit to 8 executions per 200ms
* We set the rate limit to 100 executions per 200ms
*/
const asyncBatch = AsyncBatch.create(datas(0), simpleAction, {
const asyncBatch = AsyncBatch.create(generateDatas(0, 2), simpleAction, {
maxConcurrency: 4,
autoStart: true,
rateLimit: { msTimeRange: 200, maxExecution: 4 },
rateLimit: { msTimeRange: 100, maxExecution: 10 },
});
console.log("starting with max concurrency of 4 and rate limit of 4 per 200ms");
console.log("starting with max concurrency of 4 and rate limit of 10 per 100ms");
// All events are automatically garbage collected when the AsyncBatch is also garbage collected

let done = false;
Expand All @@ -42,15 +59,16 @@ let done = false;
*/
asyncBatch.events.onProcessingSuccess(({ type, data, response }) => {
console.log(type, { data, response });
if (data === 5 && !done) {
if (data === 2 && !done) {
done = true;
asyncBatch.addMany(datas(10));
console.log("added new datas from the processingSuccess event");
asyncBatch.addMany(generateDatas(2, 4));
}
});

(async () => {
await asyncBatch.events.onEmptyPromise();
asyncBatch.addMany(datas(20));
asyncBatch.addMany(generateDatas(4, 6));
console.log("added new datas to the queue after the end of the first batch");
await asyncBatch.events.onEmptyPromise();
console.log("end of the second batch");
Expand Down

0 comments on commit 167eaa6

Please sign in to comment.