diff --git a/src/AsyncBatch/Queuer.ts b/src/AsyncBatch/Queuer.ts index 1f3c80d..868a514 100644 --- a/src/AsyncBatch/Queuer.ts +++ b/src/AsyncBatch/Queuer.ts @@ -1,45 +1,30 @@ -type TDataTypes = TDataType[] | Generator; +export type TQDataTypes = TDataType[] | Generator | AsyncGenerator; /** * Queuer is a simple class that allow you to push datas in a queue and pull them with a generator */ export default class Queuer { - /** * It stores groups of pushed datas * The original array datas are stored and never modified, it keeps the reference and avoid to copy the datas */ - private stores: TDataTypes[] = []; - private _generator: Generator | null = null; - - /** - * Push datas in the queue with one or more arguments - */ - public push(...datas: TDataType[]) { - this.stores.push(datas); - } - - /** - * Push datas in the queue with one or more arguments at the beginning of the queue - */ - public unshift(...datas: TDataType[]) { - this.stores.unshift(datas); - } + private stores: TQDataTypes[] = []; + private _generator: Generator | AsyncGenerator | null = null; /** - * Push many datas in the queue + * Push datas in the queue */ - public pushMany(datas: TDataType[] | Generator) { + public push(datas: TQDataTypes) { this.stores.push(datas); } /** * Generator that pull datas from the queue */ - private *generator() { + private async *generator() { while (this.stores.length > 0) { - const store = this.stores.shift()!; - for (const data of store) { + const store = this.stores.shift() ?? []; + for await (const data of store) { yield data; } } diff --git a/src/AsyncBatch/index.ts b/src/AsyncBatch/index.ts index af4c0d3..2b9e1bf 100644 --- a/src/AsyncBatch/index.ts +++ b/src/AsyncBatch/index.ts @@ -13,7 +13,7 @@ import EventPaused from "./Events/EventPaused"; import EventCleared from "./Events/EventCleared"; import EventBeforeCleare from "./Events/EventBeforeCleare"; import EventStart from "./Events/EventStart"; -import Queuer from "./Queuer"; +import Queuer, { TQDataTypes } from "./Queuer"; /** * AsyncBatch is a Typescript library designed for performing batched asynchronous tasks while controlling concurrency, all without relying on external dependencies @@ -47,7 +47,7 @@ export default class AsyncBatch { * @description Create method give you more control, you can listen to events or handle start, pause, stop, add, clear, etc... */ public static create( - datas: TDataType[] | Generator, + datas: TQDataTypes, action: (data: TDataType) => TResponseType, options: Partial = {}, ): AsyncBatch { @@ -60,30 +60,28 @@ export default class AsyncBatch { * @description This is a lightwight version of create method * @returns Promise resolved when all jobs are done (empty queue) */ - public static run( - datas: TDataType[] | Generator, + public static async run( + datas: TQDataTypes, action: (data: TDataType) => TResponseType, options: Omit, "autoStart"> = {}, ): Promise { const asyncBatch = new this(action, { ...options, autoStart: true }).addMany(datas); setImmediate(() => asyncBatch.handleQueue()); - return asyncBatch.events.onEmptyPromise().then(() => Promise.resolve()); + await asyncBatch.events.onEmptyPromise(); } /** * @description Add data to the queue any time */ public add(...datas: TDataType[]): AsyncBatch { - this.queue.push(...datas); - this.unWaitNewDatas(); - return this; + return this.addMany(datas); } /** * @description Add many datas to the queue any time */ - public addMany(datas: TDataType[] | Generator): AsyncBatch { - this.queue.pushMany(datas); + public addMany(datas: TQDataTypes): AsyncBatch { + this.queue.push(datas); this.unWaitNewDatas(); return this; } @@ -201,11 +199,12 @@ export default class AsyncBatch { let isAlreadyPaused = false; const countdown = Countdown.new(this.options.rateLimit?.msTimeRange ?? 0); let callNumber = 0; + /** - * @description terminate each loop step to let the next one start + * @description Terminate the process of the current data */ - const endLoopStep = (data: TDataType, responseStored?: TResponseType, errorStored?: string | Error) => { - this.currentConcurrency--; + const processDataEnd = (data: TDataType, responseStored?: TResponseType, errorStored?: string | Error) => { + this.updateConcurrency(-1); this.emit(new EventProcessingEnd(this, data, responseStored, errorStored)); deferredQueue.resolve(undefined); deferredQueue = this.createDeferred(); @@ -213,45 +212,33 @@ export default class AsyncBatch { }; /** - * @description Loop on the queue and call the action on each data + * @description Process the data */ - const loopOnConcurrency = async (data: TDataType): Promise => { - if (!(await this.shouldPreserveData(data))) return endLoopStep(data); - if (!this.emitProcessStarted(data)) return endLoopStep(data); + const processData = async (data: TDataType): Promise<{ data: TDataType; response?: Awaited; error?: string | Error }> => { + if (!(await this.shouldPreserveData(data))) return { data }; + if (!this.emitProcessStarted(data)) return { data }; try { - const responseStored = await this.callAction(data); - const eventObject = new EventProcessingSuccess(this, data, responseStored); - - this.emit(eventObject); - endLoopStep(data, responseStored); - return; - } catch (error) { - const errorStored = error as string | Error; - const eventObject = new EventProcessingError(this, data, errorStored); - - this.emit(eventObject); - endLoopStep(data, undefined, errorStored); - return; + const response = await this.callAction(data); + this.emit(new EventProcessingSuccess(this, data, response)); + return { data, response }; + } catch (e) { + const error = e as string | Error; + this.emit(new EventProcessingError(this, data, error)); + return { data, error }; } }; let storedValue: TDataType | null = null; + /** + * @description Loop on the queue and call the action on each data + */ while (true) { let isPreviouslyPaused = await this.forPause(isAlreadyPaused, (willPause) => { isAlreadyPaused = willPause; }); - if (!storedValue) { - let nextData: IteratorResult; - do { - nextData = this.queue.pull().next(); - } while (await this.mayWaitNewDatas(nextData, this.currentConcurrency)); - - if (nextData.done) throw new Error("Queue should not be empty at this point"); - - storedValue = nextData.value; - } + if (!storedValue) storedValue = await this.extractDataWhenReady(); if (isPreviouslyPaused || isPausedInit) { isAlreadyPaused = false; @@ -261,10 +248,10 @@ export default class AsyncBatch { } countdown.start(); - this.currentConcurrency++; - let isMaxCalls = callNumber >= (this.options.rateLimit?.maxExecution ?? 0); + this.updateConcurrency(1); + const isMaxCalls = callNumber >= (this.options.rateLimit?.maxExecution ?? 0); if (isMaxCalls && countdown.willWait()) { - this.currentConcurrency--; + this.updateConcurrency(-1); await countdown.wait(); countdown.reload(); callNumber = 0; @@ -273,7 +260,8 @@ export default class AsyncBatch { callNumber++; - loopOnConcurrency(storedValue); + processData(storedValue).then(({ data, response, error }) => processDataEnd(data, response, error)); + storedValue = null; if (this.currentConcurrency === this.options.maxConcurrency) { @@ -283,6 +271,10 @@ export default class AsyncBatch { } } + private updateConcurrency(value: 1 | -1): void { + this.currentConcurrency += value; + } + /** * @description Handle the pause of the queue */ @@ -341,6 +333,17 @@ export default class AsyncBatch { this.emit(eventStartedObject); } + private async extractDataWhenReady(): Promise { + let nextData: IteratorResult; + do { + nextData = await this.queue.pull().next(); + } while (await this.mayWaitNewDatas(nextData, this.currentConcurrency)); + + if (nextData.done) throw new Error("Queue should not be empty at this point"); + + return nextData.value; + } + /** * @description Emit event for each started data */ diff --git a/src/examples/basic.ts b/src/examples/basic.ts index e800eea..f0946aa 100644 --- a/src/examples/basic.ts +++ b/src/examples/basic.ts @@ -13,7 +13,7 @@ const datas = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; */ const simpleAction = async (data: number) => { // Create a timoute to simulate a long action - await new Promise((resolve) => { + return await new Promise((resolve) => { setTimeout(() => { resolve((data + Math.random() * 4).toString()); console.log("processed", data); diff --git a/src/examples/generators.ts b/src/examples/generators.ts index 1f5b7da..b9756d0 100644 --- a/src/examples/generators.ts +++ b/src/examples/generators.ts @@ -4,12 +4,24 @@ */ import AsyncBatch from "../AsyncBatch"; -function* datas(n: number) { - // Set the datas to process - const datas = [0 + n, 1 + n, 2 + n, 3 + n, 4 + n, 5 + n, 6 + n, 7 + n, 8 + n, 9 + n]; +/** + * Here we mock a database with a function that return a paginated list of datas + */ +async function getDataFromDatabase(page: number, limit: number = 10) { + // Create a timoute to simulate a long action like a database request + await new Promise((resolve) => setTimeout(resolve, 16)); + return Array.from({ length: limit }, (_, i) => i + page * limit); +} - for (const data of datas) { - yield data; +/** + * Here we create a generator that will fetch datas from the database + */ +async function* generateDatas(fromPage: number = 0, toPage: number = 10) { + for (let i = fromPage; i < toPage; i++) { + const datas = await getDataFromDatabase(i, 20); + for (const data of datas) { + yield data; + } } } @@ -18,21 +30,26 @@ function* datas(n: number) { * For our example we just add a random number to the data and return it as a string * Obviously in a real use case you will do something more useful and return what ever you want */ -const simpleAction = (data: number) => { - return (data + Math.random() * 4).toString(); +const simpleAction = async (data: number) => { + // Create a timoute to simulate a long action + return await new Promise((resolve) => { + setTimeout(() => { + resolve((data + Math.random() * 4).toString()); + }, 20); + }); }; /** * Here is the AsyncBatch instantiation, we used a create method to give us more control * We set the max concurrency to 4 - * We set the rate limit to 8 executions per 200ms + * We set the rate limit to 100 executions per 200ms */ -const asyncBatch = AsyncBatch.create(datas(0), simpleAction, { +const asyncBatch = AsyncBatch.create(generateDatas(0, 2), simpleAction, { maxConcurrency: 4, autoStart: true, - rateLimit: { msTimeRange: 200, maxExecution: 4 }, + rateLimit: { msTimeRange: 100, maxExecution: 10 }, }); -console.log("starting with max concurrency of 4 and rate limit of 4 per 200ms"); +console.log("starting with max concurrency of 4 and rate limit of 10 per 100ms"); // All events are automatically garbage collected when the AsyncBatch is also garbage collected let done = false; @@ -42,15 +59,16 @@ let done = false; */ asyncBatch.events.onProcessingSuccess(({ type, data, response }) => { console.log(type, { data, response }); - if (data === 5 && !done) { + if (data === 2 && !done) { done = true; - asyncBatch.addMany(datas(10)); + console.log("added new datas from the processingSuccess event"); + asyncBatch.addMany(generateDatas(2, 4)); } }); (async () => { await asyncBatch.events.onEmptyPromise(); - asyncBatch.addMany(datas(20)); + asyncBatch.addMany(generateDatas(4, 6)); console.log("added new datas to the queue after the end of the first batch"); await asyncBatch.events.onEmptyPromise(); console.log("end of the second batch");