From 51ff30f16bb7a1bb57e71aadf6157d94eaec6ce5 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Tue, 29 Aug 2023 18:01:06 +0200 Subject: [PATCH 1/3] fix(batch): call script.after when activity.execute throws --- src/task/batch.spec.ts | 6 ++---- src/task/batch.ts | 18 ++++++++++++++++-- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/task/batch.spec.ts b/src/task/batch.spec.ts index 8fca4fada..aa4b0cadb 100644 --- a/src/task/batch.spec.ts +++ b/src/task/batch.spec.ts @@ -112,8 +112,7 @@ describe("Batch", () => { expect(spy).toHaveBeenCalled(); }); - // FIXME: Not working due to bug: JST-250 - xit("should call script.after() on execute error", async () => { + it("should call script.after() on execute error", async () => { const spy = jest.spyOn(batch["script"], "after"); jest.spyOn(activity, "execute").mockRejectedValue(new Error("ERROR")); @@ -187,8 +186,7 @@ describe("Batch", () => { expect(spy).toHaveBeenCalled(); }); - // FIXME: Not working due to bug: JST-250 - xit("should call script.after() on execute error", async () => { + it("should call script.after() on execute error", async () => { const spy = jest.spyOn(batch["script"], "after"); jest.spyOn(activity, "execute").mockRejectedValue(new Error("ERROR")); diff --git a/src/task/batch.ts b/src/task/batch.ts index 47de7043c..2e8839caa 100644 --- a/src/task/batch.ts +++ b/src/task/batch.ts @@ -73,7 +73,14 @@ export class Batch { async end(): Promise { await this.script.before(); await sleep(100, true); - const results = await this.activity.execute(this.script.getExeScriptRequest()); + let results: Readable; + try { + results = await this.activity.execute(this.script.getExeScriptRequest()); + } catch (error) { + // the original error is more important than the one from after() + await this.script.after([]).catch(); + throw error; + } const allResults: Result[] = []; return new Promise((resolve, reject) => { results.on("data", (res) => { @@ -99,7 +106,14 @@ export class Batch { async endStream(): Promise { const script = this.script; await script.before(); - const results = await this.activity.execute(this.script.getExeScriptRequest()); + let results: Readable; + try { + results = await this.activity.execute(this.script.getExeScriptRequest()); + } catch (error) { + // the original error is more important than the one from after() + await script.after([]).catch(); + throw error; + } const decodedResults: Result[] = []; const errorResultHandler = new Transform({ objectMode: true, From 0391c9ad5f56ac3e942a42f5004ac67b442899c4 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Tue, 29 Aug 2023 18:02:24 +0200 Subject: [PATCH 2/3] fix(batch): prevent stream from hanging when an error occurs --- src/task/batch.spec.ts | 18 ++++++++++++++++-- src/task/batch.ts | 8 +++----- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/task/batch.spec.ts b/src/task/batch.spec.ts index aa4b0cadb..8dd8c2461 100644 --- a/src/task/batch.spec.ts +++ b/src/task/batch.spec.ts @@ -167,8 +167,7 @@ describe("Batch", () => { expect(spy).toHaveBeenCalled(); }); - // FIXME: Not working due to bug: JST-252 - xit("should call script.after() on result stream error", async () => { + it("should call script.after() on result stream error", async () => { const spy = jest.spyOn(batch["script"], "after"); activity.mockResultFailure("FAILURE"); @@ -199,5 +198,20 @@ describe("Batch", () => { expect(spy).toHaveBeenCalled(); }); + + it("should destroy the stream on result stream error", async () => { + activity.mockResultFailure("FAILURE"); + const stream = await batch.endStream(); + try { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const r of stream) { + /* empty */ + } + fail("Expected to throw"); + } catch (e) { + /* empty */ + } + expect(stream.destroyed).toBe(true); + }); }); }); diff --git a/src/task/batch.ts b/src/task/batch.ts index 2e8839caa..a2d173cf2 100644 --- a/src/task/batch.ts +++ b/src/task/batch.ts @@ -2,7 +2,7 @@ import { DownloadFile, Run, Script, UploadFile } from "../script"; import { Activity, Result } from "../activity"; import { StorageProvider } from "../storage/provider"; import { Logger, sleep } from "../utils"; -import { Readable, Transform } from "stream"; +import { Readable, Transform, pipeline } from "stream"; import { UploadData } from "../script/command"; export class Batch { @@ -132,11 +132,9 @@ export class Batch { } }, }); - results.on("end", () => this.script.after(decodedResults).catch()); - results.on("error", (error) => { + const resultsWithErrorHandling = pipeline(results, errorResultHandler, () => { script.after(decodedResults).catch(); - results.destroy(error); }); - return results.pipe(errorResultHandler); + return resultsWithErrorHandling; } } From 45ff8860bad04b537b002e0c8702d9f2669dfa35 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Tue, 29 Aug 2023 18:04:22 +0200 Subject: [PATCH 3/3] chore: replace `any` with proper type --- src/activity/activity.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/activity/activity.ts b/src/activity/activity.ts index 80e8e723a..897404805 100644 --- a/src/activity/activity.ts +++ b/src/activity/activity.ts @@ -93,7 +93,7 @@ export class Activity { * @param timeout - execution timeout */ public async execute(script: ExeScriptRequest, stream?: boolean, timeout?: number): Promise { - let batchId, batchSize; + let batchId: string, batchSize: number; let startTime = new Date(); try { batchId = await this.send(script); @@ -169,7 +169,7 @@ export class Activity { private async pollingBatch(batchId, startTime, timeout): Promise { let isBatchFinished = false; - let lastIndex; + let lastIndex: number; let retryCount = 0; const maxRetries = 5; const { id: activityId, agreementId } = this;