diff --git a/.github/actions/prepare-tests/action.yml b/.github/actions/prepare-tests/action.yml index 76d700a05..22a38f172 100644 --- a/.github/actions/prepare-tests/action.yml +++ b/.github/actions/prepare-tests/action.yml @@ -51,7 +51,7 @@ runs: shell: bash run: | echo "Going to build the SDK on the requestor" - docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm i && npm run build" + docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && npm i --no-progress && npm run build" echo "Successfully built the SDK on the requestor" - name: Install Cypress @@ -59,3 +59,18 @@ runs: shell: bash run: | docker exec -t docker-requestor-1 /bin/sh -c "cd /golem-js && ./node_modules/.bin/cypress install" + + - name: Run a preliminary scan of offers + shell: bash + run: | + docker exec -t docker-requestor-1 /bin/sh -c "npm install --no-progress -g @prekucki/wait-for-n && wait-for-n --limit=6 --appkey=try_golem --subnet=$YAGNA_SUBNET" + + - name: List down sessions seen on that requestor + shell: bash + run: | + docker exec docker-requestor-1 /bin/sh -c "yagna net sessions" + + - name: Run a secondary scan of offers + shell: bash + run: | + docker exec -t docker-requestor-1 /bin/sh -c "npx --no-progress --yes @golem-sdk/cli market scan -k try_golem --subnet-tag $YAGNA_SUBNET --payment-network $PAYMENT_NETWORK" diff --git a/examples/experimental/express/server.ts b/examples/experimental/express/server.ts index 2a970081a..83ecdf502 100644 --- a/examples/experimental/express/server.ts +++ b/examples/experimental/express/server.ts @@ -1,14 +1,18 @@ import express from "express"; import { JobManager, JobState } from "@golem-sdk/golem-js/experimental"; +import { fileURLToPath } from "url"; const app = express(); const port = 3000; +// get the absolute path to the public directory in case this file is run from a different directory +const publicDirectoryPath = fileURLToPath(new URL("./public", import.meta.url)); + app.use(express.text()); -const golemClient = new JobManager(); +const jobManager = new JobManager(); -await golemClient +await jobManager .init() .then(() => { console.log("Connected to the Golem Network!"); @@ -23,19 +27,17 @@ app.post("/tts", async (req, res) => { res.status(400).send("Missing text parameter"); return; } - const job = golemClient.createJob({ + const job = jobManager.createJob({ demand: { - activity: { - imageTag: "severyn/espeak:latest", - }, + workload: { imageTag: "severyn/espeak:latest" }, }, market: { rentHours: 0.5, pricing: { model: "linear", - maxStartPrice: 1, - maxCpuPerHourPrice: 1, - maxEnvPerHourPrice: 1, + maxStartPrice: 0.5, + maxCpuPerHourPrice: 1.0, + maxEnvPerHourPrice: 0.5, }, }, }); @@ -58,15 +60,19 @@ app.post("/tts", async (req, res) => { await exe .beginBatch() .run(`espeak "${req.body}" -w /golem/output/output.wav`) - .downloadFile("/golem/output/output.wav", `public/${fileName}`) + .downloadFile("/golem/output/output.wav", `${publicDirectoryPath}/${fileName}`) .end(); return fileName; }); - res.send(`Job started! ID: ${job.id}`); + res.send( + `Job started! ID: ${job.id}\n` + + `You can check it's state by calling:\ncurl http://localhost:${port}/tts/${job.id}\n` + + `And it's results by calling:\ncurl http://localhost:${port}/tts/${job.id}/results\n`, + ); }); app.get("/tts/:id", async (req, res) => { - const job = golemClient.getJobById(req.params.id); + const job = jobManager.getJobById(req.params.id); if (!job) { res.status(404).send("Job not found"); return; @@ -75,10 +81,10 @@ app.get("/tts/:id", async (req, res) => { }); // serve files in the /public directory -app.use("/results", express.static("public")); +app.use("/results", express.static(publicDirectoryPath)); app.get("/tts/:id/results", async (req, res) => { - const job = golemClient.getJobById(req.params.id); + const job = jobManager.getJobById(req.params.id); if (!job) { res.status(404).send("Job not found"); return; @@ -89,7 +95,7 @@ app.get("/tts/:id/results", async (req, res) => { const results = await job.results; res.send( - `Job completed successfully! Open the following link in your browser to listen to the result: http://localhost:${port}/results/${results}`, + `Job completed successfully! Open the following link in your browser to listen to the result: http://localhost:${port}/results/${results}\n`, ); }); @@ -98,8 +104,9 @@ app.listen(port, () => { }); process.on("SIGINT", async () => { + console.log("Gracefully shutting down..."); // cancel and cleanup all running jobs - await golemClient.close(); + await jobManager.close(); process.exit(0); }); diff --git a/examples/package.json b/examples/package.json index f8f908bee..ceff24c42 100644 --- a/examples/package.json +++ b/examples/package.json @@ -36,7 +36,7 @@ }, "dependencies": { "@golem-sdk/golem-js": "file:..", - "@golem-sdk/pino-logger": "^1.0.1", + "@golem-sdk/pino-logger": "^1.0.2", "commander": "^12.0.0", "express": "^4.18.2", "tsx": "^4.7.1" diff --git a/examples/web/hello.html b/examples/web/hello.html index 0e022139d..9b3af5e5b 100644 --- a/examples/web/hello.html +++ b/examples/web/hello.html @@ -115,11 +115,13 @@

Results

await glm.connect(); appendResults("Request for renting a provider machine"); const rental = await glm.oneOf({ order }); + appendResults("Rented resources from", rental.agreement.provider.name); await rental .getExeUnit() .then(async (exe) => appendResults("Reply: " + (await exe.run(`echo 'Hello Golem! 👋 from ${exe.provider.name}!'`)).stdout), ); + appendResults("Finished all work with the resources"); await rental.stopAndFinalize(); appendResults("Finalized renting process"); } catch (err) { diff --git a/package-lock.json b/package-lock.json index d484ec3c9..c812c221d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -12,6 +12,7 @@ "examples/" ], "dependencies": { + "@golem-sdk/pino-logger": "^1.1.0", "async-lock": "^1.4.1", "async-retry": "^1.3.3", "axios": "^1.6.7", @@ -33,7 +34,7 @@ "devDependencies": { "@commitlint/cli": "^19.0.3", "@commitlint/config-conventional": "^19.0.3", - "@johanblumenberg/ts-mockito": "^1.0.41", + "@johanblumenberg/ts-mockito": "^1.0.43", "@rollup/plugin-alias": "^5.1.0", "@rollup/plugin-commonjs": "^25.0.7", "@rollup/plugin-json": "^6.1.0", @@ -98,7 +99,7 @@ "license": "LGPL-3.0", "dependencies": { "@golem-sdk/golem-js": "file:..", - "@golem-sdk/pino-logger": "^1.0.1", + "@golem-sdk/pino-logger": "^1.0.2", "commander": "^12.0.0", "express": "^4.18.2", "tsx": "^4.7.1" @@ -1459,9 +1460,9 @@ "link": true }, "node_modules/@golem-sdk/pino-logger": { - "version": "1.0.3", - "resolved": "https://registry.npmjs.org/@golem-sdk/pino-logger/-/pino-logger-1.0.3.tgz", - "integrity": "sha512-P9BMJ+QUlWx7C+4iku/SOnNnjzGGBEhaHtOf4IDXrtvpEg8zqxEuyw5mM7PXpAr7HU/5C/f2BcBG39i7QYfwsw==", + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@golem-sdk/pino-logger/-/pino-logger-1.1.0.tgz", + "integrity": "sha512-FxVqTnx7ToKPCABzfEHhwXT2x/B4PWez6y6AG9AeMjH9DA/itY8kclv0DUgWPPRTW4jr1Qrf6ovZYLhPZ5o3Dw==", "dependencies": { "pino": "^8.20.0", "pino-pretty": "^11.0.0" @@ -2340,9 +2341,10 @@ } }, "node_modules/@johanblumenberg/ts-mockito": { - "version": "1.0.41", + "version": "1.0.43", + "resolved": "https://registry.npmjs.org/@johanblumenberg/ts-mockito/-/ts-mockito-1.0.43.tgz", + "integrity": "sha512-7C1JMJzYPLmW4/nFZHTQTog/wRnB45UUU3hEur1p7HJDlpQpybQWAbE8yA+mBk+95mOEfuIVNcebSAB/KGrE0w==", "dev": true, - "license": "MIT", "dependencies": { "lodash": "^4.17.20" } @@ -2963,9 +2965,9 @@ ] }, "node_modules/@rollup/rollup-darwin-x64": { - "version": "4.18.0", - "resolved": "https://registry.npmjs.org/@rollup/rollup-darwin-x64/-/rollup-darwin-x64-4.18.0.tgz", - "integrity": "sha512-n2LMsUz7Ynu7DoQrSQkBf8iNrjOGyPLrdSg802vk6XT3FtsgX6JbE8IHRvposskFm9SNxzkLYGSq9QdpLYpRNA==", + "version": "4.18.1", + "resolved": "https://registry.npmjs.org/@rollup/rollup-darwin-x64/-/rollup-darwin-x64-4.18.1.tgz", + "integrity": "sha512-IgpzXKauRe1Tafcej9STjSSuG0Ghu/xGYH+qG6JwsAUxXrnkvNHcq/NL6nz1+jzvWAnQkuAJ4uIwGB48K9OCGA==", "cpu": [ "x64" ], @@ -3079,9 +3081,9 @@ ] }, "node_modules/@rollup/rollup-win32-arm64-msvc": { - "version": "4.18.0", - "resolved": "https://registry.npmjs.org/@rollup/rollup-win32-arm64-msvc/-/rollup-win32-arm64-msvc-4.18.0.tgz", - "integrity": "sha512-7J6TkZQFGo9qBKH0pk2cEVSRhJbL6MtfWxth7Y5YmZs57Pi+4x6c2dStAUvaQkHQLnEQv1jzBUW43GvZW8OFqA==", + "version": "4.18.1", + "resolved": "https://registry.npmjs.org/@rollup/rollup-win32-arm64-msvc/-/rollup-win32-arm64-msvc-4.18.1.tgz", + "integrity": "sha512-W2ZNI323O/8pJdBGil1oCauuCzmVd9lDmWBBqxYZcOqWD6aWqJtVBQ1dFrF4dYpZPks6F+xCZHfzG5hYlSHZ6g==", "cpu": [ "arm64" ], @@ -3104,9 +3106,9 @@ ] }, "node_modules/@rollup/rollup-win32-x64-msvc": { - "version": "4.18.0", - "resolved": "https://registry.npmjs.org/@rollup/rollup-win32-x64-msvc/-/rollup-win32-x64-msvc-4.18.0.tgz", - "integrity": "sha512-UOo5FdvOL0+eIVTgS4tIdbW+TtnBLWg1YBCcU2KWM7nuNwRz9bksDX1bekJJCpu25N1DVWaCwnT39dVQxzqS8g==", + "version": "4.18.1", + "resolved": "https://registry.npmjs.org/@rollup/rollup-win32-x64-msvc/-/rollup-win32-x64-msvc-4.18.1.tgz", + "integrity": "sha512-yjk2MAkQmoaPYCSu35RLJ62+dz358nE83VfTePJRp8CG7aMg25mEJYpXFiD+NcevhX8LxD5OP5tktPXnXN7GDw==", "cpu": [ "x64" ], @@ -3903,9 +3905,10 @@ "dev": true }, "node_modules/@types/ws": { - "version": "8.5.10", + "version": "8.5.11", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.5.11.tgz", + "integrity": "sha512-4+q7P5h3SpJxaBft0Dzpbr6lmMaqh0Jr2tbhJZ/luAwvD7ohSCniYkwz/pLxuT2h0EOa6QADgJj1Ko+TzRfZ+w==", "dev": true, - "license": "MIT", "dependencies": { "@types/node": "*" } @@ -18179,9 +18182,9 @@ } }, "node_modules/tsx": { - "version": "4.16.0", - "resolved": "https://registry.npmjs.org/tsx/-/tsx-4.16.0.tgz", - "integrity": "sha512-MPgN+CuY+4iKxGoJNPv+1pyo5YWZAQ5XfsyobUG+zoKG7IkvCPLZDEyoIb8yLS2FcWci1nlxAqmvPlFWD5AFiQ==", + "version": "4.16.2", + "resolved": "https://registry.npmjs.org/tsx/-/tsx-4.16.2.tgz", + "integrity": "sha512-C1uWweJDgdtX2x600HjaFaucXTilT7tgUZHbOE4+ypskZ1OP8CRCSDkCxG6Vya9EwaFIVagWwpaVAn5wzypaqQ==", "dependencies": { "esbuild": "~0.21.5", "get-tsconfig": "^4.7.5" @@ -18929,9 +18932,9 @@ "license": "ISC" }, "node_modules/ws": { - "version": "8.17.1", - "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz", - "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", + "version": "8.18.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.18.0.tgz", + "integrity": "sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==", "engines": { "node": ">=10.0.0" }, diff --git a/package.json b/package.json index 1f3c42dc0..72ca267f1 100644 --- a/package.json +++ b/package.json @@ -18,11 +18,13 @@ "exports": { ".": { "types": "./dist/index.d.ts", + "browser": "./dist/golem-js.min.js", "import": "./dist/golem-js.mjs", "require": "./dist/golem-js.js" }, "./experimental": { - "types": "./dist/experimental.d.ts", + "types": "./dist/experimental/index.d.ts", + "browser": null, "import": "./dist/golem-js-experimental.mjs", "require": "./dist/golem-js-experimental.js" } @@ -61,6 +63,7 @@ "node": ">=18.0.0" }, "dependencies": { + "@golem-sdk/pino-logger": "^1.1.0", "async-lock": "^1.4.1", "async-retry": "^1.3.3", "axios": "^1.6.7", @@ -82,7 +85,7 @@ "devDependencies": { "@commitlint/cli": "^19.0.3", "@commitlint/config-conventional": "^19.0.3", - "@johanblumenberg/ts-mockito": "^1.0.41", + "@johanblumenberg/ts-mockito": "^1.0.43", "@rollup/plugin-alias": "^5.1.0", "@rollup/plugin-commonjs": "^25.0.7", "@rollup/plugin-json": "^6.1.0", diff --git a/src/activity/exe-script-executor.ts b/src/activity/exe-script-executor.ts index 266fadf96..de768b190 100644 --- a/src/activity/exe-script-executor.ts +++ b/src/activity/exe-script-executor.ts @@ -65,7 +65,7 @@ export class ExeScriptExecutor { const batchId = await this.send(script); const batchSize = JSON.parse(script.text).length; - this.logger.debug(`Script sent.`, { batchId }); + this.logger.debug(`Script sent.`, { batchId, script }); return { batchId, batchSize }; } catch (error) { const message = getMessageFromApiError(error); @@ -175,7 +175,8 @@ export class ExeScriptExecutor { } } catch (error) { logger.debug(`Failed to fetch activity results. Attempt: ${attempt}. ${error}`); - if (RETRYABLE_ERROR_STATUS_CODES.includes(error?.status)) { + const errorStatus = error?.status ?? error.previous?.status; + if (RETRYABLE_ERROR_STATUS_CODES.includes(errorStatus)) { throw error; } else { bail(error); @@ -235,7 +236,7 @@ export class ExeScriptExecutor { private parseEventToResult(event: StreamingBatchEvent, batchSize: number): Result { // StreamingBatchEvent has a slightly more extensive structure, // including a return code that could be added to the Result entity... (?) - return new Result({ + const result = new Result({ index: event.index, eventDate: event.timestamp, result: event?.kind?.finished @@ -250,5 +251,9 @@ export class ExeScriptExecutor { message: event?.kind?.finished?.message, isBatchFinished: event.index + 1 >= batchSize && Boolean(event?.kind?.finished), }); + + this.logger.debug("Received stream batch execution result", { result }); + + return result; } } diff --git a/src/activity/exe-unit/batch.ts b/src/activity/exe-unit/batch.ts index b39ea8c8c..c7f57dca6 100644 --- a/src/activity/exe-unit/batch.ts +++ b/src/activity/exe-unit/batch.ts @@ -109,7 +109,7 @@ export class Batch { this.executor.activity.agreement.provider, error, ); - this.logger.debug("Error in batch script execution"); + this.logger.debug("Error in batch script execution", { error }); this.script .after(allResults) .then(() => reject(golemError)) diff --git a/src/activity/exe-unit/process.ts b/src/activity/exe-unit/process.ts index 44c69d0a0..a7ef22b3d 100644 --- a/src/activity/exe-unit/process.ts +++ b/src/activity/exe-unit/process.ts @@ -95,4 +95,11 @@ export class RemoteProcess { this.subscription.add(() => end()); }); } + + /** + * Checks if the exe-script batch from Yagna has completed, reflecting all work and streaming to be completed + */ + isFinished() { + return this.lastResult?.isBatchFinished ?? false; + } } diff --git a/src/experimental/job/job.ts b/src/experimental/job/job.ts index da208eef5..dad128af5 100644 --- a/src/experimental/job/job.ts +++ b/src/experimental/job/job.ts @@ -134,24 +134,22 @@ export class Job { throw new GolemAbortError("Canceled"); } - const rental = await this.glm.oneOf({ order: this.order }); - - const exeUnit = await rental.getExeUnit(); - this.events.emit("started"); - - const onAbort = async () => { + const rental = await this.glm.oneOf({ order: this.order, signalOrTimeout: signal }); + try { + const exeUnit = await rental.getExeUnit(signal); + this.events.emit("started"); + + if (signal.aborted) { + this.events.emit("canceled"); + throw new GolemAbortError("Canceled"); + } + + signal.addEventListener("abort", () => this.events.emit("canceled"), { once: true }); + // remember to `await` here so that the `finally` block is executed AFTER the work is done + return await workOnGolem(exeUnit); + } finally { await rental.stopAndFinalize(); - this.events.emit("canceled"); - }; - - if (signal.aborted) { - await onAbort(); - throw new GolemAbortError("Canceled"); } - - signal.addEventListener("abort", onAbort, { once: true }); - - return workOnGolem(exeUnit); } /** diff --git a/src/experimental/job/job_manager.ts b/src/experimental/job/job_manager.ts index ea44fc62a..8a1d8fc8d 100644 --- a/src/experimental/job/job_manager.ts +++ b/src/experimental/job/job_manager.ts @@ -41,6 +41,7 @@ export class JobManager { url: this.config?.yagna?.basePath, }, dataTransferProtocol: storageProvider, + logger: this.logger, }); } diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index 1e40d30f8..6c00f5908 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -646,10 +646,12 @@ export class GolemNetwork { if (typeof this.options.dataTransferProtocol === "string") { switch (this.options.dataTransferProtocol) { case "ws": - return new WebSocketBrowserStorageProvider(this.yagna, {}); + return new WebSocketBrowserStorageProvider(this.yagna, { + logger: this.logger, + }); case "gftp": default: - return new GftpStorageProvider(); + return new GftpStorageProvider(this.logger); } } else if (this.options.dataTransferProtocol !== undefined) { return this.options.dataTransferProtocol; diff --git a/src/market/draft-offer-proposal-pool.test.ts b/src/market/draft-offer-proposal-pool.test.ts index 52158e1a7..6d60b7208 100644 --- a/src/market/draft-offer-proposal-pool.test.ts +++ b/src/market/draft-offer-proposal-pool.test.ts @@ -1,7 +1,6 @@ import { DraftOfferProposalPool } from "./draft-offer-proposal-pool"; import { instance, mock, when } from "@johanblumenberg/ts-mockito"; import { OfferProposal } from "./index"; -import { GolemAbortError, GolemTimeoutError } from "../shared/error/golem-error"; describe("Draft Offer Proposal Pool", () => { // GIVEN @@ -111,15 +110,13 @@ describe("Draft Offer Proposal Pool", () => { describe("Negative cases", () => { it("should abort the acquiring proposal by timeout", async () => { const pool = new DraftOfferProposalPool(); - await expect(pool.acquire(1)).rejects.toThrow(new GolemTimeoutError("Could not provide any proposal in time")); + await expect(pool.acquire(1)).rejects.toThrow("The operation was aborted due to timeout"); }); it("should abort the acquiring proposal by signal", async () => { const pool = new DraftOfferProposalPool(); const ac = new AbortController(); ac.abort(); - await expect(pool.acquire(ac.signal)).rejects.toThrow( - new GolemAbortError("The acquiring of proposals has been aborted"), - ); + await expect(pool.acquire(ac.signal)).rejects.toThrow("This operation was aborted"); }); }); }); diff --git a/src/market/draft-offer-proposal-pool.ts b/src/market/draft-offer-proposal-pool.ts index be9aae010..7b1af075f 100644 --- a/src/market/draft-offer-proposal-pool.ts +++ b/src/market/draft-offer-proposal-pool.ts @@ -1,10 +1,9 @@ import { OfferProposal, OfferProposalFilter } from "./proposal"; -import AsyncLock from "async-lock"; import { EventEmitter } from "eventemitter3"; import { GolemMarketError, MarketErrorCode } from "./error"; -import { createAbortSignalFromTimeout, defaultLogger, Logger, sleep } from "../shared/utils"; +import { createAbortSignalFromTimeout, defaultLogger, Logger, runOnNextEventLoopIteration } from "../shared/utils"; import { Observable, Subscription } from "rxjs"; -import { GolemAbortError, GolemTimeoutError } from "../shared/error/golem-error"; +import { AcquireQueue } from "../shared/utils/acquireQueue"; export type OfferProposalSelector = (proposals: OfferProposal[]) => OfferProposal; @@ -54,7 +53,7 @@ export class DraftOfferProposalPool { public readonly events = new EventEmitter(); private logger: Logger; - private readonly lock: AsyncLock = new AsyncLock(); + private acquireQueue = new AcquireQueue(); /** {@link ProposalPoolOptions.minCount} */ private readonly minCount: number = 0; @@ -98,6 +97,11 @@ export class DraftOfferProposalPool { this.logger.error("Cannot add a non-draft proposal to the pool", { proposalId: proposal.id }); throw new GolemMarketError("Cannot add a non-draft proposal to the pool", MarketErrorCode.InvalidProposal); } + // if someone is waiting for a proposal, give it to them + if (this.acquireQueue.hasAcquirers()) { + this.acquireQueue.put(proposal); + return; + } this.available.add(proposal); @@ -108,42 +112,41 @@ export class DraftOfferProposalPool { * Attempts to obtain a single proposal from the pool * @param signalOrTimeout - the timeout in milliseconds or an AbortSignal that will be used to cancel the acquiring */ - public acquire(signalOrTimeout?: number | AbortSignal): Promise { + public async acquire(signalOrTimeout?: number | AbortSignal): Promise { const signal = createAbortSignalFromTimeout(signalOrTimeout); - return this.lock.acquire("proposal-pool", async () => { - let proposal: OfferProposal | null = null; - - while (proposal === null) { - if (signal.aborted) { - throw signal.reason.name === "TimeoutError" - ? new GolemTimeoutError("Could not provide any proposal in time") - : new GolemAbortError("The acquiring of proposals has been aborted", signal.reason); - } - // Try to get one - proposal = this.available.size > 0 ? this.selectOfferProposal([...this.available]) : null; - - if (proposal) { - // Validate - if (!this.validateOfferProposal(proposal)) { - // Drop if not valid - this.removeFromAvailable(proposal); - // Keep searching - proposal = null; - } - } - // if not found or not valid wait a while for next try - if (!proposal) { - await sleep(1); - } + + signal.throwIfAborted(); + + // iterate over available proposals until we find a valid one + const tryGettingFromAvailable = async (): Promise => { + signal.throwIfAborted(); + + const proposal = this.available.size > 0 ? this.selectOfferProposal([...this.available]) : null; + if (!proposal) { + // No proposal was selected, either `available` is empty or the user's proposal filter didn't select anything + // no point retrying + return; } + if (!this.validateOfferProposal(proposal)) { + // Drop if not valid + this.removeFromAvailable(proposal); + // and try again + return runOnNextEventLoopIteration(tryGettingFromAvailable); + } + // valid proposal found + return proposal; + }; + const proposal = await tryGettingFromAvailable(); + // Try to get one + if (proposal) { this.available.delete(proposal); this.leased.add(proposal); - this.events.emit("acquired", { proposal }); - return proposal; - }); + } + // if no valid proposal was found, wait for one to appear + return this.acquireQueue.get(signal); } /** @@ -152,31 +155,33 @@ export class DraftOfferProposalPool { * Validates if the proposal is still usable before putting it back to the list of available ones * @param proposal */ - public release(proposal: OfferProposal): Promise { - return this.lock.acquire("proposal-pool", () => { - this.leased.delete(proposal); - - if (this.validateOfferProposal(proposal)) { - this.available.add(proposal); - this.events.emit("released", { proposal }); - } else { - this.events.emit("removed", { proposal }); + public release(proposal: OfferProposal): void { + this.leased.delete(proposal); + + if (this.validateOfferProposal(proposal)) { + this.events.emit("released", { proposal }); + // if someone is waiting for a proposal, give it to them + if (this.acquireQueue.hasAcquirers()) { + this.acquireQueue.put(proposal); + return; } - }); + // otherwise, put it back to the list of available proposals + this.available.add(proposal); + } else { + this.events.emit("removed", { proposal }); + } } - public remove(proposal: OfferProposal): Promise { - return this.lock.acquire("proposal-pool", () => { - if (this.leased.has(proposal)) { - this.leased.delete(proposal); - this.events.emit("removed", { proposal }); - } + public remove(proposal: OfferProposal): void { + if (this.leased.has(proposal)) { + this.leased.delete(proposal); + this.events.emit("removed", { proposal }); + } - if (this.available.has(proposal)) { - this.available.delete(proposal); - this.events.emit("removed", { proposal }); - } - }); + if (this.available.has(proposal)) { + this.available.delete(proposal); + this.events.emit("removed", { proposal }); + } } /** @@ -211,21 +216,20 @@ export class DraftOfferProposalPool { * Clears the pool entirely */ public async clear() { - return this.lock.acquire("proposal-pool", () => { - for (const proposal of this.available) { - this.available.delete(proposal); - this.events.emit("removed", { proposal }); - } + this.acquireQueue.releaseAll(); + for (const proposal of this.available) { + this.available.delete(proposal); + this.events.emit("removed", { proposal }); + } - for (const proposal of this.leased) { - this.leased.delete(proposal); - this.events.emit("removed", { proposal }); - } + for (const proposal of this.leased) { + this.leased.delete(proposal); + this.events.emit("removed", { proposal }); + } - this.available = new Set(); - this.leased = new Set(); - this.events.emit("cleared"); - }); + this.available = new Set(); + this.leased = new Set(); + this.events.emit("cleared"); } protected removeFromAvailable(proposal: OfferProposal): void { diff --git a/src/market/market.module.test.ts b/src/market/market.module.test.ts index 35cb21bc2..c91634fa0 100644 --- a/src/market/market.module.test.ts +++ b/src/market/market.module.test.ts @@ -439,9 +439,9 @@ describe("Market module", () => { const badProposal0 = {} as OfferProposal; const badProposal1 = {} as OfferProposal; const goodProposal = {} as OfferProposal; + const mockPool = mock(DraftOfferProposalPool); when(mockPool.acquire(_)).thenResolve(badProposal0).thenResolve(badProposal1).thenResolve(goodProposal); - when(mockPool.remove(_)).thenResolve(); const goodAgreement = {} as Agreement; const marketSpy = spy(marketModule); when(marketSpy.proposeAgreement(goodProposal, _)).thenResolve(goodAgreement); @@ -490,7 +490,6 @@ describe("Market module", () => { it("should abort after a set timeout", async () => { const mockPool = mock(DraftOfferProposalPool); when(mockPool.acquire()).thenResolve({} as OfferProposal); - when(mockPool.remove(_)).thenResolve(); const marketSpy = spy(marketModule); when(marketSpy.proposeAgreement(_)).thenReject(new Error("Failed to sign proposal")); @@ -499,15 +498,11 @@ describe("Market module", () => { ); }); it("respects the timeout on draft proposal pool acquire and forwards the error", async () => { - const mockAcquire: DraftOfferProposalPool["acquire"] = jest - .fn() - .mockImplementation( - () => new Promise((_, reject) => setTimeout(() => reject(new Error("Failed to acquire")), 10)), - ); - const mockPool = { - acquire: mockAcquire, - } as DraftOfferProposalPool; - expect(marketModule.signAgreementFromPool(mockPool)).rejects.toThrow("Failed to acquire"); + const mockPool = mock(DraftOfferProposalPool); + when(mockPool.acquire(_)).thenCall( + () => new Promise((_, reject) => setTimeout(() => reject(new Error("Failed to acquire")), 10)), + ); + expect(marketModule.signAgreementFromPool(instance(mockPool))).rejects.toThrow("Failed to acquire"); }); }); diff --git a/src/market/market.module.ts b/src/market/market.module.ts index 87eed3708..3ed0d2277 100644 --- a/src/market/market.module.ts +++ b/src/market/market.module.ts @@ -530,15 +530,22 @@ export class MarketModuleImpl implements MarketModule { agreementOptions?: AgreementOptions, signalOrTimeout?: number | AbortSignal, ): Promise { - this.logger.info("Trying to sign an agreement ..."); + this.logger.info("Trying to sign an agreement..."); const signal = createAbortSignalFromTimeout(signalOrTimeout); const getProposal = async () => { try { signal.throwIfAborted(); + this.logger.debug("Acquiring proposal from draft proposal pool", { + draftPoolCounters: { + total: draftProposalPool.count(), + available: draftProposalPool.availableCount(), + }, + }); const proposal = await draftProposalPool.acquire(signal); + this.logger.debug("Acquired proposal from the pool", { proposal }); if (signal.aborted) { - await draftProposalPool.release(proposal); + draftProposalPool.release(proposal); signal.throwIfAborted(); } return proposal; @@ -557,18 +564,12 @@ export class MarketModuleImpl implements MarketModule { try { const agreement = await this.proposeAgreement(proposal, agreementOptions); // agreement is valid, proposal can be destroyed - await draftProposalPool.remove(proposal).catch((error) => { - this.logger.warn("Signed the agreement but failed to remove the proposal from the pool", { error }); - }); + draftProposalPool.remove(proposal); return agreement; } catch (error) { this.logger.debug("Failed to propose agreement, retrying", { error }); // We failed to propose the agreement, destroy the proposal and try again with another one - await draftProposalPool.remove(proposal).catch((error) => { - this.logger.warn("Failed to remove the proposal from the pool after unsuccessful agreement proposal", { - error, - }); - }); + draftProposalPool.remove(proposal); return runOnNextEventLoopIteration(tryProposing); } }; @@ -603,7 +604,6 @@ export class MarketModuleImpl implements MarketModule { if (isCancelled) { return; } - this.logger.debug("Waiting for reduced proposals..."); try { await proposalsBatch.waitForProposals(); const proposals = await proposalsBatch.getProposals(); diff --git a/src/resource-rental/resource-rental-pool.test.ts b/src/resource-rental/resource-rental-pool.test.ts index f400324e3..479d083b9 100644 --- a/src/resource-rental/resource-rental-pool.test.ts +++ b/src/resource-rental/resource-rental-pool.test.ts @@ -164,12 +164,12 @@ describe("ResourceRentalPool", () => { const acquiredRentalPromise = pool.acquire(); // go to the next tick await Promise.resolve(); - expect(pool["acquireQueue"].length).toBe(1); + expect(pool["acquireQueue"].size()).toBe(1); pool.release(acquiredRental1); await acquiredRentalPromise; expect(pool.getAvailableSize()).toBe(0); expect(pool.getBorrowedSize()).toBe(3); - expect(pool["acquireQueue"].length).toBe(0); + expect(pool["acquireQueue"].size()).toBe(0); }); it("validates the resource rental before returning it", async () => { const pool = getRentalPool({ min: 3 }); @@ -214,7 +214,7 @@ describe("ResourceRentalPool", () => { expect(pool.getSize()).toBe(3); expect(pool.getBorrowedSize()).toBe(3); expect(pool.getAvailableSize()).toBe(0); - expect(pool["acquireQueue"].length).toBe(4); + expect(pool["acquireQueue"].size()).toBe(4); }); }); describe("release()", () => { @@ -393,8 +393,12 @@ describe("ResourceRentalPool", () => { throw new Error("Acquire resolved even though it should have been rejected"); }) .catch((error) => { - expect(error).toBe("The pool is in draining mode"); + expect(error).toEqual("The pool is in draining mode"); }); + + // flush the promise queue to make sure the acquire promise is created + await new Promise(setImmediate); + await pool.drainAndClear(); await acquirePromise; expect(pool.getSize()).toBe(0); diff --git a/src/resource-rental/resource-rental-pool.ts b/src/resource-rental/resource-rental-pool.ts index 87ca6c4b9..d9f619257 100644 --- a/src/resource-rental/resource-rental-pool.ts +++ b/src/resource-rental/resource-rental-pool.ts @@ -11,6 +11,7 @@ import { RentalModule } from "./rental.module"; import { AgreementOptions } from "../market/agreement/agreement"; import { GolemAbortError } from "../shared/error/golem-error"; import AsyncLock from "async-lock"; +import { AcquireQueue } from "../shared/utils/acquireQueue"; export interface ResourceRentalPoolDependencies { allocation: Allocation; @@ -70,7 +71,7 @@ export class ResourceRentalPool { /** * Queue of functions that are waiting for a lease process to be available */ - private acquireQueue: Array<(rental: ResourceRental) => void> = []; + private acquireQueue = new AcquireQueue(); private logger: Logger; private drainPromise?: Promise; @@ -142,6 +143,10 @@ export class ResourceRentalPool { this.events.emit("created", { agreement }); return resourceRental; } catch (error) { + if (signal.aborted) { + this.logger.debug("Creating resource rental was aborted", error); + throw error; + } this.events.emit("errorCreatingRental", { error: new GolemMarketError( "Creating resource rental failed", @@ -196,16 +201,41 @@ export class ResourceRentalPool { return resourceRental; } - private async enqueueAcquire(): Promise { - return new Promise((resolve) => { - this.acquireQueue.push((resourceRental) => { - this.borrowed.add(resourceRental); - this.events.emit("acquired", { - agreement: resourceRental.agreement, - }); - resolve(resourceRental); - }); + private async enqueueAcquire(signalOrTimeout?: number | AbortSignal): Promise { + const rental = await this.acquireQueue.get(signalOrTimeout); + this.borrowed.add(rental); + this.events.emit("acquired", { + agreement: rental.agreement, }); + return rental; + } + + /** + * Sign a new resource rental or wait for one to become available in the pool, + * whichever comes first. + */ + private async raceNewRentalWithAcquireQueue(signalOrTimeout?: number | AbortSignal) { + const ac = new AbortController(); + const signal = anyAbortSignal( + ac.signal, + createAbortSignalFromTimeout(signalOrTimeout), + this.abortController.signal, + ); + return Promise.any([ + this.createNewResourceRental(signal), + this.acquireQueue.get(signal).then((rental) => { + this.logger.info("A rental became available in the pool, using it instead of creating a new one"); + return rental; + }), + ]) + .catch((err: AggregateError) => { + // if all promises fail (i.e. the signal is aborted by the user) then + // rethrow the error produced by `createNewResourceRental` because it's more relevant + throw err.errors[0]; + }) + .finally(() => { + ac.abort(); + }); } /** @@ -222,9 +252,9 @@ export class ResourceRentalPool { if (!resourceRental) { if (!this.canCreateMoreResourceRentals()) { - return this.enqueueAcquire(); + return this.enqueueAcquire(signalOrTimeout); } - resourceRental = await this.createNewResourceRental(signalOrTimeout); + resourceRental = await this.raceNewRentalWithAcquireQueue(signalOrTimeout); } this.borrowed.add(resourceRental); @@ -240,9 +270,8 @@ export class ResourceRentalPool { * Otherwise, the resource rental will be added to the queue. */ private passResourceRentalToWaitingAcquireOrBackToPool(resourceRental: ResourceRental) { - if (this.acquireQueue.length > 0) { - const acquire = this.acquireQueue.shift()!; - acquire(resourceRental); + if (this.acquireQueue.hasAcquirers()) { + this.acquireQueue.put(resourceRental); return; } if (resourceRental.hasActivity()) { @@ -299,7 +328,7 @@ export class ResourceRentalPool { await this.asyncLock.acquire("resource-rental-pool", async () => { this.abortController.abort("The pool is in draining mode"); this.events.emit("draining"); - this.acquireQueue = []; + this.acquireQueue.releaseAll(); const allResourceRentals = Array.from(this.borrowed) .concat(Array.from(this.lowPriority)) .concat(Array.from(this.highPriority)); diff --git a/src/shared/utils/acquireQueue.ts b/src/shared/utils/acquireQueue.ts new file mode 100644 index 000000000..430f228b7 --- /dev/null +++ b/src/shared/utils/acquireQueue.ts @@ -0,0 +1,78 @@ +import { GolemInternalError } from "../error/golem-error"; +import { anyAbortSignal, createAbortSignalFromTimeout } from "./abortSignal"; + +/** + * `Promise.withResolvers` is only available in Node 22.0.0 and later. + */ +function withResolvers() { + let resolve!: (value: T | PromiseLike) => void; + let reject!: (reason: unknown) => void; + const promise = new Promise((_resolve, _reject) => { + resolve = _resolve; + reject = _reject; + }); + return { resolve, reject, promise }; +} + +type Acquire = (item: T) => void; + +/** + * A queue of acquirers waiting for an item. + * use `get` to queue up for the next available item. + * use `put` to give the item to the next acquirer. + */ +export class AcquireQueue { + private queue: Acquire[] = []; + private abortController = new AbortController(); + + /** + * Release (reject) all acquirers. + * Essentially this is a way to reset the queue. + */ + public releaseAll() { + this.abortController.abort(); + this.queue = []; + this.abortController = new AbortController(); + } + + /** + * Queue up for the next available item. + */ + public async get(signalOrTimeout?: number | AbortSignal): Promise { + const signal = anyAbortSignal(createAbortSignalFromTimeout(signalOrTimeout), this.abortController.signal); + signal.throwIfAborted(); + const { resolve, promise } = withResolvers(); + this.queue.push(resolve); + + const abortPromise = new Promise((_, reject) => { + signal.addEventListener("abort", () => { + this.queue = this.queue.filter((r) => r !== resolve); + reject(signal.reason); + }); + }); + return Promise.race([promise, abortPromise]); + } + + /** + * Are there any acquirers waiting for an item? + */ + public hasAcquirers() { + return this.queue.length > 0; + } + + /** + * Give the item to the next acquirer. + * If there are no acquirers, throw an error. You should check `hasAcquirers` before calling this method. + */ + public put(item: T) { + if (!this.hasAcquirers()) { + throw new GolemInternalError("No acquirers waiting for the item"); + } + const resolve = this.queue.shift()!; + resolve(item); + } + + public size() { + return this.queue.length; + } +} diff --git a/src/shared/yagna/yagnaApi.ts b/src/shared/yagna/yagnaApi.ts index 2a3cc1c99..01058adc3 100644 --- a/src/shared/yagna/yagnaApi.ts +++ b/src/shared/yagna/yagnaApi.ts @@ -135,6 +135,7 @@ export class YagnaApi { eventSource.addEventListener("runtime", (event) => observer.next(JSON.parse(event.data))); eventSource.addEventListener("error", (error) => observer.error(error)); + return () => eventSource.close(); }); }, @@ -155,7 +156,7 @@ export class YagnaApi { this.gsb = gsbClient.requestor; - this.logger = options?.logger ?? defaultLogger("yagna"); + this.logger = options?.logger ? options.logger.child("yagna") : defaultLogger("yagna"); const identityClient = new YaTsClient.IdentityApi.Client({ BASE: this.basePath, diff --git a/tests/docker/docker-compose.yml b/tests/docker/docker-compose.yml index 314b31d53..a41a888d5 100644 --- a/tests/docker/docker-compose.yml +++ b/tests/docker/docker-compose.yml @@ -20,8 +20,11 @@ services: retries: 1 start_period: 40s environment: + - YA_NET_BROADCAST_SIZE=10 - NODE_NAME=provider-1 - SUBNET=${YAGNA_SUBNET:-golemjstest} + - YA_NET_BIND_URL=udp://0.0.0.0:0 + - YA_NET_RELAY_HOST=63.34.24.27:7477 provider-2: build: context: . @@ -42,8 +45,11 @@ services: retries: 1 start_period: 40s environment: + - YA_NET_BROADCAST_SIZE=10 - NODE_NAME=provider-2 - SUBNET=${YAGNA_SUBNET:-golemjstest} + - YA_NET_BIND_URL=udp://0.0.0.0:0 + - YA_NET_RELAY_HOST=63.34.24.27:7477 requestor: build: context: . @@ -55,13 +61,15 @@ services: - /root/.local/share/yagna/ - ../../:/golem-js environment: + - YA_NET_BROADCAST_SIZE=10 - YAGNA_AUTOCONF_APPKEY=try_golem - YAGNA_API_URL=http://0.0.0.0:7465 - GSB_URL=tcp://0.0.0.0:7464 - YAGNA_SUBNET=${YAGNA_SUBNET:-golemjstest} - YAGNA_APPKEY=try_golem - PAYMENT_NETWORK=${PAYMENT_NETWORK} - + - YA_NET_BIND_URL=udp://0.0.0.0:0 + - YA_NET_RELAY_HOST=63.34.24.27:7477 healthcheck: test: ["CMD-SHELL", "curl -s -o /dev/null -w '%{http_code}' http://localhost:7465 | grep -q 401"] interval: 10s diff --git a/tests/e2e/resourceRentalPool.spec.ts b/tests/e2e/resourceRentalPool.spec.ts index a1e06d180..a07a971c0 100644 --- a/tests/e2e/resourceRentalPool.spec.ts +++ b/tests/e2e/resourceRentalPool.spec.ts @@ -1,5 +1,5 @@ import { Subscription } from "rxjs"; -import { Allocation, DraftOfferProposalPool, GolemAbortError, GolemNetwork } from "../../src"; +import { Allocation, DraftOfferProposalPool, GolemAbortError, GolemNetwork, GolemTimeoutError } from "../../src"; describe("ResourceRentalPool", () => { const glm = new GolemNetwork(); @@ -42,10 +42,8 @@ describe("ResourceRentalPool", () => { const draftProposal$ = glm.market.collectDraftOfferProposals({ demandSpecification, pricing: { - model: "linear", - maxStartPrice: 0.5, - maxCpuPerHourPrice: 1.0, - maxEnvPerHourPrice: 0.5, + model: "burn-rate", + avgGlmPerHour: 1, }, }); @@ -213,7 +211,7 @@ describe("ResourceRentalPool", () => { it("should abort getting the newly created exe-unit by timeout", async () => { const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { poolSize: 1 }); const rental = await pool.acquire(); - // wait for init and destroy the exe-unit created automatically on startup renatl + // wait for init and destroy the exe-unit created automatically on startup rental await rental.getExeUnit(); await rental.destroyExeUnit(); await expect(rental.getExeUnit(10)).rejects.toThrow( @@ -225,7 +223,7 @@ describe("ResourceRentalPool", () => { const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { poolSize: 1 }); const abortController = new AbortController(); const rental = await pool.acquire(); - // wait for init and destroy the exe-unit created automatically on startup renatl + // wait for init and destroy the exe-unit created automatically on startup rental await rental.getExeUnit(); await rental.destroyExeUnit(); abortController.abort(); @@ -261,4 +259,27 @@ describe("ResourceRentalPool", () => { await expect(acquirePromise).rejects.toThrow("The signing of the agreement has been aborted"); expect(pool.getSize()).toEqual(0); }); + it("should do all tasks on the same provider if that's the only one available", async () => { + // simulate a situation where only one provider is available + const offer = await proposalPool.acquire(); + const newPool = new DraftOfferProposalPool(); + newPool.add(offer); + + const pool = glm.rental.createResourceRentalPool(newPool, allocation, { poolSize: 3 }); + expect.assertions(3); + await Promise.all([ + pool.withRental(async (rental) => { + const exe = await rental.getExeUnit(); + expect(exe.provider.id).toEqual(offer.provider.id); + }), + pool.withRental(async (rental) => { + const exe = await rental.getExeUnit(); + expect(exe.provider.id).toEqual(offer.provider.id); + }), + pool.withRental(async (rental) => { + const exe = await rental.getExeUnit(); + expect(exe.provider.id).toEqual(offer.provider.id); + }), + ]); + }); });