From f3de834bb9758d384ec78bfb131c61e1f005857b Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 19 Jun 2024 12:18:55 +0200 Subject: [PATCH 01/10] feat: rename lease module/process/pool to resource rental --- src/experimental/deployment/builder.ts | 2 +- src/experimental/deployment/deployment.ts | 14 +- src/golem-network/golem-network.ts | 34 +- src/index.ts | 2 +- src/lease-process/index.ts | 3 - src/lease-process/lease-process-pool.test.ts | 366 ----------------- src/lease-process/lease-process-pool.ts | 368 ----------------- src/market/error.ts | 4 +- src/market/market.module.ts | 2 +- src/resource-rental/index.ts | 3 + .../rental.module.ts} | 34 +- .../resource-rental-pool.test.ts | 366 +++++++++++++++++ src/resource-rental/resource-rental-pool.ts | 376 ++++++++++++++++++ .../resource-rental.ts} | 25 +- .../yagna/adapters/market-api-adapter.ts | 6 +- tests/e2e/leaseProcessPool.spec.ts | 76 ++-- 16 files changed, 845 insertions(+), 836 deletions(-) delete mode 100644 src/lease-process/index.ts delete mode 100644 src/lease-process/lease-process-pool.test.ts delete mode 100644 src/lease-process/lease-process-pool.ts create mode 100644 src/resource-rental/index.ts rename src/{lease-process/lease.module.ts => resource-rental/rental.module.ts} (63%) create mode 100644 src/resource-rental/resource-rental-pool.test.ts create mode 100644 src/resource-rental/resource-rental-pool.ts rename src/{lease-process/lease-process.ts => resource-rental/resource-rental.ts} (80%) diff --git a/src/experimental/deployment/builder.ts b/src/experimental/deployment/builder.ts index 564b23500..11415bc0f 100644 --- a/src/experimental/deployment/builder.ts +++ b/src/experimental/deployment/builder.ts @@ -57,7 +57,7 @@ export class GolemDeploymentBuilder { market: this.glm.market, activity: this.glm.activity, network: this.glm.network, - lease: this.glm.lease, + lease: this.glm.rental, }); this.reset(); diff --git a/src/experimental/deployment/deployment.ts b/src/experimental/deployment/deployment.ts index cef2e19d0..c585c1694 100644 --- a/src/experimental/deployment/deployment.ts +++ b/src/experimental/deployment/deployment.ts @@ -8,7 +8,7 @@ import { DraftOfferProposalPool, MarketModule } from "../../market"; import { PaymentModule } from "../../payment"; import { CreateLeaseProcessPoolOptions } from "./builder"; import { Subscription } from "rxjs"; -import { LeaseModule, LeaseProcessPool } from "../../lease-process"; +import { RentalModule, ResourceRentalPool } from "../../resource-rental"; export enum DeploymentState { INITIAL = "INITIAL", @@ -65,7 +65,7 @@ export class Deployment { { proposalPool: DraftOfferProposalPool; proposalSubscription: Subscription; - leaseProcessPool: LeaseProcessPool; + leaseProcessPool: ResourceRentalPool; } >(); @@ -76,7 +76,7 @@ export class Deployment { activity: ActivityModule; payment: PaymentModule; network: NetworkModule; - lease: LeaseModule; + lease: RentalModule; }; constructor( @@ -88,7 +88,7 @@ export class Deployment { activity: ActivityModule; payment: PaymentModule; network: NetworkModule; - lease: LeaseModule; + lease: RentalModule; }, ) { validateDeployment(components); @@ -165,10 +165,10 @@ export class Deployment { const proposalSubscription = proposalPool.readFrom(draftProposal$); - const leaseProcessPool = this.modules.lease.createLeaseProcessPool(proposalPool, allocation, { + const leaseProcessPool = this.modules.lease.createResourceRentalPool(proposalPool, allocation, { replicas: pool.options.deployment?.replicas, network, - leaseProcessOptions: { + resourceRentalOptions: { activity: pool.options?.activity, payment: pool.options?.payment, }, @@ -219,7 +219,7 @@ export class Deployment { this.events.emit("end"); } - getLeaseProcessPool(name: string): LeaseProcessPool { + getLeaseProcessPool(name: string): ResourceRentalPool { const pool = this.pools.get(name); if (!pool) { throw new GolemUserError(`LeaseProcessPool ${name} not found`); diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index 66f01b2e2..5299536b9 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -14,12 +14,12 @@ import { INetworkApi, Network, NetworkModule, NetworkModuleImpl, NetworkOptions import { EventEmitter } from "eventemitter3"; import { Concurrency, - LeaseModule, - LeaseModuleImpl, - LeaseProcess, - LeaseProcessOptions, - LeaseProcessPool, -} from "../lease-process"; + RentalModule, + RentalModuleImpl, + ResourceRental, + ResourceRentalOptions, + ResourceRentalPool, +} from "../resource-rental"; import { DebitNoteRepository, InvoiceRepository, MarketApiAdapter, PaymentApiAdapter } from "../shared/yagna"; import { ActivityApiAdapter } from "../shared/yagna/adapters/activity-api-adapter"; import { ActivityRepository } from "../shared/yagna/repository/activity-repository"; @@ -106,7 +106,7 @@ export interface GolemNetworkOptions { payment: InstanceOrFactory; activity: InstanceOrFactory; network: InstanceOrFactory; - lease: InstanceOrFactory; + lease: InstanceOrFactory; } >; } @@ -125,8 +125,8 @@ type AllocationOptions = { export interface MarketOrderSpec { demand: BuildDemandOptions; market: MarketOptions; - activity?: LeaseProcessOptions["activity"]; - payment?: LeaseProcessOptions["payment"] & AllocationOptions; + activity?: ResourceRentalOptions["activity"]; + payment?: ResourceRentalOptions["payment"] & AllocationOptions; network?: Network; } @@ -182,7 +182,7 @@ export class GolemNetwork { public readonly payment: PaymentModule; public readonly activity: ActivityModule; public readonly network: NetworkModule; - public readonly lease: LeaseModule; + public readonly rental: RentalModule; /** * Dependency Container @@ -270,8 +270,8 @@ export class GolemNetwork { }); this.payment = getFactory(PaymentModuleImpl, this.options.override?.payment)(this.services, this.options.payment); this.activity = getFactory(ActivityModuleImpl, this.options.override?.activity)(this.services); - this.lease = getFactory( - LeaseModuleImpl, + this.rental = getFactory( + RentalModuleImpl, this.options.override?.lease, )({ activityModule: this.activity, @@ -359,7 +359,7 @@ export class GolemNetwork { * * @param order */ - async oneOf(order: MarketOrderSpec): Promise { + async oneOf(order: MarketOrderSpec): Promise { const proposalPool = new DraftOfferProposalPool({ logger: this.logger, validateProposal: order.market.proposalFilter, @@ -385,7 +385,7 @@ export class GolemNetwork { ? await this.network.createNetworkNode(order.network, agreement.provider.id) : undefined; - const lease = this.lease.createLease(agreement, allocation, { + const lease = this.rental.createResourceRental(agreement, allocation, { payment: order.payment, activity: order.activity, networkNode, @@ -450,7 +450,7 @@ export class GolemNetwork { * * @param options Demand specification and concurrency level */ - public async manyOf({ concurrency, order }: ManyOfOptions): Promise { + public async manyOf({ concurrency, order }: ManyOfOptions): Promise { const proposalPool = new DraftOfferProposalPool({ logger: this.logger, validateProposal: order.market.proposalFilter, @@ -467,10 +467,10 @@ export class GolemNetwork { }); const subscription = proposalPool.readFrom(draftProposal$); - const leaseProcessPool = this.lease.createLeaseProcessPool(proposalPool, allocation, { + const leaseProcessPool = this.rental.createResourceRentalPool(proposalPool, allocation, { replicas: concurrency, network: order.network, - leaseProcessOptions: { + resourceRentalOptions: { activity: order.activity, payment: order.payment, }, diff --git a/src/index.ts b/src/index.ts index 0a3408ae3..e4b53295b 100755 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,6 @@ // High-level entry points export * from "./golem-network"; -export * from "./lease-process"; +export * from "./resource-rental"; // Low level entry points for advanced users export * from "./market"; diff --git a/src/lease-process/index.ts b/src/lease-process/index.ts deleted file mode 100644 index 2e203ee96..000000000 --- a/src/lease-process/index.ts +++ /dev/null @@ -1,3 +0,0 @@ -export * from "./lease-process"; -export * from "./lease-process-pool"; -export * from "./lease.module"; diff --git a/src/lease-process/lease-process-pool.test.ts b/src/lease-process/lease-process-pool.test.ts deleted file mode 100644 index 6eff64ef8..000000000 --- a/src/lease-process/lease-process-pool.test.ts +++ /dev/null @@ -1,366 +0,0 @@ -import type { Agreement } from "../market/agreement/agreement"; -import { _, imock, instance, mock, reset, spy, verify, when } from "@johanblumenberg/ts-mockito"; -import { LeaseProcess } from "./lease-process"; -import { Allocation } from "../payment"; -import type { MarketModule } from "../market"; -import { DraftOfferProposalPool } from "../market"; -import { LeaseProcessPool } from "./lease-process-pool"; -import { type RequireAtLeastOne } from "../shared/utils/types"; -import { NetworkModule } from "../network"; -import { LeaseModule } from "./lease.module"; -import { Logger } from "../shared/utils"; - -const allocation = mock(Allocation); -const proposalPool = mock(DraftOfferProposalPool); -const marketModule = imock(); -const networkModule = imock(); -const leaseModule = imock(); - -function getMockLeaseProcess() { - return { - hasActivity: () => false, - fetchAgreementState: () => Promise.resolve("Approved"), - agreement: { id: "1" } as Agreement, - } as LeaseProcess; -} - -function getLeasePool(replicas: RequireAtLeastOne<{ min: number; max: number }>) { - return new LeaseProcessPool({ - allocation: instance(allocation), - proposalPool: instance(proposalPool), - marketModule: instance(marketModule), - networkModule: instance(networkModule), - leaseModule: instance(leaseModule), - logger: instance(imock()), - network: undefined, - replicas, - }); -} - -beforeEach(() => { - jest.useRealTimers(); - jest.clearAllMocks(); - reset(allocation); - reset(proposalPool); - reset(marketModule); - reset(networkModule); - reset(leaseModule); -}); - -describe("LeaseProcessPool", () => { - describe("ready()", () => { - it("prepares MIN_POOL_SIZE lease processes", async () => { - when(marketModule.signAgreementFromPool(_, _)).thenResolve({} as Agreement); - when(leaseModule.createLease(_, _, _)).thenCall(() => ({}) as LeaseProcess); - - const pool = getLeasePool({ min: 5, max: 10 }); - - await pool.ready(); - - expect(pool.getAvailableSize()).toBe(5); - verify(marketModule.signAgreementFromPool(_, _)).times(5); - }); - it("retries on error", async () => { - when(leaseModule.createLease(_, _, _)).thenCall(() => ({}) as LeaseProcess); - - const fakeAgreement = {} as Agreement; - when(marketModule.signAgreementFromPool(_, _)) - .thenResolve(fakeAgreement) - .thenReject(new Error("Failed to propose agreement")) - .thenResolve(fakeAgreement) - .thenReject(new Error("Failed to propose agreement")) - .thenResolve(fakeAgreement); - - const pool = getLeasePool({ min: 3 }); - - await pool.ready(); - - expect(pool.getAvailableSize()).toBe(3); - verify(marketModule.signAgreementFromPool(_, _)).times(5); - }); - it("stops retrying after abort signal is triggered", async () => { - const pool = getLeasePool({ min: 3 }); - pool["createNewLeaseProcess"] = jest - .fn( - () => - new Promise((_, reject) => - setTimeout(() => reject(new Error("Failed to propose agreement")), 50), - ), - ) - // the first call will succeed, the rest will fail (fall back to the first implementation) - .mockImplementationOnce(() => new Promise((resolve) => setTimeout(() => resolve(getMockLeaseProcess()), 50))); - - await expect(pool.ready(AbortSignal.timeout(60))).rejects.toThrow( - "Could not create enough lease processes to reach the minimum pool size in time", - ); - expect(pool.getAvailableSize()).toBe(1); - // first loop 3 times, then 2 times - expect(pool["createNewLeaseProcess"]).toHaveBeenCalledTimes(5); - }); - it("stops retrying after specified timeout is reached", async () => { - const pool = getLeasePool({ min: 3 }); - const poolSpy = spy(pool); - when(poolSpy["createNewLeaseProcess"]()) - .thenResolve(getMockLeaseProcess()) - .thenReject(new Error("Failed to propose agreement")); - - await expect(pool.ready(10)).rejects.toThrow( - "Could not create enough lease processes to reach the minimum pool size in time", - ); - expect(pool.getAvailableSize()).toBe(1); - verify(poolSpy["createNewLeaseProcess"]()).atLeast(3); - }); - }); - describe("acquire()", () => { - it("takes a random lease process from the pool if none have activities", async () => { - const pool = getLeasePool({ min: 3 }); - const lease1 = getMockLeaseProcess(); - const lease2 = getMockLeaseProcess(); - const lease3 = getMockLeaseProcess(); - pool["lowPriority"].add(lease1); - pool["lowPriority"].add(lease2); - pool["lowPriority"].add(lease3); - - const leaseProcess = await pool.acquire(); - expect(pool.getBorrowedSize()).toBe(1); - expect(pool.getAvailableSize()).toBe(2); - expect([lease1, lease2, lease3]).toContain(leaseProcess); - }); - it("prioritizes lease processes from high priority pool", async () => { - const pool = getLeasePool({ min: 3 }); - const lease1 = getMockLeaseProcess(); - const lease2 = getMockLeaseProcess(); - const lease3 = getMockLeaseProcess(); - pool["lowPriority"].add(lease1); - pool["highPriority"].add(lease2); - pool["lowPriority"].add(lease3); - - const leaseProcess = await pool.acquire(); - expect(pool.getBorrowedSize()).toBe(1); - expect(pool.getAvailableSize()).toBe(2); - expect(leaseProcess).toBe(lease2); - }); - it("creates a new lease process if none are available", async () => { - const pool = getLeasePool({ min: 3 }); - pool["createNewLeaseProcess"] = jest.fn(() => Promise.resolve(getMockLeaseProcess())); - - expect(pool.getSize()).toBe(0); - await pool.acquire(); - expect(pool.getSize()).toBe(1); - expect(pool.getBorrowedSize()).toBe(1); - expect(pool.getAvailableSize()).toBe(0); - }); - it("waits for a lease to become available when the pool is full", async () => { - const pool = getLeasePool({ min: 3, max: 3 }); - const lease1 = getMockLeaseProcess(); - const lease2 = getMockLeaseProcess(); - const lease3 = getMockLeaseProcess(); - pool["lowPriority"].add(lease1); - pool["lowPriority"].add(lease2); - pool["lowPriority"].add(lease3); - - const acquiredLease1 = await pool.acquire(); - await pool.acquire(); - await pool.acquire(); - - expect(pool.getAvailableSize()).toBe(0); - expect(pool.getBorrowedSize()).toBe(3); - const acquiredLeasePromise = pool.acquire(); - // go to the next tick - await Promise.resolve(); - expect(pool["acquireQueue"].length).toBe(1); - pool.release(acquiredLease1); - await acquiredLeasePromise; - expect(pool.getAvailableSize()).toBe(0); - expect(pool.getBorrowedSize()).toBe(3); - expect(pool["acquireQueue"].length).toBe(0); - }); - it("validates the lease process before returning it", async () => { - const pool = getLeasePool({ min: 3 }); - const newlyCreatedLease = getMockLeaseProcess(); - jest.spyOn(pool, "destroy"); - pool["createNewLeaseProcess"] = jest.fn(() => Promise.resolve(newlyCreatedLease)); - - const lease1 = getMockLeaseProcess(); - lease1.fetchAgreementState = jest.fn().mockResolvedValue("Expired"); - const lease2 = getMockLeaseProcess(); - lease2.fetchAgreementState = jest.fn().mockResolvedValue("Expired"); - pool["lowPriority"].add(lease1); - pool["lowPriority"].add(lease2); - - expect(pool.getBorrowedSize()).toBe(0); - expect(pool.getAvailableSize()).toBe(2); - const leaseProcess = await pool.acquire(); - expect(pool.getBorrowedSize()).toBe(1); - expect(pool.getAvailableSize()).toBe(0); - expect(leaseProcess).toBe(newlyCreatedLease); - expect(pool["destroy"]).toHaveBeenCalledWith(lease1); - expect(pool["destroy"]).toHaveBeenCalledWith(lease2); - }); - it("should not create more processes than allowed", async () => { - jest.useFakeTimers(); - const pool = getLeasePool({ min: 3, max: 3 }); - pool["createNewLeaseProcess"] = jest.fn(async () => { - pool["leasesBeingSigned"]++; - await new Promise((resolve) => setTimeout(resolve, 50)); - pool["leasesBeingSigned"]--; - return getMockLeaseProcess(); - }); - expect(pool.getSize()).toBe(0); - pool.acquire(); // should be resolved after 50ms - pool.acquire(); // should be resolved after 50ms - pool.acquire(); // should be resolved after 50ms - pool.acquire(); // should be added to the queue - pool.acquire(); // should be added to the queue - pool.acquire(); // should be added to the queue - pool.acquire(); // should be added to the queue - await jest.advanceTimersByTimeAsync(50); - expect(pool.getSize()).toBe(3); - expect(pool.getBorrowedSize()).toBe(3); - expect(pool.getAvailableSize()).toBe(0); - expect(pool["acquireQueue"].length).toBe(4); - }); - }); - describe("release()", () => { - it("releases a lease process back to the pool", async () => { - const pool = getLeasePool({ min: 3 }); - const lease1 = getMockLeaseProcess(); - const lease2 = getMockLeaseProcess(); - pool["lowPriority"].add(lease1); - pool["lowPriority"].add(lease2); - - const leaseProcess = await pool.acquire(); - expect(pool.getBorrowedSize()).toBe(1); - expect(pool.getAvailableSize()).toBe(1); - await pool.release(leaseProcess); - expect(pool.getBorrowedSize()).toBe(0); - expect(pool.getAvailableSize()).toBe(2); - expect(pool["lowPriority"].has(lease1)).toBe(true); - expect(pool["lowPriority"].has(lease2)).toBe(true); - }); - it("releases a lease process back to the high priority pool if it has an activity", async () => { - const pool = getLeasePool({ min: 3 }); - const lease1 = getMockLeaseProcess(); - const lease2 = getMockLeaseProcess(); - const lease3 = getMockLeaseProcess(); - pool["lowPriority"].add(lease1); - pool["lowPriority"].add(lease2); - pool["lowPriority"].add(lease3); - - const leaseProcess = await pool.acquire(); - expect(pool.getBorrowedSize()).toBe(1); - expect(pool.getAvailableSize()).toBe(2); - leaseProcess.hasActivity = () => true; - await pool.release(leaseProcess); - expect(pool.getBorrowedSize()).toBe(0); - expect(pool.getAvailableSize()).toBe(3); - expect(pool["highPriority"].size).toBe(1); - expect(pool["lowPriority"].size).toBe(2); - }); - it("destroys the lease process if the pool is full", async () => { - const pool = getLeasePool({ max: 2 }); - jest.spyOn(pool, "destroy"); - const lease1 = getMockLeaseProcess(); - const lease2 = getMockLeaseProcess(); - const lease3 = getMockLeaseProcess(); - - pool["lowPriority"].add(lease1); - pool["lowPriority"].add(lease2); - - const acquiredLease1 = await pool.acquire(); - expect(pool.getBorrowedSize()).toBe(1); - expect(pool.getAvailableSize()).toBe(1); - - pool["lowPriority"].add(lease3); - - await pool.release(acquiredLease1); - expect(pool.getBorrowedSize()).toBe(0); - expect(pool.getAvailableSize()).toBe(2); - expect(pool["lowPriority"].has(lease2)).toBe(true); - expect(pool["lowPriority"].has(lease3)).toBe(true); - expect(pool["destroy"]).toHaveBeenCalledWith(lease1); - }); - it("destroys the lease process if it is invalid", async () => { - const pool = getLeasePool({ max: 1 }); - jest.spyOn(pool, "destroy"); - const lease1 = getMockLeaseProcess(); - - pool["lowPriority"].add(lease1); - - const acquiredLease1 = await pool.acquire(); - expect(pool.getBorrowedSize()).toBe(1); - expect(pool.getAvailableSize()).toBe(0); - - acquiredLease1.fetchAgreementState = jest.fn().mockResolvedValue("Expired"); - - await pool.release(acquiredLease1); - expect(pool.getBorrowedSize()).toBe(0); - expect(pool.getAvailableSize()).toBe(0); - expect(pool["destroy"]).toHaveBeenCalledWith(lease1); - }); - }); - describe("destroy()", () => { - it("removes the lease process from the pool", async () => { - const pool = getLeasePool({ max: 1 }); - const lease1 = getMockLeaseProcess(); - pool["lowPriority"].add(lease1); - - const leaseProcess = await pool.acquire(); - expect(pool.getBorrowedSize()).toBe(1); - expect(pool.getAvailableSize()).toBe(0); - pool.destroy(leaseProcess); - expect(pool.getBorrowedSize()).toBe(0); - expect(pool.getAvailableSize()).toBe(0); - }); - }); - describe("drainAndClear", () => { - it("destroys all lease processes in the pool", async () => { - const pool = getLeasePool({ max: 3 }); - jest.spyOn(pool, "destroy"); - const lease1 = getMockLeaseProcess(); - const lease2 = getMockLeaseProcess(); - const lease3 = getMockLeaseProcess(); - pool["lowPriority"].add(lease1); - pool["lowPriority"].add(lease2); - pool["lowPriority"].add(lease3); - - await pool.acquire(); - await pool.acquire(); - expect(pool.getBorrowedSize()).toBe(2); - expect(pool.getAvailableSize()).toBe(1); - await pool.drainAndClear(); - expect(pool.getBorrowedSize()).toBe(0); - expect(pool.getAvailableSize()).toBe(0); - expect(pool["destroy"]).toHaveBeenCalledWith(lease1); - expect(pool["destroy"]).toHaveBeenCalledWith(lease2); - expect(pool["destroy"]).toHaveBeenCalledWith(lease3); - }); - it("prevents new leases from being acquired during the drain", async () => { - const pool = getLeasePool({ max: 3 }); - const realDestroy = pool.destroy; - jest.spyOn(pool, "destroy").mockImplementation(async (...args) => { - await new Promise((resolve) => setTimeout(resolve, 100)); - return realDestroy.apply(pool, args); - }); - const lease1 = getMockLeaseProcess(); - const lease2 = getMockLeaseProcess(); - const lease3 = getMockLeaseProcess(); - pool["lowPriority"].add(lease1); - pool["lowPriority"].add(lease2); - pool["lowPriority"].add(lease3); - - await pool.acquire(); - await pool.acquire(); - expect(pool.getBorrowedSize()).toBe(2); - expect(pool.getAvailableSize()).toBe(1); - const drainPromise = pool.drainAndClear(); - expect(pool.acquire()).rejects.toThrow("The pool is in draining mode"); - await drainPromise; - expect(pool.getBorrowedSize()).toBe(0); - expect(pool.getAvailableSize()).toBe(0); - expect(pool["destroy"]).toHaveBeenCalledWith(lease1); - expect(pool["destroy"]).toHaveBeenCalledWith(lease2); - expect(pool["destroy"]).toHaveBeenCalledWith(lease3); - }); - }); -}); diff --git a/src/lease-process/lease-process-pool.ts b/src/lease-process/lease-process-pool.ts deleted file mode 100644 index a4298dd7a..000000000 --- a/src/lease-process/lease-process-pool.ts +++ /dev/null @@ -1,368 +0,0 @@ -import type { Agreement, DraftOfferProposalPool, MarketModule } from "../market"; -import { GolemMarketError, MarketErrorCode } from "../market"; -import type { Logger } from "../shared/utils"; -import { createAbortSignalFromTimeout, runOnNextEventLoopIteration } from "../shared/utils"; -import { EventEmitter } from "eventemitter3"; -import type { RequireAtLeastOne } from "../shared/utils/types"; -import type { Allocation } from "../payment"; -import type { LeaseProcess, LeaseProcessOptions } from "./lease-process"; -import { Network, NetworkModule } from "../network"; -import { LeaseModule } from "./lease.module"; -import { AgreementOptions } from "../market/agreement/agreement"; - -export interface LeaseProcessPoolDependencies { - allocation: Allocation; - proposalPool: DraftOfferProposalPool; - marketModule: MarketModule; - networkModule: NetworkModule; - leaseModule: LeaseModule; - logger: Logger; -} - -export type Concurrency = number | RequireAtLeastOne<{ min: number; max: number }>; - -export interface LeaseProcessPoolOptions { - replicas?: Concurrency; - network?: Network; - leaseProcessOptions?: LeaseProcessOptions; - agreementOptions?: AgreementOptions; -} - -export interface LeaseProcessPoolEvents { - ready: () => void; - end: () => void; - acquired: (agreement: Agreement) => void; - released: (agreement: Agreement) => void; - created: (agreement: Agreement) => void; - destroyed: (agreement: Agreement) => void; - error: (error: GolemMarketError) => void; -} - -const MAX_REPLICAS = 100; - -export class LeaseProcessPool { - public readonly events = new EventEmitter(); - - /** - * Pool of lease processes that do not have an activity - */ - private lowPriority = new Set(); - /** - * Pool of lease processes that have an activity - */ - private highPriority = new Set(); - private borrowed = new Set(); - /** - * Queue of functions that are waiting for a lease process to be available - */ - private acquireQueue: Array<(lease: LeaseProcess) => void> = []; - private isDraining = false; - private logger: Logger; - - private allocation: Allocation; - private network?: Network; - private proposalPool: DraftOfferProposalPool; - private marketModule: MarketModule; - private networkModule: NetworkModule; - private leaseModule: LeaseModule; - private readonly minPoolSize: number; - private readonly maxPoolSize: number; - private readonly leaseProcessOptions?: LeaseProcessOptions; - private readonly agreementOptions?: AgreementOptions; - /** - * Number of lease processes that are currently being signed. - * This is used to prevent creating more lease processes than the pool size allows. - */ - private leasesBeingSigned = 0; - - constructor(options: LeaseProcessPoolOptions & LeaseProcessPoolDependencies) { - this.allocation = options.allocation; - this.proposalPool = options.proposalPool; - this.marketModule = options.marketModule; - this.leaseModule = options.leaseModule; - this.networkModule = options.networkModule; - this.network = options.network; - this.leaseProcessOptions = options.leaseProcessOptions; - this.agreementOptions = options.agreementOptions; - - this.logger = options.logger; - - this.minPoolSize = - (() => { - if (typeof options?.replicas === "number") { - return options?.replicas; - } - if (typeof options?.replicas === "object") { - return options?.replicas.min; - } - })() || 0; - - this.maxPoolSize = - (() => { - if (typeof options?.replicas === "object") { - return options?.replicas.max; - } - })() || MAX_REPLICAS; - } - - private async createNewLeaseProcess() { - this.logger.debug("Creating new lease process to add to pool"); - try { - this.leasesBeingSigned++; - const agreement = await this.marketModule.signAgreementFromPool(this.proposalPool, this.agreementOptions); - const networkNode = this.network - ? await this.networkModule.createNetworkNode(this.network, agreement.provider.id) - : undefined; - const leaseProcess = this.leaseModule.createLease(agreement, this.allocation, { - networkNode, - ...this.leaseProcessOptions, - }); - this.events.emit("created", agreement); - return leaseProcess; - } catch (error) { - this.events.emit( - "error", - new GolemMarketError("Creating lease process failed", MarketErrorCode.LeaseProcessCreationFailed, error), - ); - this.logger.error("Creating lease process failed", error); - throw error; - } finally { - this.leasesBeingSigned--; - } - } - - private async validate(leaseProcess: LeaseProcess) { - try { - const state = await leaseProcess.fetchAgreementState(); - const result = state === "Approved"; - this.logger.debug("Validated lease process in the pool", { result, state }); - return result; - } catch (err) { - this.logger.error("Something went wrong while validating lease process, it will be destroyed", err); - return false; - } - } - - private canCreateMoreLeaseProcesses() { - return this.getSize() + this.leasesBeingSigned < this.maxPoolSize; - } - - /** - * Take the first valid lease process from the pool - * If there is no valid lease process, return null - */ - private async takeValidLeaseProcess(): Promise { - let leaseProcess: LeaseProcess | null = null; - if (this.highPriority.size > 0) { - leaseProcess = this.highPriority.values().next().value as LeaseProcess; - this.highPriority.delete(leaseProcess); - } else if (this.lowPriority.size > 0) { - leaseProcess = this.lowPriority.values().next().value as LeaseProcess; - this.lowPriority.delete(leaseProcess); - } - if (!leaseProcess) { - return null; - } - const isValid = await this.validate(leaseProcess); - if (!isValid) { - await this.destroy(leaseProcess); - return this.takeValidLeaseProcess(); - } - return leaseProcess; - } - - private async enqueueAcquire(): Promise { - return new Promise((resolve) => { - this.acquireQueue.push((leaseProcess) => { - this.borrowed.add(leaseProcess); - this.events.emit("acquired", leaseProcess.agreement); - resolve(leaseProcess); - }); - }); - } - - /** - * Borrow a lease process from the pool. If there is no valid lease process a new one will be created. - */ - async acquire(): Promise { - if (this.isDraining) { - throw new Error("The pool is in draining mode"); - } - let leaseProcess = await this.takeValidLeaseProcess(); - if (!leaseProcess) { - if (!this.canCreateMoreLeaseProcesses()) { - return this.enqueueAcquire(); - } - leaseProcess = await this.createNewLeaseProcess(); - } - this.borrowed.add(leaseProcess); - this.events.emit("acquired", leaseProcess.agreement); - return leaseProcess; - } - - /** - * If there are any acquires waiting in the queue, the lease process will be passed to the first one. - * Otherwise, the lease process will be added to the queue. - */ - private passLeaseProcessToWaitingAcquireOrBackToPool(leaseProcess: LeaseProcess) { - if (this.acquireQueue.length > 0) { - const acquire = this.acquireQueue.shift()!; - acquire(leaseProcess); - return; - } - if (leaseProcess.hasActivity()) { - this.highPriority.add(leaseProcess); - } else { - this.lowPriority.add(leaseProcess); - } - } - - async release(leaseProcess: LeaseProcess): Promise { - if (this.getAvailableSize() >= this.maxPoolSize) { - return this.destroy(leaseProcess); - } - this.borrowed.delete(leaseProcess); - const isValid = await this.validate(leaseProcess); - if (!isValid) { - return this.destroy(leaseProcess); - } - this.events.emit("released", leaseProcess.agreement); - this.passLeaseProcessToWaitingAcquireOrBackToPool(leaseProcess); - } - - async destroy(leaseProcess: LeaseProcess): Promise { - try { - this.borrowed.delete(leaseProcess); - this.logger.debug("Destroying lease process from the pool", { agreementId: leaseProcess.agreement.id }); - await Promise.all([leaseProcess.finalize(), this.removeNetworkNode(leaseProcess)]); - this.events.emit("destroyed", leaseProcess.agreement); - } catch (error) { - this.events.emit( - "error", - new GolemMarketError("Destroying lease process failed", MarketErrorCode.LeaseProcessTerminationFailed, error), - ); - this.logger.error("Destroying lease process failed", error); - } - } - - /** - * Sets the pool into draining mode and then clears it - * - * When set to drain mode, no new acquires will be possible. At the same time, all agreements in the pool will be terminated with the Providers. - * - * @return Resolves when all agreements are terminated - */ - async drainAndClear() { - this.isDraining = true; - this.acquireQueue = []; - const allLeaseProcesses = Array.from(this.borrowed) - .concat(Array.from(this.lowPriority)) - .concat(Array.from(this.highPriority)); - await Promise.allSettled(allLeaseProcesses.map((leaseProcess) => this.destroy(leaseProcess))); - this.lowPriority.clear(); - this.highPriority.clear(); - this.borrowed.clear(); - this.isDraining = false; - this.events.emit("end"); - return; - } - - /** - * Total size (available + borrowed) - */ - getSize() { - return this.getAvailableSize() + this.getBorrowedSize(); - } - - /** - * Available size (how many lease processes are ready to be borrowed) - */ - getAvailableSize() { - return this.lowPriority.size + this.highPriority.size; - } - - /** - * Borrowed size (how many lease processes are currently out of the pool) - */ - getBorrowedSize() { - return this.borrowed.size; - } - - /** - * Wait till the pool is ready to use (min number of items in pool are usable). - * If an error occurs while creating new lease processes, it will be retried until the pool is ready - * (potentially indefinitely). To stop this process if it fails to reach the desired state in a given time, - * you can pass either a timeout in milliseconds or an AbortSignal. - * - * @example - * ```typescript - * await pool.ready(10_000); // If the pool is not ready in 10 seconds, an error will be thrown - * ``` - * @example - * ```typescript - * await pool.ready(AbortSignal.timeout(10_000)); // If the pool is not ready in 10 seconds, an error will be thrown - * ``` - */ - async ready(timeoutMs?: number): Promise; - async ready(abortSignal?: AbortSignal): Promise; - async ready(timeoutOrAbortSignal?: number | AbortSignal): Promise { - if (this.minPoolSize <= this.getAvailableSize()) { - return; - } - const signal = createAbortSignalFromTimeout(timeoutOrAbortSignal); - - const tryCreatingMissingLeaseProcesses = async () => { - await Promise.allSettled( - new Array(this.minPoolSize - this.getAvailableSize()).fill(0).map(() => - this.createNewLeaseProcess().then( - (leaseProcess) => this.lowPriority.add(leaseProcess), - (error) => this.logger.error("Creating lease process failed", error), - ), - ), - ); - }; - - while (this.minPoolSize > this.getAvailableSize()) { - if (signal.aborted) { - break; - } - await runOnNextEventLoopIteration(tryCreatingMissingLeaseProcesses); - } - - if (this.minPoolSize > this.getAvailableSize()) { - throw new Error("Could not create enough lease processes to reach the minimum pool size in time"); - } - this.events.emit("ready"); - } - - private async removeNetworkNode(leaseProcess: LeaseProcess) { - if (this.network && leaseProcess.networkNode) { - this.logger.debug("Removing a node from the network", { - network: this.network.getNetworkInfo().ip, - nodeIp: leaseProcess.networkNode.ip, - }); - await this.networkModule.removeNetworkNode(this.network, leaseProcess.networkNode); - } - } - - /** - * Acquire a lease process from the pool and release it after the callback is done - * @example - * ```typescript - * const result = await pool.withLease(async (lease) => { - * // Do something with the lease - * return result; - * // pool.release(lease) is called automatically - * // even if an error is thrown in the callback - * }); - * ``` - */ - public async withLease(callback: (lease: LeaseProcess) => Promise): Promise { - const lease = await this.acquire(); - try { - return await callback(lease); - } finally { - await this.release(lease); - } - } -} diff --git a/src/market/error.ts b/src/market/error.ts index 1d54f6a43..2b1ed1f71 100644 --- a/src/market/error.ts +++ b/src/market/error.ts @@ -10,8 +10,8 @@ export enum MarketErrorCode { ProposalResponseFailed = "ProposalResponseFailed", ProposalRejectionFailed = "ProposalRejectionFailed", DemandExpired = "DemandExpired", - LeaseProcessTerminationFailed = "LeaseProcessTerminationFailed", - LeaseProcessCreationFailed = "LeaseProcessCreationFailed", + ResourceRentalTerminationFailed = "LeaseProcessTerminationFailed", + ResourceRentalCreationFailed = "LeaseProcessCreationFailed", AgreementApprovalFailed = "AgreementApprovalFailed", NoProposalAvailable = "NoProposalAvailable", InternalError = "InternalError", diff --git a/src/market/market.module.ts b/src/market/market.module.ts index 4a487fe73..10a3a22bb 100644 --- a/src/market/market.module.ts +++ b/src/market/market.module.ts @@ -40,7 +40,7 @@ import { GolemUserError } from "../shared/error/golem-error"; import { MarketOrderSpec } from "../golem-network"; import { INetworkApi, NetworkModule } from "../network"; import { AgreementOptions } from "./agreement/agreement"; -import { Concurrency } from "../lease-process"; +import { Concurrency } from "../resource-rental"; export type DemandEngine = "vm" | "vm-nvidia" | "wasmtime"; diff --git a/src/resource-rental/index.ts b/src/resource-rental/index.ts new file mode 100644 index 000000000..22e852149 --- /dev/null +++ b/src/resource-rental/index.ts @@ -0,0 +1,3 @@ +export * from "./resource-rental"; +export * from "./resource-rental-pool"; +export * from "./rental.module"; diff --git a/src/lease-process/lease.module.ts b/src/resource-rental/rental.module.ts similarity index 63% rename from src/lease-process/lease.module.ts rename to src/resource-rental/rental.module.ts index 2414ba9bd..91d04acb1 100644 --- a/src/lease-process/lease.module.ts +++ b/src/resource-rental/rental.module.ts @@ -4,27 +4,27 @@ import { NetworkModule } from "../network"; import { Allocation, PaymentModule } from "../payment"; import { StorageProvider } from "../shared/storage"; import { Logger } from "../shared/utils"; -import { LeaseProcess, LeaseProcessOptions } from "./lease-process"; -import { LeaseProcessPool, LeaseProcessPoolOptions } from "./lease-process-pool"; +import { ResourceRental, ResourceRentalOptions } from "./resource-rental"; +import { ResourceRentalPool, ResourceRentalPoolOptions } from "./resource-rental-pool"; -export interface LeaseModule { +export interface RentalModule { /** * Factory that creates a new lease process that's fully configured. * This method will also create the payment process for the agreement. * */ - createLease(agreement: Agreement, allocation: Allocation, options?: LeaseProcessOptions): LeaseProcess; + createResourceRental(agreement: Agreement, allocation: Allocation, options?: ResourceRentalOptions): ResourceRental; /** * Factory that creates new lease process pool that's fully configured */ - createLeaseProcessPool( + createResourceRentalPool( draftPool: DraftOfferProposalPool, allocation: Allocation, - options?: LeaseProcessPoolOptions, - ): LeaseProcessPool; + options?: ResourceRentalPoolOptions, + ): ResourceRentalPool; } -export class LeaseModuleImpl implements LeaseModule { +export class RentalModuleImpl implements RentalModule { constructor( private readonly deps: { marketModule: MarketModule; @@ -36,13 +36,13 @@ export class LeaseModuleImpl implements LeaseModule { }, ) {} - createLease(agreement: Agreement, allocation: Allocation, options?: LeaseProcessOptions): LeaseProcess { + createResourceRental(agreement: Agreement, allocation: Allocation, options?: ResourceRentalOptions): ResourceRental { const paymentProcess = this.deps.paymentModule.createAgreementPaymentProcess( agreement, allocation, options?.payment, ); - const lease = new LeaseProcess( + const lease = new ResourceRental( agreement, this.deps.storageProvider, paymentProcess, @@ -54,19 +54,19 @@ export class LeaseModuleImpl implements LeaseModule { return lease; } - public createLeaseProcessPool( + public createResourceRentalPool( draftPool: DraftOfferProposalPool, allocation: Allocation, - options?: LeaseProcessPoolOptions, - ): LeaseProcessPool { - return new LeaseProcessPool({ + options?: ResourceRentalPoolOptions, + ): ResourceRentalPool { + return new ResourceRentalPool({ allocation, - leaseModule: this, + rentalModule: this, marketModule: this.deps.marketModule, networkModule: this.deps.networkModule, proposalPool: draftPool, - leaseProcessOptions: options?.leaseProcessOptions, - logger: this.deps.logger.child("lease-process-pool"), + resourceRentalOptions: options?.resourceRentalOptions, + logger: this.deps.logger.child("resource-rental-pool"), network: options?.network, replicas: options?.replicas, }); diff --git a/src/resource-rental/resource-rental-pool.test.ts b/src/resource-rental/resource-rental-pool.test.ts new file mode 100644 index 000000000..ff60195ce --- /dev/null +++ b/src/resource-rental/resource-rental-pool.test.ts @@ -0,0 +1,366 @@ +import type { Agreement } from "../market/agreement/agreement"; +import { _, imock, instance, mock, reset, spy, verify, when } from "@johanblumenberg/ts-mockito"; +import { ResourceRental } from "./resource-rental"; +import { Allocation } from "../payment"; +import type { MarketModule } from "../market"; +import { DraftOfferProposalPool } from "../market"; +import { ResourceRentalPool } from "./resource-rental-pool"; +import { type RequireAtLeastOne } from "../shared/utils/types"; +import { NetworkModule } from "../network"; +import { RentalModule } from "./rental.module"; +import { Logger } from "../shared/utils"; + +const allocation = mock(Allocation); +const proposalPool = mock(DraftOfferProposalPool); +const marketModule = imock(); +const networkModule = imock(); +const rentalModule = imock(); + +function getMockResourceRental() { + return { + hasActivity: () => false, + fetchAgreementState: () => Promise.resolve("Approved"), + agreement: { id: "1" } as Agreement, + } as ResourceRental; +} + +function getRentalPool(replicas: RequireAtLeastOne<{ min: number; max: number }>) { + return new ResourceRentalPool({ + allocation: instance(allocation), + proposalPool: instance(proposalPool), + marketModule: instance(marketModule), + networkModule: instance(networkModule), + rentalModule: instance(rentalModule), + logger: instance(imock()), + network: undefined, + replicas, + }); +} + +beforeEach(() => { + jest.useRealTimers(); + jest.clearAllMocks(); + reset(allocation); + reset(proposalPool); + reset(marketModule); + reset(networkModule); + reset(rentalModule); +}); + +describe("ResourceRentalPool", () => { + describe("ready()", () => { + it("prepares MIN_POOL_SIZE resource rentals", async () => { + when(marketModule.signAgreementFromPool(_, _)).thenResolve({} as Agreement); + when(rentalModule.createResourceRental(_, _, _)).thenCall(() => ({}) as ResourceRental); + + const pool = getRentalPool({ min: 5, max: 10 }); + + await pool.ready(); + + expect(pool.getAvailableSize()).toBe(5); + verify(marketModule.signAgreementFromPool(_, _)).times(5); + }); + it("retries on error", async () => { + when(rentalModule.createResourceRental(_, _, _)).thenCall(() => ({}) as ResourceRental); + + const fakeAgreement = {} as Agreement; + when(marketModule.signAgreementFromPool(_, _)) + .thenResolve(fakeAgreement) + .thenReject(new Error("Failed to propose agreement")) + .thenResolve(fakeAgreement) + .thenReject(new Error("Failed to propose agreement")) + .thenResolve(fakeAgreement); + + const pool = getRentalPool({ min: 3 }); + + await pool.ready(); + + expect(pool.getAvailableSize()).toBe(3); + verify(marketModule.signAgreementFromPool(_, _)).times(5); + }); + it("stops retrying after abort signal is triggered", async () => { + const pool = getRentalPool({ min: 3 }); + pool["createNewResourceRental"] = jest + .fn( + () => + new Promise((_, reject) => + setTimeout(() => reject(new Error("Failed to propose agreement")), 50), + ), + ) + // the first call will succeed, the rest will fail (fall back to the first implementation) + .mockImplementationOnce(() => new Promise((resolve) => setTimeout(() => resolve(getMockResourceRental()), 50))); + + await expect(pool.ready(AbortSignal.timeout(60))).rejects.toThrow( + "Could not create enough resource rentals to reach the minimum pool size in time", + ); + expect(pool.getAvailableSize()).toBe(1); + // first loop 3 times, then 2 times + expect(pool["createNewResourceRental"]).toHaveBeenCalledTimes(5); + }); + it("stops retrying after specified timeout is reached", async () => { + const pool = getRentalPool({ min: 3 }); + const poolSpy = spy(pool); + when(poolSpy["createNewResourceRental"]()) + .thenResolve(getMockResourceRental()) + .thenReject(new Error("Failed to propose agreement")); + + await expect(pool.ready(10)).rejects.toThrow( + "Could not create enough resource rentals to reach the minimum pool size in time", + ); + expect(pool.getAvailableSize()).toBe(1); + verify(poolSpy["createNewResourceRental"]()).atLeast(3); + }); + }); + describe("acquire()", () => { + it("takes a random resource rental from the pool if none have activities", async () => { + const pool = getRentalPool({ min: 3 }); + const rental1 = getMockResourceRental(); + const rental2 = getMockResourceRental(); + const rental3 = getMockResourceRental(); + pool["lowPriority"].add(rental1); + pool["lowPriority"].add(rental2); + pool["lowPriority"].add(rental3); + + const resourceRental = await pool.acquire(); + expect(pool.getBorrowedSize()).toBe(1); + expect(pool.getAvailableSize()).toBe(2); + expect([rental1, rental2, rental3]).toContain(resourceRental); + }); + it("prioritizes resource rentals from high priority pool", async () => { + const pool = getRentalPool({ min: 3 }); + const rental1 = getMockResourceRental(); + const rental2 = getMockResourceRental(); + const rental3 = getMockResourceRental(); + pool["lowPriority"].add(rental1); + pool["highPriority"].add(rental2); + pool["lowPriority"].add(rental3); + + const resourceRental = await pool.acquire(); + expect(pool.getBorrowedSize()).toBe(1); + expect(pool.getAvailableSize()).toBe(2); + expect(resourceRental).toBe(rental2); + }); + it("creates a new resource rental if none are available", async () => { + const pool = getRentalPool({ min: 3 }); + pool["createNewResourceRental"] = jest.fn(() => Promise.resolve(getMockResourceRental())); + + expect(pool.getSize()).toBe(0); + await pool.acquire(); + expect(pool.getSize()).toBe(1); + expect(pool.getBorrowedSize()).toBe(1); + expect(pool.getAvailableSize()).toBe(0); + }); + it("waits for a rental to become available when the pool is full", async () => { + const pool = getRentalPool({ min: 3, max: 3 }); + const rental1 = getMockResourceRental(); + const rental2 = getMockResourceRental(); + const rental3 = getMockResourceRental(); + pool["lowPriority"].add(rental1); + pool["lowPriority"].add(rental2); + pool["lowPriority"].add(rental3); + + const acquiredRental1 = await pool.acquire(); + await pool.acquire(); + await pool.acquire(); + + expect(pool.getAvailableSize()).toBe(0); + expect(pool.getBorrowedSize()).toBe(3); + const acquiredRentalPromise = pool.acquire(); + // go to the next tick + await Promise.resolve(); + expect(pool["acquireQueue"].length).toBe(1); + pool.release(acquiredRental1); + await acquiredRentalPromise; + expect(pool.getAvailableSize()).toBe(0); + expect(pool.getBorrowedSize()).toBe(3); + expect(pool["acquireQueue"].length).toBe(0); + }); + it("validates the resource rental before returning it", async () => { + const pool = getRentalPool({ min: 3 }); + const newlyCreatedRental = getMockResourceRental(); + jest.spyOn(pool, "destroy"); + pool["createNewResourceRental"] = jest.fn(() => Promise.resolve(newlyCreatedRental)); + + const rental1 = getMockResourceRental(); + rental1.fetchAgreementState = jest.fn().mockResolvedValue("Expired"); + const rental2 = getMockResourceRental(); + rental2.fetchAgreementState = jest.fn().mockResolvedValue("Expired"); + pool["lowPriority"].add(rental1); + pool["lowPriority"].add(rental2); + + expect(pool.getBorrowedSize()).toBe(0); + expect(pool.getAvailableSize()).toBe(2); + const resourceRental = await pool.acquire(); + expect(pool.getBorrowedSize()).toBe(1); + expect(pool.getAvailableSize()).toBe(0); + expect(resourceRental).toBe(newlyCreatedRental); + expect(pool["destroy"]).toHaveBeenCalledWith(rental1); + expect(pool["destroy"]).toHaveBeenCalledWith(rental2); + }); + it("should not create more processes than allowed", async () => { + jest.useFakeTimers(); + const pool = getRentalPool({ min: 3, max: 3 }); + pool["createNewResourceRental"] = jest.fn(async () => { + pool["rentalsBeingSigned"]++; + await new Promise((resolve) => setTimeout(resolve, 50)); + pool["rentalsBeingSigned"]--; + return getMockResourceRental(); + }); + expect(pool.getSize()).toBe(0); + pool.acquire(); // should be resolved after 50ms + pool.acquire(); // should be resolved after 50ms + pool.acquire(); // should be resolved after 50ms + pool.acquire(); // should be added to the queue + pool.acquire(); // should be added to the queue + pool.acquire(); // should be added to the queue + pool.acquire(); // should be added to the queue + await jest.advanceTimersByTimeAsync(50); + expect(pool.getSize()).toBe(3); + expect(pool.getBorrowedSize()).toBe(3); + expect(pool.getAvailableSize()).toBe(0); + expect(pool["acquireQueue"].length).toBe(4); + }); + }); + describe("release()", () => { + it("releases a resource rental back to the pool", async () => { + const pool = getRentalPool({ min: 3 }); + const rental1 = getMockResourceRental(); + const rental2 = getMockResourceRental(); + pool["lowPriority"].add(rental1); + pool["lowPriority"].add(rental2); + + const resourceRental = await pool.acquire(); + expect(pool.getBorrowedSize()).toBe(1); + expect(pool.getAvailableSize()).toBe(1); + await pool.release(resourceRental); + expect(pool.getBorrowedSize()).toBe(0); + expect(pool.getAvailableSize()).toBe(2); + expect(pool["lowPriority"].has(rental1)).toBe(true); + expect(pool["lowPriority"].has(rental2)).toBe(true); + }); + it("releases a resource rental back to the high priority pool if it has an activity", async () => { + const pool = getRentalPool({ min: 3 }); + const rental1 = getMockResourceRental(); + const rental2 = getMockResourceRental(); + const rental3 = getMockResourceRental(); + pool["lowPriority"].add(rental1); + pool["lowPriority"].add(rental2); + pool["lowPriority"].add(rental3); + + const resourceRental = await pool.acquire(); + expect(pool.getBorrowedSize()).toBe(1); + expect(pool.getAvailableSize()).toBe(2); + resourceRental.hasActivity = () => true; + await pool.release(resourceRental); + expect(pool.getBorrowedSize()).toBe(0); + expect(pool.getAvailableSize()).toBe(3); + expect(pool["highPriority"].size).toBe(1); + expect(pool["lowPriority"].size).toBe(2); + }); + it("destroys the resource rental if the pool is full", async () => { + const pool = getRentalPool({ max: 2 }); + jest.spyOn(pool, "destroy"); + const rental1 = getMockResourceRental(); + const rental2 = getMockResourceRental(); + const rental3 = getMockResourceRental(); + + pool["lowPriority"].add(rental1); + pool["lowPriority"].add(rental2); + + const acquiredRental1 = await pool.acquire(); + expect(pool.getBorrowedSize()).toBe(1); + expect(pool.getAvailableSize()).toBe(1); + + pool["lowPriority"].add(rental3); + + await pool.release(acquiredRental1); + expect(pool.getBorrowedSize()).toBe(0); + expect(pool.getAvailableSize()).toBe(2); + expect(pool["lowPriority"].has(rental2)).toBe(true); + expect(pool["lowPriority"].has(rental3)).toBe(true); + expect(pool["destroy"]).toHaveBeenCalledWith(rental1); + }); + it("destroys the resource rental if it is invalid", async () => { + const pool = getRentalPool({ max: 1 }); + jest.spyOn(pool, "destroy"); + const rental1 = getMockResourceRental(); + + pool["lowPriority"].add(rental1); + + const acquiredRental1 = await pool.acquire(); + expect(pool.getBorrowedSize()).toBe(1); + expect(pool.getAvailableSize()).toBe(0); + + acquiredRental1.fetchAgreementState = jest.fn().mockResolvedValue("Expired"); + + await pool.release(acquiredRental1); + expect(pool.getBorrowedSize()).toBe(0); + expect(pool.getAvailableSize()).toBe(0); + expect(pool["destroy"]).toHaveBeenCalledWith(rental1); + }); + }); + describe("destroy()", () => { + it("removes the resource rental from the pool", async () => { + const pool = getRentalPool({ max: 1 }); + const rental1 = getMockResourceRental(); + pool["lowPriority"].add(rental1); + + const resourceRental = await pool.acquire(); + expect(pool.getBorrowedSize()).toBe(1); + expect(pool.getAvailableSize()).toBe(0); + pool.destroy(resourceRental); + expect(pool.getBorrowedSize()).toBe(0); + expect(pool.getAvailableSize()).toBe(0); + }); + }); + describe("drainAndClear", () => { + it("destroys all resource rentals in the pool", async () => { + const pool = getRentalPool({ max: 3 }); + jest.spyOn(pool, "destroy"); + const rental1 = getMockResourceRental(); + const rental2 = getMockResourceRental(); + const rental3 = getMockResourceRental(); + pool["lowPriority"].add(rental1); + pool["lowPriority"].add(rental2); + pool["lowPriority"].add(rental3); + + await pool.acquire(); + await pool.acquire(); + expect(pool.getBorrowedSize()).toBe(2); + expect(pool.getAvailableSize()).toBe(1); + await pool.drainAndClear(); + expect(pool.getBorrowedSize()).toBe(0); + expect(pool.getAvailableSize()).toBe(0); + expect(pool["destroy"]).toHaveBeenCalledWith(rental1); + expect(pool["destroy"]).toHaveBeenCalledWith(rental2); + expect(pool["destroy"]).toHaveBeenCalledWith(rental3); + }); + it("prevents new rentals from being acquired during the drain", async () => { + const pool = getRentalPool({ max: 3 }); + const realDestroy = pool.destroy; + jest.spyOn(pool, "destroy").mockImplementation(async (...args) => { + await new Promise((resolve) => setTimeout(resolve, 100)); + return realDestroy.apply(pool, args); + }); + const rental1 = getMockResourceRental(); + const rental2 = getMockResourceRental(); + const rental3 = getMockResourceRental(); + pool["lowPriority"].add(rental1); + pool["lowPriority"].add(rental2); + pool["lowPriority"].add(rental3); + + await pool.acquire(); + await pool.acquire(); + expect(pool.getBorrowedSize()).toBe(2); + expect(pool.getAvailableSize()).toBe(1); + const drainPromise = pool.drainAndClear(); + expect(pool.acquire()).rejects.toThrow("The pool is in draining mode"); + await drainPromise; + expect(pool.getBorrowedSize()).toBe(0); + expect(pool.getAvailableSize()).toBe(0); + expect(pool["destroy"]).toHaveBeenCalledWith(rental1); + expect(pool["destroy"]).toHaveBeenCalledWith(rental2); + expect(pool["destroy"]).toHaveBeenCalledWith(rental3); + }); + }); +}); diff --git a/src/resource-rental/resource-rental-pool.ts b/src/resource-rental/resource-rental-pool.ts new file mode 100644 index 000000000..62841f207 --- /dev/null +++ b/src/resource-rental/resource-rental-pool.ts @@ -0,0 +1,376 @@ +import type { Agreement, DraftOfferProposalPool, MarketModule } from "../market"; +import { GolemMarketError, MarketErrorCode } from "../market"; +import type { Logger } from "../shared/utils"; +import { createAbortSignalFromTimeout, runOnNextEventLoopIteration } from "../shared/utils"; +import { EventEmitter } from "eventemitter3"; +import type { RequireAtLeastOne } from "../shared/utils/types"; +import type { Allocation } from "../payment"; +import type { ResourceRental, ResourceRentalOptions } from "./resource-rental"; +import { Network, NetworkModule } from "../network"; +import { RentalModule } from "./rental.module"; +import { AgreementOptions } from "../market/agreement/agreement"; + +export interface ResourceRentalPoolDependencies { + allocation: Allocation; + proposalPool: DraftOfferProposalPool; + marketModule: MarketModule; + networkModule: NetworkModule; + rentalModule: RentalModule; + logger: Logger; +} + +export type Concurrency = number | RequireAtLeastOne<{ min: number; max: number }>; + +export interface ResourceRentalPoolOptions { + replicas?: Concurrency; + network?: Network; + resourceRentalOptions?: ResourceRentalOptions; + agreementOptions?: AgreementOptions; +} + +export interface ResourceRentalPoolEvents { + ready: () => void; + end: () => void; + acquired: (agreement: Agreement) => void; + released: (agreement: Agreement) => void; + created: (agreement: Agreement) => void; + destroyed: (agreement: Agreement) => void; + error: (error: GolemMarketError) => void; +} + +const MAX_REPLICAS = 100; + +/** + * Pool of resource rentals that can be borrowed, released or destroyed. + */ +export class ResourceRentalPool { + public readonly events = new EventEmitter(); + + /** + * Pool of resource rentals that do not have an activity + */ + private lowPriority = new Set(); + /** + * Pool of resource rentals that have an activity + */ + private highPriority = new Set(); + private borrowed = new Set(); + /** + * Queue of functions that are waiting for a lease process to be available + */ + private acquireQueue: Array<(rental: ResourceRental) => void> = []; + private isDraining = false; + private logger: Logger; + + private allocation: Allocation; + private network?: Network; + private proposalPool: DraftOfferProposalPool; + private marketModule: MarketModule; + private networkModule: NetworkModule; + private rentalModule: RentalModule; + private readonly minPoolSize: number; + private readonly maxPoolSize: number; + private readonly resourceRentalOptions?: ResourceRentalOptions; + private readonly agreementOptions?: AgreementOptions; + /** + * Number of resource rentals that are currently being signed. + * This is used to prevent creating more resource rentals than the pool size allows. + */ + private rentalsBeingSigned = 0; + + constructor(options: ResourceRentalPoolOptions & ResourceRentalPoolDependencies) { + this.allocation = options.allocation; + this.proposalPool = options.proposalPool; + this.marketModule = options.marketModule; + this.rentalModule = options.rentalModule; + this.networkModule = options.networkModule; + this.network = options.network; + this.resourceRentalOptions = options.resourceRentalOptions; + this.agreementOptions = options.agreementOptions; + + this.logger = options.logger; + + this.minPoolSize = + (() => { + if (typeof options?.replicas === "number") { + return options?.replicas; + } + if (typeof options?.replicas === "object") { + return options?.replicas.min; + } + })() || 0; + + this.maxPoolSize = + (() => { + if (typeof options?.replicas === "object") { + return options?.replicas.max; + } + })() || MAX_REPLICAS; + } + + private async createNewResourceRental() { + this.logger.debug("Creating new resource rental to add to pool"); + try { + this.rentalsBeingSigned++; + const agreement = await this.marketModule.signAgreementFromPool(this.proposalPool, this.agreementOptions); + const networkNode = this.network + ? await this.networkModule.createNetworkNode(this.network, agreement.provider.id) + : undefined; + const resourceRental = this.rentalModule.createResourceRental(agreement, this.allocation, { + networkNode, + ...this.resourceRentalOptions, + }); + this.events.emit("created", agreement); + return resourceRental; + } catch (error) { + this.events.emit( + "error", + new GolemMarketError("Creating resource rental failed", MarketErrorCode.ResourceRentalCreationFailed, error), + ); + this.logger.error("Creating resource rental failed", error); + throw error; + } finally { + this.rentalsBeingSigned--; + } + } + + private async validate(resourceRental: ResourceRental) { + try { + const state = await resourceRental.fetchAgreementState(); + const result = state === "Approved"; + this.logger.debug("Validated resource rental in the pool", { result, state }); + return result; + } catch (err) { + this.logger.error("Something went wrong while validating resource rental, it will be destroyed", err); + return false; + } + } + + private canCreateMoreResourceRentals() { + return this.getSize() + this.rentalsBeingSigned < this.maxPoolSize; + } + + /** + * Take the first valid resource rental from the pool + * If there is no valid resource rental, return null + */ + private async takeValidResourceRental(): Promise { + let resourceRental: ResourceRental | null = null; + if (this.highPriority.size > 0) { + resourceRental = this.highPriority.values().next().value as ResourceRental; + this.highPriority.delete(resourceRental); + } else if (this.lowPriority.size > 0) { + resourceRental = this.lowPriority.values().next().value as ResourceRental; + this.lowPriority.delete(resourceRental); + } + if (!resourceRental) { + return null; + } + const isValid = await this.validate(resourceRental); + if (!isValid) { + await this.destroy(resourceRental); + return this.takeValidResourceRental(); + } + return resourceRental; + } + + private async enqueueAcquire(): Promise { + return new Promise((resolve) => { + this.acquireQueue.push((resourceRental) => { + this.borrowed.add(resourceRental); + this.events.emit("acquired", resourceRental.agreement); + resolve(resourceRental); + }); + }); + } + + /** + * Borrow a resource rental from the pool. + * If there is no valid resource rental a new one will be created. + */ + async acquire(): Promise { + if (this.isDraining) { + throw new Error("The pool is in draining mode"); + } + let resourceRental = await this.takeValidResourceRental(); + if (!resourceRental) { + if (!this.canCreateMoreResourceRentals()) { + return this.enqueueAcquire(); + } + resourceRental = await this.createNewResourceRental(); + } + this.borrowed.add(resourceRental); + this.events.emit("acquired", resourceRental.agreement); + return resourceRental; + } + + /** + * If there are any acquires waiting in the queue, the resource rental will be passed to the first one. + * Otherwise, the resource rental will be added to the queue. + */ + private passResourceRentalToWaitingAcquireOrBackToPool(resourceRental: ResourceRental) { + if (this.acquireQueue.length > 0) { + const acquire = this.acquireQueue.shift()!; + acquire(resourceRental); + return; + } + if (resourceRental.hasActivity()) { + this.highPriority.add(resourceRental); + } else { + this.lowPriority.add(resourceRental); + } + } + + async release(resourceRental: ResourceRental): Promise { + if (this.getAvailableSize() >= this.maxPoolSize) { + return this.destroy(resourceRental); + } + this.borrowed.delete(resourceRental); + const isValid = await this.validate(resourceRental); + if (!isValid) { + return this.destroy(resourceRental); + } + this.events.emit("released", resourceRental.agreement); + this.passResourceRentalToWaitingAcquireOrBackToPool(resourceRental); + } + + async destroy(resourceRental: ResourceRental): Promise { + try { + this.borrowed.delete(resourceRental); + this.logger.debug("Destroying resource rental from the pool", { agreementId: resourceRental.agreement.id }); + await Promise.all([resourceRental.finalize(), this.removeNetworkNode(resourceRental)]); + this.events.emit("destroyed", resourceRental.agreement); + } catch (error) { + this.events.emit( + "error", + new GolemMarketError( + "Destroying resource rental failed", + MarketErrorCode.ResourceRentalTerminationFailed, + error, + ), + ); + this.logger.error("Destroying resource rental failed", error); + } + } + + /** + * Sets the pool into draining mode and then clears it + * + * When set to drain mode, no new acquires will be possible. At the same time, all agreements in the pool will be terminated with the Providers. + * + * @return Resolves when all agreements are terminated + */ + async drainAndClear() { + this.isDraining = true; + this.acquireQueue = []; + const allResourceRentals = Array.from(this.borrowed) + .concat(Array.from(this.lowPriority)) + .concat(Array.from(this.highPriority)); + await Promise.allSettled(allResourceRentals.map((resourceRental) => this.destroy(resourceRental))); + this.lowPriority.clear(); + this.highPriority.clear(); + this.borrowed.clear(); + this.isDraining = false; + this.events.emit("end"); + return; + } + + /** + * Total size (available + borrowed) + */ + getSize() { + return this.getAvailableSize() + this.getBorrowedSize(); + } + + /** + * Available size (how many resource rental are ready to be borrowed) + */ + getAvailableSize() { + return this.lowPriority.size + this.highPriority.size; + } + + /** + * Borrowed size (how many resource rental are currently out of the pool) + */ + getBorrowedSize() { + return this.borrowed.size; + } + + /** + * Wait till the pool is ready to use (min number of items in pool are usable). + * If an error occurs while creating new resource rentals, it will be retried until the pool is ready + * (potentially indefinitely). To stop this process if it fails to reach the desired state in a given time, + * you can pass either a timeout in milliseconds or an AbortSignal. + * + * @example + * ```typescript + * await pool.ready(10_000); // If the pool is not ready in 10 seconds, an error will be thrown + * ``` + * @example + * ```typescript + * await pool.ready(AbortSignal.timeout(10_000)); // If the pool is not ready in 10 seconds, an error will be thrown + * ``` + */ + async ready(timeoutMs?: number): Promise; + async ready(abortSignal?: AbortSignal): Promise; + async ready(timeoutOrAbortSignal?: number | AbortSignal): Promise { + if (this.minPoolSize <= this.getAvailableSize()) { + return; + } + const signal = createAbortSignalFromTimeout(timeoutOrAbortSignal); + + const tryCreatingMissingResourceRentals = async () => { + await Promise.allSettled( + new Array(this.minPoolSize - this.getAvailableSize()).fill(0).map(() => + this.createNewResourceRental().then( + (resourceRental) => this.lowPriority.add(resourceRental), + (error) => this.logger.error("Creating resource rental failed", error), + ), + ), + ); + }; + + while (this.minPoolSize > this.getAvailableSize()) { + if (signal.aborted) { + break; + } + await runOnNextEventLoopIteration(tryCreatingMissingResourceRentals); + } + + if (this.minPoolSize > this.getAvailableSize()) { + throw new Error("Could not create enough resource rentals to reach the minimum pool size in time"); + } + this.events.emit("ready"); + } + + private async removeNetworkNode(resourceRental: ResourceRental) { + if (this.network && resourceRental.networkNode) { + this.logger.debug("Removing a node from the network", { + network: this.network.getNetworkInfo().ip, + nodeIp: resourceRental.networkNode.ip, + }); + await this.networkModule.removeNetworkNode(this.network, resourceRental.networkNode); + } + } + + /** + * Acquire a resource rental from the pool and release it after the callback is done + * @example + * ```typescript + * const result = await pool.withRental(async (rental) => { + * // Do something with the rented resources + * return result; + * // pool.release(rental) is called automatically + * // even if an error is thrown in the callback + * }); + * ``` + */ + public async withRental(callback: (rental: ResourceRental) => Promise): Promise { + const rental = await this.acquire(); + try { + return await callback(rental); + } finally { + await this.release(rental); + } + } +} diff --git a/src/lease-process/lease-process.ts b/src/resource-rental/resource-rental.ts similarity index 80% rename from src/lease-process/lease-process.ts rename to src/resource-rental/resource-rental.ts index 489862c04..956ed6714 100644 --- a/src/lease-process/lease-process.ts +++ b/src/resource-rental/resource-rental.ts @@ -9,25 +9,24 @@ import { NetworkNode } from "../network"; import { ExecutionOptions } from "../activity/exe-script-executor"; import { MarketModule } from "../market"; -export interface LeaseProcessEvents { +export interface ResourceRentalEvents { /** - * Raised when the lease process is fully finalized + * Raised when the rental process is fully finalized */ finalized: () => void; } -export interface LeaseProcessOptions { +export interface ResourceRentalOptions { activity?: ExecutionOptions; payment?: Partial; networkNode?: NetworkNode; } /** - * Represents a set of use-cases for invoking commands + * Combines an agreement, activity, exe unit and payment process into a single high-level abstraction. */ - -export class LeaseProcess { - public readonly events = new EventEmitter(); +export class ResourceRental { + public readonly events = new EventEmitter(); public readonly networkNode?: NetworkNode; private currentWorkContext: WorkContext | null = null; @@ -39,16 +38,16 @@ export class LeaseProcess { private readonly marketModule: MarketModule, private readonly activityModule: ActivityModule, private readonly logger: Logger, - private readonly leaseOptions?: LeaseProcessOptions, + private readonly resourceRentalOptions?: ResourceRentalOptions, ) { - this.networkNode = this.leaseOptions?.networkNode; + this.networkNode = this.resourceRentalOptions?.networkNode; // TODO: Listen to agreement events to know when it goes down due to provider closing it! } /** - * Resolves when the lease will be fully terminated and all pending business operations finalized. - * If the lease is already finalized, it will resolve immediately. + * Resolves when the rental will be fully terminated and all pending business operations finalized. + * If the rental is already finalized, it will resolve immediately. */ async finalize() { if (this.paymentProcess.isFinished()) { @@ -88,8 +87,8 @@ export class LeaseProcess { const activity = await this.activityModule.createActivity(this.agreement); this.currentWorkContext = await this.activityModule.createWorkContext(activity, { storageProvider: this.storageProvider, - networkNode: this.leaseOptions?.networkNode, - execution: this.leaseOptions?.activity, + networkNode: this.resourceRentalOptions?.networkNode, + execution: this.resourceRentalOptions?.activity, }); return this.currentWorkContext; diff --git a/src/shared/yagna/adapters/market-api-adapter.ts b/src/shared/yagna/adapters/market-api-adapter.ts index aa8c07d35..ac9ff2c1d 100644 --- a/src/shared/yagna/adapters/market-api-adapter.ts +++ b/src/shared/yagna/adapters/market-api-adapter.ts @@ -243,7 +243,7 @@ export class MarketApiAdapter implements IMarketApi { if (typeof agreementId !== "string") { throw new GolemMarketError( `Unable to create agreement. Invalid response from the server`, - MarketErrorCode.LeaseProcessCreationFailed, + MarketErrorCode.ResourceRentalCreationFailed, ); } @@ -258,7 +258,7 @@ export class MarketApiAdapter implements IMarketApi { const message = getMessageFromApiError(error); throw new GolemMarketError( `Unable to create agreement ${message}`, - MarketErrorCode.LeaseProcessCreationFailed, + MarketErrorCode.ResourceRentalCreationFailed, error, ); } @@ -313,7 +313,7 @@ export class MarketApiAdapter implements IMarketApi { const message = getMessageFromApiError(error); throw new GolemMarketError( `Unable to terminate agreement ${agreement.id}. ${message}`, - MarketErrorCode.LeaseProcessTerminationFailed, + MarketErrorCode.ResourceRentalTerminationFailed, error, ); } diff --git a/tests/e2e/leaseProcessPool.spec.ts b/tests/e2e/leaseProcessPool.spec.ts index 547fb9094..b4f78d64a 100644 --- a/tests/e2e/leaseProcessPool.spec.ts +++ b/tests/e2e/leaseProcessPool.spec.ts @@ -1,7 +1,7 @@ import { Subscription } from "rxjs"; import { Allocation, DraftOfferProposalPool, GolemNetwork } from "../../src"; -describe("LeaseProcessPool", () => { +describe("ResourceRentalPool", () => { const glm = new GolemNetwork(); let proposalPool: DraftOfferProposalPool; let allocation: Allocation; @@ -51,22 +51,22 @@ describe("LeaseProcessPool", () => { }); it("should run a simple script on the activity from the pool", async () => { - const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); + const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 }); pool.events.on("error", (error) => { throw error; }); - const leaseProcess = await pool.acquire(); + const resourceRental = await pool.acquire(); expect(pool.getSize()).toEqual(1); expect(pool.getAvailableSize()).toEqual(0); expect(pool.getBorrowedSize()).toEqual(1); - const result = await leaseProcess.getExeUnit().then((exe) => exe.run("echo Hello World")); + const result = await resourceRental.getExeUnit().then((exe) => exe.run("echo Hello World")); expect(result.stdout?.toString().trim()).toEqual("Hello World"); - await pool.destroy(leaseProcess); + await pool.destroy(resourceRental); await pool.drainAndClear(); }); it("should prepare two activity ready to use", async () => { - const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 2 }); + const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 2 }); pool.events.on("error", (error) => { throw error; }); @@ -74,64 +74,64 @@ describe("LeaseProcessPool", () => { expect(pool.getSize()).toEqual(2); expect(pool.getAvailableSize()).toEqual(2); expect(pool.getBorrowedSize()).toEqual(0); - const lease1 = await pool.acquire(); - const activity1 = await lease1.getExeUnit(); + const rental1 = await pool.acquire(); + const activity1 = await rental1.getExeUnit(); expect(pool.getAvailableSize()).toEqual(1); expect(pool.getBorrowedSize()).toEqual(1); - const lease2 = await pool.acquire(); - const activity2 = await lease2.getExeUnit(); + const rental2 = await pool.acquire(); + const activity2 = await rental2.getExeUnit(); expect(pool.getAvailableSize()).toEqual(0); expect(pool.getBorrowedSize()).toEqual(2); expect(activity1).toBeDefined(); expect(activity2).toBeDefined(); expect(activity1.provider.id).not.toEqual(activity2.provider.id); - await pool.release(lease1); + await pool.release(rental1); expect(pool.getAvailableSize()).toEqual(1); expect(pool.getBorrowedSize()).toEqual(1); - await pool.release(lease2); + await pool.release(rental2); expect(pool.getAvailableSize()).toEqual(2); expect(pool.getBorrowedSize()).toEqual(0); await pool.drainAndClear(); }); it("should release the activity and reuse it again", async () => { - const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 1 }); + const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 }); pool.events.on("error", (error) => { throw error; }); - const lease = await pool.acquire(); - const activity = await lease.getExeUnit(); + const rental = await pool.acquire(); + const activity = await rental.getExeUnit(); const result1 = await activity.run("echo result-1"); expect(result1.stdout?.toString().trim()).toEqual("result-1"); - await pool.release(lease); - const sameLease = await pool.acquire(); - const activityAfterRelease = await sameLease.getExeUnit(); + await pool.release(rental); + const sameRental = await pool.acquire(); + const activityAfterRelease = await sameRental.getExeUnit(); const result2 = await activityAfterRelease.run("echo result-2"); expect(result2.stdout?.toString().trim()).toEqual("result-2"); - await pool.destroy(sameLease); + await pool.destroy(sameRental); expect(activity.activity.id).toEqual(activityAfterRelease.activity.id); await pool.drainAndClear(); }); it("should terminate all agreements after drain and clear the poll", async () => { - const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 2 }); + const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 2 }); pool.events.on("error", (error) => { throw error; }); const agreementTerminatedIds: string[] = []; pool.events.on("destroyed", (agreement) => agreementTerminatedIds.push(agreement.id)); - const lease1 = await pool.acquire(); - const lease2 = await pool.acquire(); + const rental1 = await pool.acquire(); + const rental2 = await pool.acquire(); - const activity1 = await lease1.getExeUnit(); - const activity2 = await lease2.getExeUnit(); + const activity1 = await rental1.getExeUnit(); + const activity2 = await rental2.getExeUnit(); await activity1.run("echo result-1"); await activity2.run("echo result-2"); - await pool.release(lease1); - await pool.release(lease2); + await pool.release(rental1); + await pool.release(rental2); await pool.drainAndClear(); expect(agreementTerminatedIds.sort()).toEqual( [activity1.activity.agreement.id, activity2.activity.agreement.id].sort(), @@ -140,35 +140,37 @@ describe("LeaseProcessPool", () => { it("should establish a connection between two activities from pool via vpn", async () => { const network = await glm.network.createNetwork(); - const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: 2, network }); + const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 2, network }); pool.events.on("error", (error) => { throw error; }); - const leaseProcess1 = await pool.acquire(); - const leaseProcess2 = await pool.acquire(); - const exe1 = await leaseProcess1.getExeUnit(); - const exe2 = await leaseProcess2.getExeUnit(); + const resourceRental1 = await pool.acquire(); + const resourceRental2 = await pool.acquire(); + const exe1 = await resourceRental1.getExeUnit(); + const exe2 = await resourceRental2.getExeUnit(); const result1 = await exe1.run(`ping ${exe2.getIp()} -c 4`); const result2 = await exe2.run(`ping ${exe1.getIp()} -c 4`); expect(result1.stdout?.toString().trim()).toMatch("4 packets transmitted, 4 packets received, 0% packet loss"); expect(result2.stdout?.toString().trim()).toMatch("4 packets transmitted, 4 packets received, 0% packet loss"); expect(Object.keys(network.getNetworkInfo().nodes)).toEqual(["192.168.0.1", "192.168.0.2", "192.168.0.3"]); - await pool.destroy(leaseProcess1); - await pool.destroy(leaseProcess2); + await pool.destroy(resourceRental1); + await pool.destroy(resourceRental2); await pool.drainAndClear(); await glm.network.removeNetwork(network); }); - it("should not lease more process than maximum size", async () => { + it("should not rent more resources than maximum size", async () => { const maxPoolSize = 3; - const pool = glm.lease.createLeaseProcessPool(proposalPool, allocation, { replicas: { min: 1, max: maxPoolSize } }); + const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { + replicas: { min: 1, max: maxPoolSize }, + }); const poolSizesDuringWork: number[] = []; pool.events.on("acquired", () => poolSizesDuringWork.push(pool.getSize())); const data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; await Promise.allSettled( data.map((item) => - pool.withLease((lease) => - lease.getExeUnit().then((exe) => exe.run(`echo ${item} from provider ${exe.provider.name}`)), + pool.withRental((rental) => + rental.getExeUnit().then((exe) => exe.run(`echo ${item} from provider ${exe.provider.name}`)), ), ), ); From 8de6eff17ea7041c86ad7a4f1ee79467b51fbd4d Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 19 Jun 2024 12:23:44 +0200 Subject: [PATCH 02/10] feat: rename lease to rental in GolemNetwork --- src/golem-network/golem-network.test.ts | 36 +++++++++---------- src/golem-network/golem-network.ts | 46 ++++++++++++------------- src/market/error.ts | 4 +-- 3 files changed, 43 insertions(+), 43 deletions(-) diff --git a/src/golem-network/golem-network.test.ts b/src/golem-network/golem-network.test.ts index 2b48916ad..9641faae2 100644 --- a/src/golem-network/golem-network.test.ts +++ b/src/golem-network/golem-network.test.ts @@ -1,6 +1,6 @@ import { Subject } from "rxjs"; import { ActivityModuleImpl } from "../activity"; -import { LeaseModuleImpl, LeaseProcess, LeaseProcessPool } from "../lease-process"; +import { RentalModuleImpl, ResourceRental, ResourceRentalPool } from "../resource-rental"; import { DraftOfferProposalPool, MarketModuleImpl, OfferProposal } from "../market"; import { NetworkModuleImpl } from "../network"; import { Allocation, PaymentModuleImpl } from "../payment"; @@ -29,7 +29,7 @@ const mockMarket = mock(MarketModuleImpl); const mockPayment = mock(PaymentModuleImpl); const mockActivity = mock(ActivityModuleImpl); const mockNetwork = mock(NetworkModuleImpl); -const mockLease = mock(LeaseModuleImpl); +const mockRental = mock(RentalModuleImpl); const mockYagna = mock(YagnaApi); const mockPaymentApi = mock(PaymentApiAdapter); const mockActivityApi = mock(ActivityApiAdapter); @@ -42,7 +42,7 @@ afterEach(() => { reset(mockMarket); reset(mockPayment); reset(mockNetwork); - reset(mockLease); + reset(mockRental); reset(mockPaymentApi); reset(mockActivityApi); reset(mockMarketApi); @@ -58,7 +58,7 @@ function getGolemNetwork() { market: instance(mockMarket), payment: instance(mockPayment), network: instance(mockNetwork), - lease: instance(mockLease), + rental: instance(mockRental), paymentApi: instance(mockPaymentApi), activityApi: instance(mockActivityApi), marketApi: instance(mockMarketApi), @@ -70,11 +70,11 @@ function getGolemNetwork() { describe("Golem Network", () => { describe("oneOf()", () => { it("should create a lease and clean it up when disconnected", async () => { - const mockLeaseProcess = mock(LeaseProcess); - const testProcess = instance(mockLeaseProcess); + const mockResourceRental = mock(ResourceRental); + const testProcess = instance(mockResourceRental); - when(mockLeaseProcess.finalize()).thenResolve(); - when(mockLease.createLease(_, _, _)).thenReturn(testProcess); + when(mockResourceRental.finalize()).thenResolve(); + when(mockRental.createResourceRental(_, _, _)).thenReturn(testProcess); const draftProposal$ = new Subject(); when(mockMarket.collectDraftOfferProposals(_)).thenReturn(draftProposal$); @@ -93,16 +93,16 @@ describe("Golem Network", () => { await glm.disconnect(); - verify(mockLeaseProcess.finalize()).once(); + verify(mockResourceRental.finalize()).once(); verify(mockPayment.releaseAllocation(allocation)).once(); }); it("should not release the allocation if it was provided by the user", async () => { const allocation = instance(mock(Allocation)); - const mockLeaseProcess = mock(LeaseProcess); - const testProcess = instance(mockLeaseProcess); - when(mockLeaseProcess.finalize()).thenResolve(); - when(mockLease.createLease(_, _, _)).thenReturn(testProcess); + const mockResourceRental = mock(ResourceRental); + const testProcess = instance(mockResourceRental); + when(mockResourceRental.finalize()).thenResolve(); + when(mockRental.createResourceRental(_, _, _)).thenReturn(testProcess); when(mockMarket.collectDraftOfferProposals(_)).thenReturn(new Subject()); jest.spyOn(DraftOfferProposalPool.prototype, "acquire").mockResolvedValue({} as OfferProposal); @@ -121,7 +121,7 @@ describe("Golem Network", () => { await glm.disconnect(); - verify(mockLeaseProcess.finalize()).once(); + verify(mockResourceRental.finalize()).once(); verify(mockPayment.createAllocation(_)).never(); verify(mockPayment.releaseAllocation(allocation)).never(); }); @@ -135,10 +135,10 @@ describe("Golem Network", () => { const draftProposal$ = new Subject(); when(mockMarket.collectDraftOfferProposals(_)).thenReturn(draftProposal$); - const mockLeasePool = mock(LeaseProcessPool); + const mockLeasePool = mock(ResourceRentalPool); when(mockLeasePool.drainAndClear()).thenResolve(); const leasePool = instance(mockLeasePool); - when(mockLease.createLeaseProcessPool(_, _, _)).thenReturn(leasePool); + when(mockRental.createResourceRentalPool(_, _, _)).thenReturn(leasePool); const glm = getGolemNetwork(); @@ -160,10 +160,10 @@ describe("Golem Network", () => { const allocation = instance(mock(Allocation)); when(mockMarket.collectDraftOfferProposals(_)).thenReturn(new Subject()); - const mockLeasePool = mock(LeaseProcessPool); + const mockLeasePool = mock(ResourceRentalPool); when(mockLeasePool.drainAndClear()).thenResolve(); const leasePool = instance(mockLeasePool); - when(mockLease.createLeaseProcessPool(_, _, _)).thenReturn(leasePool); + when(mockRental.createResourceRentalPool(_, _, _)).thenReturn(leasePool); const glm = getGolemNetwork(); await glm.connect(); diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index 5299536b9..74da45584 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -106,7 +106,7 @@ export interface GolemNetworkOptions { payment: InstanceOrFactory; activity: InstanceOrFactory; network: InstanceOrFactory; - lease: InstanceOrFactory; + rental: InstanceOrFactory; } >; } @@ -120,7 +120,7 @@ type AllocationOptions = { }; /** - * Represents the order specifications which will result in access to LeaseProcess. + * Represents the order specifications which will result in access to ResourceRental. */ export interface MarketOrderSpec { demand: BuildDemandOptions; @@ -195,7 +195,7 @@ export class GolemNetwork { /** * List af additional tasks that should be executed when the network is being shut down - * (for example finalizing lease processes created with `oneOf`) + * (for example finalizing resource rental created with `oneOf`) */ private readonly cleanupTasks: (() => Promise | void)[] = []; @@ -272,7 +272,7 @@ export class GolemNetwork { this.activity = getFactory(ActivityModuleImpl, this.options.override?.activity)(this.services); this.rental = getFactory( RentalModuleImpl, - this.options.override?.lease, + this.options.override?.rental, )({ activityModule: this.activity, paymentModule: this.payment, @@ -349,12 +349,12 @@ export class GolemNetwork { * * @example * ```ts - * const lease = await glm.oneOf(demand); - * await lease + * const rental = await glm.oneOf(demand); + * await rental * .getExeUnit() * .then((exe) => exe.run("echo Hello, Golem! 👋")) * .then((res) => console.log(res.stdout)); - * await lease.finalize(); + * await rental.finalize(); * ``` * * @param order @@ -385,7 +385,7 @@ export class GolemNetwork { ? await this.network.createNetworkNode(order.network, agreement.provider.id) : undefined; - const lease = this.rental.createResourceRental(agreement, allocation, { + const rental = this.rental.createResourceRental(agreement, allocation, { payment: order.payment, activity: order.activity, networkNode, @@ -395,9 +395,9 @@ export class GolemNetwork { proposalSubscription.unsubscribe(); this.cleanupTasks.push(async () => { - // First finalize the lease (which will wait for all payments to be processed) + // First finalize the rental (which will wait for all payments to be processed) // and only then release the allocation - await lease.finalize().catch((err) => this.logger.error("Error while finalizing lease", err)); + await rental.finalize().catch((err) => this.logger.error("Error while finalizing rental", err)); if (order.network && networkNode) { await this.network .removeNetworkNode(order.network, networkNode) @@ -412,7 +412,7 @@ export class GolemNetwork { .catch((err) => this.logger.error("Error while releasing allocation", err)); }); - return lease; + return rental; } /** @@ -421,26 +421,26 @@ export class GolemNetwork { * * @example * ```ts - * // create a pool that can grow up to 3 leases at the same time + * // create a pool that can grow up to 3 rentals at the same time * const pool = await glm.manyOf({ * concurrency: 3, * demand * }); * await Promise.allSettled([ - * pool.withLease(async (lease) => - * lease + * pool.withRental(async (rental) => + * rental * .getExeUnit() * .then((exe) => exe.run("echo Hello, Golem from the first machine! 👋")) * .then((res) => console.log(res.stdout)), * ), - * pool.withLease(async (lease) => - * lease + * pool.withRental(async (rental) => + * rental * .getExeUnit() * .then((exe) => exe.run("echo Hello, Golem from the second machine! 👋")) * .then((res) => console.log(res.stdout)), * ), - * pool.withLease(async (lease) => - * lease + * pool.withRental(async (rental) => + * rental * .getExeUnit() * .then((exe) => exe.run("echo Hello, Golem from the third machine! 👋")) * .then((res) => console.log(res.stdout)), @@ -467,7 +467,7 @@ export class GolemNetwork { }); const subscription = proposalPool.readFrom(draftProposal$); - const leaseProcessPool = this.rental.createResourceRentalPool(proposalPool, allocation, { + const resourceRentalPool = this.rental.createResourceRentalPool(proposalPool, allocation, { replicas: concurrency, network: order.network, resourceRentalOptions: { @@ -482,11 +482,11 @@ export class GolemNetwork { subscription.unsubscribe(); }); this.cleanupTasks.push(async () => { - // First drain the pool (which will wait for all leases to be paid for) + // First drain the pool (which will wait for all rentals to be paid for // and only then release the allocation - await leaseProcessPool + await resourceRentalPool .drainAndClear() - .catch((err) => this.logger.error("Error while draining lease process pool", err)); + .catch((err) => this.logger.error("Error while draining resource rental pool", err)); // Don't release the allocation if it was provided by the user if (order.payment?.allocation) { return; @@ -496,7 +496,7 @@ export class GolemNetwork { .catch((err) => this.logger.error("Error while releasing allocation", err)); }); - return leaseProcessPool; + return resourceRentalPool; } isConnected() { diff --git a/src/market/error.ts b/src/market/error.ts index 2b1ed1f71..077b958a9 100644 --- a/src/market/error.ts +++ b/src/market/error.ts @@ -10,8 +10,8 @@ export enum MarketErrorCode { ProposalResponseFailed = "ProposalResponseFailed", ProposalRejectionFailed = "ProposalRejectionFailed", DemandExpired = "DemandExpired", - ResourceRentalTerminationFailed = "LeaseProcessTerminationFailed", - ResourceRentalCreationFailed = "LeaseProcessCreationFailed", + ResourceRentalTerminationFailed = "ResourceRentalTerminationFailed", + ResourceRentalCreationFailed = "ResourceRentalCreationFailed", AgreementApprovalFailed = "AgreementApprovalFailed", NoProposalAvailable = "NoProposalAvailable", InternalError = "InternalError", From 0435ef78ed7a2cc3c7afc30593547e7685f155dc Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 19 Jun 2024 12:29:02 +0200 Subject: [PATCH 03/10] feat: replace lease with rental in experimental modules --- src/experimental/deployment/builder.test.ts | 8 ++--- src/experimental/deployment/builder.ts | 16 +++++----- src/experimental/deployment/deployment.ts | 30 +++++++++---------- .../deployment/validate-deployment.ts | 2 +- src/experimental/job/job.test.ts | 12 ++++---- src/experimental/job/job.ts | 6 ++-- 6 files changed, 37 insertions(+), 37 deletions(-) diff --git a/src/experimental/deployment/builder.test.ts b/src/experimental/deployment/builder.test.ts index bf1b1d708..b7e9c31e2 100644 --- a/src/experimental/deployment/builder.test.ts +++ b/src/experimental/deployment/builder.test.ts @@ -10,7 +10,7 @@ describe("Deployment builder", () => { const builder = new GolemDeploymentBuilder(mockGolemNetwork); expect(() => { builder - .createLeaseProcessPool("my-pool", { + .createResourceRentalPool("my-pool", { demand: { workload: { imageTag: "image", @@ -32,7 +32,7 @@ describe("Deployment builder", () => { replicas: 1, }, }) - .createLeaseProcessPool("my-pool", { + .createResourceRentalPool("my-pool", { demand: { workload: { imageTag: "image", @@ -54,7 +54,7 @@ describe("Deployment builder", () => { replicas: 1, }, }); - }).toThrow(new GolemConfigError(`Lease Process Pool with name my-pool already exists`)); + }).toThrow(new GolemConfigError(`Resource Rental Pool with name my-pool already exists`)); }); it("throws an error when creating a network with the same name", () => { const builder = new GolemDeploymentBuilder(mockGolemNetwork); @@ -67,7 +67,7 @@ describe("Deployment builder", () => { expect(() => { builder .createNetwork("existing-network") - .createLeaseProcessPool("my-pool", { + .createResourceRentalPool("my-pool", { demand: { workload: { imageTag: "image", minCpuCores: 1, minMemGib: 1, minStorageGib: 1 }, }, diff --git a/src/experimental/deployment/builder.ts b/src/experimental/deployment/builder.ts index 11415bc0f..3764d7af2 100644 --- a/src/experimental/deployment/builder.ts +++ b/src/experimental/deployment/builder.ts @@ -9,31 +9,31 @@ export interface DeploymentOptions { network?: string; } -export interface CreateLeaseProcessPoolOptions extends MarketOrderSpec { +export interface CreateResourceRentalPoolOptions extends MarketOrderSpec { deployment: DeploymentOptions; } export class GolemDeploymentBuilder { private components: DeploymentComponents = { - leaseProcessPools: [], + resourceRentalPools: [], networks: [], }; public reset() { this.components = { - leaseProcessPools: [], + resourceRentalPools: [], networks: [], }; } constructor(private glm: GolemNetwork) {} - createLeaseProcessPool(name: string, options: CreateLeaseProcessPoolOptions): this { - if (this.components.leaseProcessPools.some((pool) => pool.name === name)) { - throw new GolemConfigError(`Lease Process Pool with name ${name} already exists`); + createResourceRentalPool(name: string, options: CreateResourceRentalPoolOptions): this { + if (this.components.resourceRentalPools.some((pool) => pool.name === name)) { + throw new GolemConfigError(`Resource Rental Pool with name ${name} already exists`); } - this.components.leaseProcessPools.push({ name, options }); + this.components.resourceRentalPools.push({ name, options }); return this; } @@ -57,7 +57,7 @@ export class GolemDeploymentBuilder { market: this.glm.market, activity: this.glm.activity, network: this.glm.network, - lease: this.glm.rental, + rental: this.glm.rental, }); this.reset(); diff --git a/src/experimental/deployment/deployment.ts b/src/experimental/deployment/deployment.ts index c585c1694..66e0b35af 100644 --- a/src/experimental/deployment/deployment.ts +++ b/src/experimental/deployment/deployment.ts @@ -6,7 +6,7 @@ import { Network, NetworkModule, NetworkOptions } from "../../network"; import { validateDeployment } from "./validate-deployment"; import { DraftOfferProposalPool, MarketModule } from "../../market"; import { PaymentModule } from "../../payment"; -import { CreateLeaseProcessPoolOptions } from "./builder"; +import { CreateResourceRentalPoolOptions } from "./builder"; import { Subscription } from "rxjs"; import { RentalModule, ResourceRentalPool } from "../../resource-rental"; @@ -43,7 +43,7 @@ export interface DeploymentEvents { } export type DeploymentComponents = { - leaseProcessPools: { name: string; options: CreateLeaseProcessPoolOptions }[]; + resourceRentalPools: { name: string; options: CreateResourceRentalPoolOptions }[]; networks: { name: string; options: NetworkOptions }[]; }; @@ -65,7 +65,7 @@ export class Deployment { { proposalPool: DraftOfferProposalPool; proposalSubscription: Subscription; - leaseProcessPool: ResourceRentalPool; + resourceRentalPool: ResourceRentalPool; } >(); @@ -76,7 +76,7 @@ export class Deployment { activity: ActivityModule; payment: PaymentModule; network: NetworkModule; - lease: RentalModule; + rental: RentalModule; }; constructor( @@ -88,7 +88,7 @@ export class Deployment { activity: ActivityModule; payment: PaymentModule; network: NetworkModule; - lease: RentalModule; + rental: RentalModule; }, ) { validateDeployment(components); @@ -129,8 +129,8 @@ export class Deployment { // Allocation is re-used for all demands so the expiration date should // be the equal to the longest expiration date of all demands const longestExpiration = - Math.max(...this.components.leaseProcessPools.map((pool) => pool.options.market.rentHours)) * 3600; - const totalBudget = this.components.leaseProcessPools.reduce( + Math.max(...this.components.resourceRentalPools.map((pool) => pool.options.market.rentHours)) * 3600; + const totalBudget = this.components.resourceRentalPools.reduce( (acc, pool) => acc + this.modules.market.estimateBudget({ @@ -145,7 +145,7 @@ export class Deployment { expirationSec: longestExpiration, }); - for (const pool of this.components.leaseProcessPools) { + for (const pool of this.components.resourceRentalPools) { const network = pool.options?.deployment?.network ? this.networks.get(pool.options?.deployment.network) : undefined; @@ -165,7 +165,7 @@ export class Deployment { const proposalSubscription = proposalPool.readFrom(draftProposal$); - const leaseProcessPool = this.modules.lease.createResourceRentalPool(proposalPool, allocation, { + const resourceRentalPool = this.modules.rental.createResourceRentalPool(proposalPool, allocation, { replicas: pool.options.deployment?.replicas, network, resourceRentalOptions: { @@ -179,7 +179,7 @@ export class Deployment { this.pools.set(pool.name, { proposalPool, proposalSubscription, - leaseProcessPool, + resourceRentalPool, }); } @@ -200,7 +200,7 @@ export class Deployment { this.abortController.abort(); const stopPools = Array.from(this.pools.values()).map((pool) => - Promise.allSettled([pool.proposalSubscription.unsubscribe(), pool.leaseProcessPool.drainAndClear()]), + Promise.allSettled([pool.proposalSubscription.unsubscribe(), pool.resourceRentalPool.drainAndClear()]), ); await Promise.allSettled(stopPools); @@ -219,12 +219,12 @@ export class Deployment { this.events.emit("end"); } - getLeaseProcessPool(name: string): ResourceRentalPool { + getResourceRentalPool(name: string): ResourceRentalPool { const pool = this.pools.get(name); if (!pool) { - throw new GolemUserError(`LeaseProcessPool ${name} not found`); + throw new GolemUserError(`ResourceRentalPool ${name} not found`); } - return pool.leaseProcessPool; + return pool.resourceRentalPool; } getNetwork(name: string): Network { @@ -237,7 +237,7 @@ export class Deployment { private async waitForDeployment() { this.logger.info("Waiting for all components to be deployed..."); - const readyPools = [...this.pools.values()].map((component) => component.leaseProcessPool.ready()); + const readyPools = [...this.pools.values()].map((component) => component.resourceRentalPool.ready()); await Promise.all(readyPools); this.logger.info("Components deployed and ready to use"); } diff --git a/src/experimental/deployment/validate-deployment.ts b/src/experimental/deployment/validate-deployment.ts index 90b6b7ba1..43c81e61a 100644 --- a/src/experimental/deployment/validate-deployment.ts +++ b/src/experimental/deployment/validate-deployment.ts @@ -3,7 +3,7 @@ import { DeploymentComponents } from "./deployment"; function validateNetworks(components: DeploymentComponents) { const networkNames = new Set(components.networks.map((network) => network.name)); - for (const pool of components.leaseProcessPools) { + for (const pool of components.resourceRentalPools) { if (!pool.options.deployment?.network) { continue; } diff --git a/src/experimental/job/job.test.ts b/src/experimental/job/job.test.ts index db7032985..1850217c3 100644 --- a/src/experimental/job/job.test.ts +++ b/src/experimental/job/job.test.ts @@ -3,22 +3,22 @@ import { WorkContext } from "../../activity/work"; import { anything, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito"; import { Logger } from "../../shared/utils"; import { GolemNetwork } from "../../golem-network"; -import { LeaseProcess } from "../../lease-process"; +import { ResourceRental } from "../../resource-rental"; const mockGlm = mock(GolemNetwork); -const mockLease = mock(LeaseProcess); +const mockRental = mock(ResourceRental); const mockWorkContext = mock(WorkContext); describe("Job", () => { beforeEach(() => { reset(mockGlm); - reset(mockLease); + reset(mockRental); reset(mockWorkContext); }); describe("cancel()", () => { it("stops the activity and releases the agreement when canceled", async () => { - when(mockLease.getExeUnit()).thenResolve(instance(mockWorkContext)); - when(mockGlm.oneOf(anything())).thenResolve(instance(mockLease)); + when(mockRental.getExeUnit()).thenResolve(instance(mockWorkContext)); + when(mockGlm.oneOf(anything())).thenResolve(instance(mockRental)); const job = new Job( "test_id", instance(mockGlm), @@ -49,7 +49,7 @@ describe("Job", () => { await expect(job.waitForResult()).rejects.toThrow("Canceled"); - verify(mockLease.finalize()).once(); + verify(mockRental.finalize()).once(); }); }); }); diff --git a/src/experimental/job/job.ts b/src/experimental/job/job.ts index 3387d5c6c..8ac098731 100644 --- a/src/experimental/job/job.ts +++ b/src/experimental/job/job.ts @@ -132,13 +132,13 @@ export class Job { throw new GolemAbortError("Canceled"); } - const lease = await this.glm.oneOf(this.order); + const rental = await this.glm.oneOf(this.order); - const workContext = await lease.getExeUnit(); + const workContext = await rental.getExeUnit(); this.events.emit("started"); const onAbort = async () => { - await lease.finalize(); + await rental.finalize(); this.events.emit("canceled"); }; From 2840bc74e96f930ec8fd0ae811758007bd6e94b1 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 19 Jun 2024 12:33:19 +0200 Subject: [PATCH 04/10] chore: rename lease to rental in tests and comments --- src/golem-network/golem-network.test.ts | 42 ++++++++++++------------- src/resource-rental/rental.module.ts | 10 +++--- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/golem-network/golem-network.test.ts b/src/golem-network/golem-network.test.ts index 9641faae2..acc00a981 100644 --- a/src/golem-network/golem-network.test.ts +++ b/src/golem-network/golem-network.test.ts @@ -69,12 +69,12 @@ function getGolemNetwork() { describe("Golem Network", () => { describe("oneOf()", () => { - it("should create a lease and clean it up when disconnected", async () => { + it("should create a rental and clean it up when disconnected", async () => { const mockResourceRental = mock(ResourceRental); - const testProcess = instance(mockResourceRental); + const mockResourceRentalInstance = instance(mockResourceRental); when(mockResourceRental.finalize()).thenResolve(); - when(mockRental.createResourceRental(_, _, _)).thenReturn(testProcess); + when(mockRental.createResourceRental(_, _, _)).thenReturn(mockResourceRentalInstance); const draftProposal$ = new Subject(); when(mockMarket.collectDraftOfferProposals(_)).thenReturn(draftProposal$); @@ -87,9 +87,9 @@ describe("Golem Network", () => { const glm = getGolemNetwork(); await glm.connect(); - const lease = await glm.oneOf(order); + const rental = await glm.oneOf(order); - expect(lease).toBe(testProcess); + expect(rental).toBe(mockResourceRentalInstance); await glm.disconnect(); @@ -100,9 +100,9 @@ describe("Golem Network", () => { const allocation = instance(mock(Allocation)); const mockResourceRental = mock(ResourceRental); - const testProcess = instance(mockResourceRental); + const mockResourceRentalInstance = instance(mockResourceRental); when(mockResourceRental.finalize()).thenResolve(); - when(mockRental.createResourceRental(_, _, _)).thenReturn(testProcess); + when(mockRental.createResourceRental(_, _, _)).thenReturn(mockResourceRentalInstance); when(mockMarket.collectDraftOfferProposals(_)).thenReturn(new Subject()); jest.spyOn(DraftOfferProposalPool.prototype, "acquire").mockResolvedValue({} as OfferProposal); @@ -110,14 +110,14 @@ describe("Golem Network", () => { const glm = getGolemNetwork(); await glm.connect(); - const lease = await glm.oneOf({ + const rental = await glm.oneOf({ ...order, payment: { allocation, }, }); - expect(lease).toBe(testProcess); + expect(rental).toBe(mockResourceRentalInstance); await glm.disconnect(); @@ -135,10 +135,10 @@ describe("Golem Network", () => { const draftProposal$ = new Subject(); when(mockMarket.collectDraftOfferProposals(_)).thenReturn(draftProposal$); - const mockLeasePool = mock(ResourceRentalPool); - when(mockLeasePool.drainAndClear()).thenResolve(); - const leasePool = instance(mockLeasePool); - when(mockRental.createResourceRentalPool(_, _, _)).thenReturn(leasePool); + const mockRentalPool = mock(ResourceRentalPool); + when(mockRentalPool.drainAndClear()).thenResolve(); + const rentalPool = instance(mockRentalPool); + when(mockRental.createResourceRentalPool(_, _, _)).thenReturn(rentalPool); const glm = getGolemNetwork(); @@ -149,21 +149,21 @@ describe("Golem Network", () => { order, }); - expect(pool).toBe(leasePool); + expect(pool).toBe(rentalPool); await glm.disconnect(); - verify(mockLeasePool.drainAndClear()).once(); + verify(mockRentalPool.drainAndClear()).once(); verify(mockPayment.releaseAllocation(allocation)).once(); }); it("should not release the allocation if it was provided by the user", async () => { const allocation = instance(mock(Allocation)); when(mockMarket.collectDraftOfferProposals(_)).thenReturn(new Subject()); - const mockLeasePool = mock(ResourceRentalPool); - when(mockLeasePool.drainAndClear()).thenResolve(); - const leasePool = instance(mockLeasePool); - when(mockRental.createResourceRentalPool(_, _, _)).thenReturn(leasePool); + const mockRentalPool = mock(ResourceRentalPool); + when(mockRentalPool.drainAndClear()).thenResolve(); + const rentalPool = instance(mockRentalPool); + when(mockRental.createResourceRentalPool(_, _, _)).thenReturn(rentalPool); const glm = getGolemNetwork(); await glm.connect(); @@ -178,9 +178,9 @@ describe("Golem Network", () => { }, }); - expect(pool).toBe(leasePool); + expect(pool).toBe(rentalPool); await glm.disconnect(); - verify(mockLeasePool.drainAndClear()).once(); + verify(mockRentalPool.drainAndClear()).once(); verify(mockPayment.createAllocation(_)).never(); verify(mockPayment.releaseAllocation(allocation)).never(); }); diff --git a/src/resource-rental/rental.module.ts b/src/resource-rental/rental.module.ts index 91d04acb1..7bdf9b4d2 100644 --- a/src/resource-rental/rental.module.ts +++ b/src/resource-rental/rental.module.ts @@ -9,13 +9,13 @@ import { ResourceRentalPool, ResourceRentalPoolOptions } from "./resource-rental export interface RentalModule { /** - * Factory that creates a new lease process that's fully configured. + * Factory that creates a new resource rental that's fully configured. * This method will also create the payment process for the agreement. * */ createResourceRental(agreement: Agreement, allocation: Allocation, options?: ResourceRentalOptions): ResourceRental; /** - * Factory that creates new lease process pool that's fully configured + * Factory that creates new resource rental pool that's fully configured */ createResourceRentalPool( draftPool: DraftOfferProposalPool, @@ -42,16 +42,16 @@ export class RentalModuleImpl implements RentalModule { allocation, options?.payment, ); - const lease = new ResourceRental( + const rental = new ResourceRental( agreement, this.deps.storageProvider, paymentProcess, this.deps.marketModule, this.deps.activityModule, - this.deps.logger.child("lease-process"), + this.deps.logger.child("resource-rental"), options, ); - return lease; + return rental; } public createResourceRentalPool( From be756a72109aaaddec13d6a21524b7f879764bb0 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 19 Jun 2024 12:42:46 +0200 Subject: [PATCH 05/10] docs: rename lease to rental in examples --- examples/advanced/local-image/serveLocalGvmi.ts | 4 ++-- examples/advanced/manual-pools.ts | 16 ++++++++-------- examples/advanced/override-module.ts | 6 +++--- examples/advanced/payment-filters.ts | 6 +++--- examples/advanced/proposal-filter.ts | 6 +++--- examples/advanced/proposal-predefined-filter.ts | 6 +++--- examples/advanced/proposal-selector.ts | 6 +++--- examples/advanced/reuse-allocation.ts | 16 ++++++++-------- examples/basic/events.ts | 6 +++--- examples/basic/many-of.ts | 16 ++++++++-------- examples/basic/one-of.ts | 6 +++--- examples/basic/run-and-stream.ts | 6 +++--- examples/basic/transfer.ts | 12 ++++++------ examples/basic/vpn.ts | 14 +++++++------- examples/experimental/deployment/new-api.ts | 12 ++++-------- examples/web/hello.html | 10 +++++----- 16 files changed, 72 insertions(+), 76 deletions(-) diff --git a/examples/advanced/local-image/serveLocalGvmi.ts b/examples/advanced/local-image/serveLocalGvmi.ts index 29cce32c1..45de3cc64 100644 --- a/examples/advanced/local-image/serveLocalGvmi.ts +++ b/examples/advanced/local-image/serveLocalGvmi.ts @@ -39,9 +39,9 @@ const getImagePath = (path: string) => new URL(path, import.meta.url).toString() }, }; - const lease = await glm.oneOf(order); + const rental = await glm.oneOf(order); // in our Dockerfile we have created a file called hello.txt, let's read it - const result = await lease + const result = await rental .getExeUnit() .then((exe) => exe.run("cat hello.txt")) .then((res) => res.stdout); diff --git a/examples/advanced/manual-pools.ts b/examples/advanced/manual-pools.ts index 490bbde41..6883e5897 100644 --- a/examples/advanced/manual-pools.ts +++ b/examples/advanced/manual-pools.ts @@ -55,29 +55,29 @@ const demandOptions = { market: glm.market, activity: glm.activity, payment: glm.payment, - lease: glm.lease, + rental: glm.rental, }; - const pool = depModules.lease.createLeaseProcessPool(proposalPool, allocation, { + const pool = depModules.rental.createResourceRentalPool(proposalPool, allocation, { replicas: { max: CONCURRENCY }, }); - const lease = await pool.acquire(); - const lease2 = await pool.acquire(); + const rental1 = await pool.acquire(); + const rental2 = await pool.acquire(); await Promise.allSettled([ - lease + rental1 .getExeUnit() .then((exe) => exe.run("echo Hello from first activity 👋")) .then((result) => console.log(result.stdout)), - lease2 + rental2 .getExeUnit() .then((exe) => exe.run("echo Hello from second activity 👋")) .then((result) => console.log(result.stdout)), ]); - await pool.release(lease); - await pool.release(lease2); + await pool.release(rental1); + await pool.release(rental2); proposalSubscription.unsubscribe(); await pool.drainAndClear(); diff --git a/examples/advanced/override-module.ts b/examples/advanced/override-module.ts index df49d26ba..bee1c3cbb 100644 --- a/examples/advanced/override-module.ts +++ b/examples/advanced/override-module.ts @@ -58,12 +58,12 @@ const order: MarketOrderSpec = { try { await glm.connect(); - const lease = await glm.oneOf(order); - await lease + const rental = await glm.oneOf(order); + await rental .getExeUnit() .then((exe) => exe.run("echo Hello, Golem! 👋")) .then((res) => console.log(res.stdout)); - await lease.finalize(); + await rental.finalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/advanced/payment-filters.ts b/examples/advanced/payment-filters.ts index adcb8254a..d2fae4a6f 100644 --- a/examples/advanced/payment-filters.ts +++ b/examples/advanced/payment-filters.ts @@ -61,12 +61,12 @@ const order: MarketOrderSpec = { try { await glm.connect(); - const lease = await glm.oneOf(order); - await lease + const rental = await glm.oneOf(order); + await rental .getExeUnit() .then((exe) => exe.run("echo Hello, Golem! 👋")) .then((res) => console.log(res.stdout)); - await lease.finalize(); + await rental.finalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/advanced/proposal-filter.ts b/examples/advanced/proposal-filter.ts index 5278af3c6..ab2457f85 100644 --- a/examples/advanced/proposal-filter.ts +++ b/examples/advanced/proposal-filter.ts @@ -35,12 +35,12 @@ const order: MarketOrderSpec = { try { await glm.connect(); - const lease = await glm.oneOf(order); - await lease + const rental = await glm.oneOf(order); + await rental .getExeUnit() .then((exe) => exe.run(`echo [provider:${exe.provider.name}] Hello, Golem! 👋`)) .then((res) => console.log(res.stdout)); - await lease.finalize(); + await rental.finalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/advanced/proposal-predefined-filter.ts b/examples/advanced/proposal-predefined-filter.ts index b584393e4..439b179f5 100644 --- a/examples/advanced/proposal-predefined-filter.ts +++ b/examples/advanced/proposal-predefined-filter.ts @@ -33,12 +33,12 @@ const order: MarketOrderSpec = { try { await glm.connect(); - const lease = await glm.oneOf(order); - await lease + const rental = await glm.oneOf(order); + await rental .getExeUnit() .then((exe) => exe.run(`echo [provider:${exe.provider.name}] Hello, Golem! 👋`)) .then((res) => console.log(res.stdout)); - await lease.finalize(); + await rental.finalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/advanced/proposal-selector.ts b/examples/advanced/proposal-selector.ts index 32c0da7b0..0b5000e20 100644 --- a/examples/advanced/proposal-selector.ts +++ b/examples/advanced/proposal-selector.ts @@ -41,12 +41,12 @@ const order: MarketOrderSpec = { try { await glm.connect(); - const lease = await glm.oneOf(order); - await lease + const rental = await glm.oneOf(order); + await rental .getExeUnit() .then((exe) => exe.run(`echo [provider:${exe.provider.name}] Hello, Golem! 👋`)) .then((res) => console.log(res.stdout)); - await lease.finalize(); + await rental.finalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/advanced/reuse-allocation.ts b/examples/advanced/reuse-allocation.ts index 5e1e99fa0..50154dba4 100644 --- a/examples/advanced/reuse-allocation.ts +++ b/examples/advanced/reuse-allocation.ts @@ -52,20 +52,20 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; }, }; - const lease1 = await glm.oneOf(firstOrder); - const lease2 = await glm.oneOf(secondOrder); + const rental1 = await glm.oneOf(firstOrder); + const rental2 = await glm.oneOf(secondOrder); - await lease1 + await rental1 .getExeUnit() - .then((exe) => exe.run("echo Running on first lease")) + .then((exe) => exe.run("echo Running on first rental")) .then((res) => console.log(res.stdout)); - await lease2 + await rental2 .getExeUnit() - .then((exe) => exe.run("echo Running on second lease")) + .then((exe) => exe.run("echo Running on second rental")) .then((res) => console.log(res.stdout)); - await lease1.finalize(); - await lease2.finalize(); + await rental1.finalize(); + await rental2.finalize(); await glm.payment.releaseAllocation(allocation); } catch (err) { console.error("Failed to run the example", err); diff --git a/examples/basic/events.ts b/examples/basic/events.ts index cde2b2b6d..7c3d99c77 100644 --- a/examples/basic/events.ts +++ b/examples/basic/events.ts @@ -35,7 +35,7 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; console.warn("Proposal rejected by provider", event); }); - const lease = await glm.oneOf({ + const rental = await glm.oneOf({ demand: { workload: { imageTag: "golem/alpine:latest" }, }, @@ -50,12 +50,12 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; }, }); - await lease + await rental .getExeUnit() .then((exe) => exe.run("echo Hello, Golem! 👋")) .then((res) => console.log(res.stdout)); - await lease.finalize(); + await rental.finalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/basic/many-of.ts b/examples/basic/many-of.ts index 7b9f855fe..2c7e39ef7 100644 --- a/examples/basic/many-of.ts +++ b/examples/basic/many-of.ts @@ -1,5 +1,5 @@ /** - * This example demonstrates how easily lease multiple machines at once. + * This example demonstrates how easily rent multiple machines at once. */ import { GolemNetwork, MarketOrderSpec } from "@golem-sdk/golem-js"; @@ -29,26 +29,26 @@ const order: MarketOrderSpec = { try { await glm.connect(); - // create a pool that can grow up to 3 leases at the same time + // create a pool that can grow up to 3 rentals at the same time const pool = await glm.manyOf({ concurrency: 3, order, }); await Promise.allSettled([ - pool.withLease(async (lease) => - lease + pool.withRental(async (rental) => + rental .getExeUnit() .then((exe) => exe.run("echo Hello, Golem from the first machine! 👋")) .then((res) => console.log(res.stdout)), ), - pool.withLease(async (lease) => - lease + pool.withRental(async (rental) => + rental .getExeUnit() .then((exe) => exe.run("echo Hello, Golem from the second machine! 👋")) .then((res) => console.log(res.stdout)), ), - pool.withLease(async (lease) => - lease + pool.withRental(async (rental) => + rental .getExeUnit() .then((exe) => exe.run("echo Hello, Golem from the third machine! 👋")) .then((res) => console.log(res.stdout)), diff --git a/examples/basic/one-of.ts b/examples/basic/one-of.ts index b53fe2716..88ec22123 100644 --- a/examples/basic/one-of.ts +++ b/examples/basic/one-of.ts @@ -25,12 +25,12 @@ const order: MarketOrderSpec = { try { await glm.connect(); - const lease = await glm.oneOf(order); - await lease + const rental = await glm.oneOf(order); + await rental .getExeUnit() .then((exe) => exe.run("echo Hello, Golem! 👋")) .then((res) => console.log(res.stdout)); - await lease.finalize(); + await rental.finalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/basic/run-and-stream.ts b/examples/basic/run-and-stream.ts index 546f08aa1..a3b0582d8 100644 --- a/examples/basic/run-and-stream.ts +++ b/examples/basic/run-and-stream.ts @@ -30,8 +30,8 @@ const order: MarketOrderSpec = { try { await glm.connect(); - const lease = await glm.oneOf(order); - const exe = await lease.getExeUnit(); + const rental = await glm.oneOf(order); + const exe = await rental.getExeUnit(); const remoteProcess = await exe.runAndStream( ` @@ -50,7 +50,7 @@ const order: MarketOrderSpec = { remoteProcess.stderr.on("data", (data) => console.error("stderr>", data)); await remoteProcess.waitForExit(); - await lease.finalize(); + await rental.finalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/basic/transfer.ts b/examples/basic/transfer.ts index c0b005879..ff851b3c3 100644 --- a/examples/basic/transfer.ts +++ b/examples/basic/transfer.ts @@ -30,11 +30,11 @@ const order: MarketOrderSpec = { concurrency: 2, order, }); - const lease1 = await pool.acquire(); - const lease2 = await pool.acquire(); + const rental1 = await pool.acquire(); + const rental2 = await pool.acquire(); - const exe1 = await lease1.getExeUnit(); - const exe2 = await lease2.getExeUnit(); + const exe1 = await rental1.getExeUnit(); + const exe2 = await rental2.getExeUnit(); await exe1 .beginBatch() @@ -54,8 +54,8 @@ const order: MarketOrderSpec = { console.log("File content: "); console.log(await readFile("./results.txt", { encoding: "utf-8" })); - await lease1.finalize(); - await lease2.finalize(); + await rental1.finalize(); + await rental2.finalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/basic/vpn.ts b/examples/basic/vpn.ts index d277aced8..b4381a411 100644 --- a/examples/basic/vpn.ts +++ b/examples/basic/vpn.ts @@ -26,23 +26,23 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; }, network, }; - // create a pool that can grow up to 2 leases at the same time + // create a pool that can grow up to 2 rentals at the same time const pool = await glm.manyOf({ concurrency: 2, order, }); - const lease1 = await pool.acquire(); - const lease2 = await pool.acquire(); - const exe1 = await lease1.getExeUnit(); - const exe2 = await lease2.getExeUnit(); + const rental1 = await pool.acquire(); + const rental2 = await pool.acquire(); + const exe1 = await rental1.getExeUnit(); + const exe2 = await rental2.getExeUnit(); await exe1 .run(`ping ${exe2.getIp()} -c 4`) .then((res) => console.log(`Response from provider: ${exe1.provider.name} (ip: ${exe1.getIp()})`, res.stdout)); await exe2 .run(`ping ${exe1.getIp()} -c 4`) .then((res) => console.log(`Response from provider: ${exe2.provider.name} (ip: ${exe2.getIp()})`, res.stdout)); - await pool.destroy(lease1); - await pool.destroy(lease2); + await pool.destroy(rental1); + await pool.destroy(rental2); await glm.destroyNetwork(network); } catch (err) { diff --git a/examples/experimental/deployment/new-api.ts b/examples/experimental/deployment/new-api.ts index 42ff943db..607e049bd 100644 --- a/examples/experimental/deployment/new-api.ts +++ b/examples/experimental/deployment/new-api.ts @@ -18,7 +18,7 @@ async function main() { .createNetwork("basic", { ip: "192.168.7.0/24", }) - .createLeaseProcessPool("app", { + .createResourceRentalPool("app", { demand: { workload: { imageTag: "golem/node:latest", @@ -32,17 +32,13 @@ async function main() { maxCpuPerHourPrice: 1, maxEnvPerHourPrice: 1, }, - withProviders: ["0x123123"], - withoutProviders: ["0x123123"], - withOperators: ["0x123123"], - withoutOperators: ["0x123123"], }, deployment: { replicas: 2, network: "basic", }, }) - .createLeaseProcessPool("db", { + .createResourceRentalPool("db", { demand: { workload: { imageTag: "golem/alpine:latest", @@ -72,8 +68,8 @@ async function main() { await deployment.start(); // Get your pool of activities for specified need - const appPool = deployment.getLeaseProcessPool("app"); - const dbPool = deployment.getLeaseProcessPool("db"); + const appPool = deployment.getResourceRentalPool("app"); + const dbPool = deployment.getResourceRentalPool("db"); // Get an instance out of the pool for use const appReplica1 = await appPool.acquire(); diff --git a/examples/web/hello.html b/examples/web/hello.html index 557cdbf93..57adfc71d 100644 --- a/examples/web/hello.html +++ b/examples/web/hello.html @@ -113,15 +113,15 @@

Results

try { appendResults("Establishing a connection to the Golem Network"); await glm.connect(); - appendResults("Request for leasing a provider machine"); - const lease = await glm.oneOf(order); - await lease + appendResults("Request for renting a provider machine"); + const rental = await glm.oneOf(order); + await rental .getExeUnit() .then(async (exe) => appendResults("Reply: " + (await exe.run(`echo 'Hello Golem! 👋 from ${exe.provider.name}!'`)).stdout), ); - await lease.finalize(); - appendResults("Finalized leasing process"); + await rental.finalize(); + appendResults("Finalized renting process"); } catch (err) { console.error("Failed to run the example", err); } finally { From 04a1de0c5764e7bce6f4f1d3ce94f0861951fdc5 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 19 Jun 2024 12:49:25 +0200 Subject: [PATCH 06/10] docs: rename lease to rental in README --- README.md | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index e177d282f..a2f24b04f 100644 --- a/README.md +++ b/README.md @@ -108,7 +108,7 @@ yagna payment fund --network holesky yagna payment status --network holesky ``` -#### Obtain an `app-key` to use with SDK +### Obtain an `app-key` to use with SDK If you don't have any app-keys available from `yagna app-key list`, go ahead and create one with the command below. You will need this key in order to communicate with `yagna` from your application. You can set it @@ -147,13 +147,13 @@ const order: MarketOrderSpec = { try { await glm.connect(); - // Lease a machine - const lease = await glm.oneOf(order); - await lease + // Rent a machine + const rental = await glm.oneOf(order); + await rental .getExeUnit() .then((exe) => exe.run("echo Hello, Golem! 👋")) .then((res) => console.log(res.stdout)); - await lease.finalize(); + await rental.finalize(); } catch (err) { console.error("Failed to run the example", err); } finally { @@ -188,27 +188,27 @@ const order: MarketOrderSpec = { try { await glm.connect(); - // create a pool that can grow up to 3 leases at the same time + // create a pool that can grow up to 3 rentals at the same time const pool = await glm.manyOf({ concurrency: 3, order, }); // run 3 tasks in parallel on 3 different machines await Promise.allSettled([ - pool.withLease(async (lease) => - lease + pool.withRental(async (rental) => + rental .getExeUnit() .then((exe) => exe.run("echo Hello, Golem from the first machine! 👋")) .then((res) => console.log(res.stdout)), ), - pool.withLease(async (lease) => - lease + pool.withRental(async (rental) => + rental .getExeUnit() .then((exe) => exe.run("echo Hello, Golem from the second machine! 👋")) .then((res) => console.log(res.stdout)), ), - pool.withLease(async (lease) => - lease + pool.withRental(async (rental) => + rental .getExeUnit() .then((exe) => exe.run("echo Hello, Golem from the third machine! 👋")) .then((res) => console.log(res.stdout)), @@ -270,8 +270,8 @@ securely between the nodes. ```ts const network = await glm.createNetwork({ ip: "192.168.7.0/24" }); // ... -const exe1 = await lease1.getExeUnit(); -const exe2 = await lease2.getExeUnit(); +const exe1 = await rental1.getExeUnit(); +const exe2 = await rental2.getExeUnit(); await exe1 .run(`ping ${exe2.getIp()} -c 4`) .then((res) => console.log(`Response from provider: ${exe1.provider.name} (ip: ${exe1.getIp()})`, res.stdout)); From de63aa768dbe7744e293babeb5c20967deb0e134 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 19 Jun 2024 12:59:48 +0200 Subject: [PATCH 07/10] test: rename lease process pool test to resource rental pool --- .../e2e/{leaseProcessPool.spec.ts => resourceRentalPool.spec.ts} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename tests/e2e/{leaseProcessPool.spec.ts => resourceRentalPool.spec.ts} (100%) diff --git a/tests/e2e/leaseProcessPool.spec.ts b/tests/e2e/resourceRentalPool.spec.ts similarity index 100% rename from tests/e2e/leaseProcessPool.spec.ts rename to tests/e2e/resourceRentalPool.spec.ts From 8e1eb668ec89fb9b7652a5eebcc2505555d5bc31 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 19 Jun 2024 13:03:40 +0200 Subject: [PATCH 08/10] chore: rename lease to rental in newly created test --- tests/e2e/resourceRentalPool.spec.ts | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/e2e/resourceRentalPool.spec.ts b/tests/e2e/resourceRentalPool.spec.ts index 3f529f4a9..a5d70963c 100644 --- a/tests/e2e/resourceRentalPool.spec.ts +++ b/tests/e2e/resourceRentalPool.spec.ts @@ -177,26 +177,26 @@ describe("ResourceRentalPool", () => { expect(Math.max(...poolSizesDuringWork)).toEqual(maxPoolSize); }); - it("should abort acquiring lease process by signal", async () => { + it("should abort acquiring resource rental by signal", async () => { const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 }); - const abortControler = new AbortController(); - abortControler.abort(); - await expect(pool.acquire(abortControler.signal)).rejects.toThrow("The signing of the agreement has been aborted"); + const abortController = new AbortController(); + abortController.abort(); + await expect(pool.acquire(abortController.signal)).rejects.toThrow("The signing of the agreement has been aborted"); }); - it("should abort acquiring lease process by timeout", async () => { + it("should abort acquiring resource rental by timeout", async () => { const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 }); await expect(pool.acquire(1_000)).rejects.toThrow("Could not sign any agreement in time"); }); - it("should finalize the lease process during execution", async () => { + it("should finalize the resource rental during execution", async () => { expect.assertions(1); const pool = glm.rental.createResourceRentalPool(proposalPool, allocation, { replicas: 1 }); - const leaseProcess = await pool.acquire(); - const exe = await leaseProcess.getExeUnit(); + const resourceRental = await pool.acquire(); + const exe = await resourceRental.getExeUnit(); return new Promise(async (res) => { - leaseProcess.events.on("finalized", async () => res(true)); - setTimeout(() => leaseProcess.finalize(), 8_000); + resourceRental.events.on("finalized", async () => res(true)); + setTimeout(() => resourceRental.finalize(), 8_000); await expect(exe.run("sleep 10 && echo Hello World")).rejects.toThrow( new GolemAbortError("Execution of script has been aborted"), ); From 76fe8e5f1082d4b45bfb85cb81e1ff80d6fac1fa Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 19 Jun 2024 13:10:18 +0200 Subject: [PATCH 09/10] feat: rename finalize to stopAndFinalize --- README.md | 2 +- examples/advanced/override-module.ts | 2 +- examples/advanced/payment-filters.ts | 2 +- examples/advanced/proposal-filter.ts | 2 +- examples/advanced/proposal-predefined-filter.ts | 2 +- examples/advanced/proposal-selector.ts | 2 +- examples/advanced/reuse-allocation.ts | 4 ++-- examples/basic/events.ts | 2 +- examples/basic/one-of.ts | 2 +- examples/basic/run-and-stream.ts | 2 +- examples/basic/transfer.ts | 4 ++-- examples/web/hello.html | 2 +- src/experimental/job/job.test.ts | 2 +- src/experimental/job/job.ts | 2 +- src/golem-network/golem-network.test.ts | 8 ++++---- src/golem-network/golem-network.ts | 4 ++-- src/resource-rental/resource-rental-pool.ts | 2 +- src/resource-rental/resource-rental.ts | 3 ++- tests/e2e/resourceRentalPool.spec.ts | 2 +- 19 files changed, 26 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index a2f24b04f..9d471b76f 100644 --- a/README.md +++ b/README.md @@ -153,7 +153,7 @@ const order: MarketOrderSpec = { .getExeUnit() .then((exe) => exe.run("echo Hello, Golem! 👋")) .then((res) => console.log(res.stdout)); - await rental.finalize(); + await rental.stopAndFinalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/advanced/override-module.ts b/examples/advanced/override-module.ts index bee1c3cbb..8e7c5fa2d 100644 --- a/examples/advanced/override-module.ts +++ b/examples/advanced/override-module.ts @@ -63,7 +63,7 @@ const order: MarketOrderSpec = { .getExeUnit() .then((exe) => exe.run("echo Hello, Golem! 👋")) .then((res) => console.log(res.stdout)); - await rental.finalize(); + await rental.stopAndFinalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/advanced/payment-filters.ts b/examples/advanced/payment-filters.ts index d2fae4a6f..8d5e11e1c 100644 --- a/examples/advanced/payment-filters.ts +++ b/examples/advanced/payment-filters.ts @@ -66,7 +66,7 @@ const order: MarketOrderSpec = { .getExeUnit() .then((exe) => exe.run("echo Hello, Golem! 👋")) .then((res) => console.log(res.stdout)); - await rental.finalize(); + await rental.stopAndFinalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/advanced/proposal-filter.ts b/examples/advanced/proposal-filter.ts index ab2457f85..226ab980c 100644 --- a/examples/advanced/proposal-filter.ts +++ b/examples/advanced/proposal-filter.ts @@ -40,7 +40,7 @@ const order: MarketOrderSpec = { .getExeUnit() .then((exe) => exe.run(`echo [provider:${exe.provider.name}] Hello, Golem! 👋`)) .then((res) => console.log(res.stdout)); - await rental.finalize(); + await rental.stopAndFinalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/advanced/proposal-predefined-filter.ts b/examples/advanced/proposal-predefined-filter.ts index 439b179f5..d3d6414fd 100644 --- a/examples/advanced/proposal-predefined-filter.ts +++ b/examples/advanced/proposal-predefined-filter.ts @@ -38,7 +38,7 @@ const order: MarketOrderSpec = { .getExeUnit() .then((exe) => exe.run(`echo [provider:${exe.provider.name}] Hello, Golem! 👋`)) .then((res) => console.log(res.stdout)); - await rental.finalize(); + await rental.stopAndFinalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/advanced/proposal-selector.ts b/examples/advanced/proposal-selector.ts index 0b5000e20..23e4878f1 100644 --- a/examples/advanced/proposal-selector.ts +++ b/examples/advanced/proposal-selector.ts @@ -46,7 +46,7 @@ const order: MarketOrderSpec = { .getExeUnit() .then((exe) => exe.run(`echo [provider:${exe.provider.name}] Hello, Golem! 👋`)) .then((res) => console.log(res.stdout)); - await rental.finalize(); + await rental.stopAndFinalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/advanced/reuse-allocation.ts b/examples/advanced/reuse-allocation.ts index 50154dba4..97a463ff6 100644 --- a/examples/advanced/reuse-allocation.ts +++ b/examples/advanced/reuse-allocation.ts @@ -64,8 +64,8 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; .then((exe) => exe.run("echo Running on second rental")) .then((res) => console.log(res.stdout)); - await rental1.finalize(); - await rental2.finalize(); + await rental1.stopAndFinalize(); + await rental2.stopAndFinalize(); await glm.payment.releaseAllocation(allocation); } catch (err) { console.error("Failed to run the example", err); diff --git a/examples/basic/events.ts b/examples/basic/events.ts index 7c3d99c77..3a6291d09 100644 --- a/examples/basic/events.ts +++ b/examples/basic/events.ts @@ -55,7 +55,7 @@ import { pinoPrettyLogger } from "@golem-sdk/pino-logger"; .then((exe) => exe.run("echo Hello, Golem! 👋")) .then((res) => console.log(res.stdout)); - await rental.finalize(); + await rental.stopAndFinalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/basic/one-of.ts b/examples/basic/one-of.ts index 88ec22123..c22007c08 100644 --- a/examples/basic/one-of.ts +++ b/examples/basic/one-of.ts @@ -30,7 +30,7 @@ const order: MarketOrderSpec = { .getExeUnit() .then((exe) => exe.run("echo Hello, Golem! 👋")) .then((res) => console.log(res.stdout)); - await rental.finalize(); + await rental.stopAndFinalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/basic/run-and-stream.ts b/examples/basic/run-and-stream.ts index a3b0582d8..efcb19408 100644 --- a/examples/basic/run-and-stream.ts +++ b/examples/basic/run-and-stream.ts @@ -50,7 +50,7 @@ const order: MarketOrderSpec = { remoteProcess.stderr.on("data", (data) => console.error("stderr>", data)); await remoteProcess.waitForExit(); - await rental.finalize(); + await rental.stopAndFinalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/basic/transfer.ts b/examples/basic/transfer.ts index ff851b3c3..890a46f53 100644 --- a/examples/basic/transfer.ts +++ b/examples/basic/transfer.ts @@ -54,8 +54,8 @@ const order: MarketOrderSpec = { console.log("File content: "); console.log(await readFile("./results.txt", { encoding: "utf-8" })); - await rental1.finalize(); - await rental2.finalize(); + await rental1.stopAndFinalize(); + await rental2.stopAndFinalize(); } catch (err) { console.error("Failed to run the example", err); } finally { diff --git a/examples/web/hello.html b/examples/web/hello.html index 57adfc71d..cb083025c 100644 --- a/examples/web/hello.html +++ b/examples/web/hello.html @@ -120,7 +120,7 @@

Results

.then(async (exe) => appendResults("Reply: " + (await exe.run(`echo 'Hello Golem! 👋 from ${exe.provider.name}!'`)).stdout), ); - await rental.finalize(); + await rental.stopAndFinalize(); appendResults("Finalized renting process"); } catch (err) { console.error("Failed to run the example", err); diff --git a/src/experimental/job/job.test.ts b/src/experimental/job/job.test.ts index 1850217c3..5df379040 100644 --- a/src/experimental/job/job.test.ts +++ b/src/experimental/job/job.test.ts @@ -49,7 +49,7 @@ describe("Job", () => { await expect(job.waitForResult()).rejects.toThrow("Canceled"); - verify(mockRental.finalize()).once(); + verify(mockRental.stopAndFinalize()).once(); }); }); }); diff --git a/src/experimental/job/job.ts b/src/experimental/job/job.ts index 8ac098731..3285654a2 100644 --- a/src/experimental/job/job.ts +++ b/src/experimental/job/job.ts @@ -138,7 +138,7 @@ export class Job { this.events.emit("started"); const onAbort = async () => { - await rental.finalize(); + await rental.stopAndFinalize(); this.events.emit("canceled"); }; diff --git a/src/golem-network/golem-network.test.ts b/src/golem-network/golem-network.test.ts index acc00a981..d02283f5b 100644 --- a/src/golem-network/golem-network.test.ts +++ b/src/golem-network/golem-network.test.ts @@ -73,7 +73,7 @@ describe("Golem Network", () => { const mockResourceRental = mock(ResourceRental); const mockResourceRentalInstance = instance(mockResourceRental); - when(mockResourceRental.finalize()).thenResolve(); + when(mockResourceRental.stopAndFinalize()).thenResolve(); when(mockRental.createResourceRental(_, _, _)).thenReturn(mockResourceRentalInstance); const draftProposal$ = new Subject(); @@ -93,7 +93,7 @@ describe("Golem Network", () => { await glm.disconnect(); - verify(mockResourceRental.finalize()).once(); + verify(mockResourceRental.stopAndFinalize()).once(); verify(mockPayment.releaseAllocation(allocation)).once(); }); it("should not release the allocation if it was provided by the user", async () => { @@ -101,7 +101,7 @@ describe("Golem Network", () => { const mockResourceRental = mock(ResourceRental); const mockResourceRentalInstance = instance(mockResourceRental); - when(mockResourceRental.finalize()).thenResolve(); + when(mockResourceRental.stopAndFinalize()).thenResolve(); when(mockRental.createResourceRental(_, _, _)).thenReturn(mockResourceRentalInstance); when(mockMarket.collectDraftOfferProposals(_)).thenReturn(new Subject()); @@ -121,7 +121,7 @@ describe("Golem Network", () => { await glm.disconnect(); - verify(mockResourceRental.finalize()).once(); + verify(mockResourceRental.stopAndFinalize()).once(); verify(mockPayment.createAllocation(_)).never(); verify(mockPayment.releaseAllocation(allocation)).never(); }); diff --git a/src/golem-network/golem-network.ts b/src/golem-network/golem-network.ts index fe1296697..7845a9c97 100644 --- a/src/golem-network/golem-network.ts +++ b/src/golem-network/golem-network.ts @@ -354,7 +354,7 @@ export class GolemNetwork { * .getExeUnit() * .then((exe) => exe.run("echo Hello, Golem! 👋")) * .then((res) => console.log(res.stdout)); - * await rental.finalize(); + * await rental.stopAndFinalize(); * ``` * * @param order @@ -402,7 +402,7 @@ export class GolemNetwork { this.cleanupTasks.push(async () => { // First finalize the rental (which will wait for all payments to be processed) // and only then release the allocation - await rental.finalize().catch((err) => this.logger.error("Error while finalizing rental", err)); + await rental.stopAndFinalize().catch((err) => this.logger.error("Error while finalizing rental", err)); if (order.network && networkNode) { await this.network .removeNetworkNode(order.network, networkNode) diff --git a/src/resource-rental/resource-rental-pool.ts b/src/resource-rental/resource-rental-pool.ts index 8d1f63b28..4dce07015 100644 --- a/src/resource-rental/resource-rental-pool.ts +++ b/src/resource-rental/resource-rental-pool.ts @@ -243,7 +243,7 @@ export class ResourceRentalPool { try { this.borrowed.delete(resourceRental); this.logger.debug("Destroying resource rental from the pool", { agreementId: resourceRental.agreement.id }); - await Promise.all([resourceRental.finalize(), this.removeNetworkNode(resourceRental)]); + await Promise.all([resourceRental.stopAndFinalize(), this.removeNetworkNode(resourceRental)]); this.events.emit("destroyed", resourceRental.agreement); } catch (error) { this.events.emit( diff --git a/src/resource-rental/resource-rental.ts b/src/resource-rental/resource-rental.ts index 9354aa64c..b4d33173d 100644 --- a/src/resource-rental/resource-rental.ts +++ b/src/resource-rental/resource-rental.ts @@ -49,10 +49,11 @@ export class ResourceRental { } /** + * Terminates the activity and agreement (stopping any ongoing work) and finalizes the payment process. * Resolves when the rental will be fully terminated and all pending business operations finalized. * If the rental is already finalized, it will resolve immediately. */ - async finalize() { + async stopAndFinalize() { // Prevent this task from being performed more than once if (!this.finalizePromise) { this.finalizePromise = (async () => { diff --git a/tests/e2e/resourceRentalPool.spec.ts b/tests/e2e/resourceRentalPool.spec.ts index a5d70963c..b1004b7d8 100644 --- a/tests/e2e/resourceRentalPool.spec.ts +++ b/tests/e2e/resourceRentalPool.spec.ts @@ -196,7 +196,7 @@ describe("ResourceRentalPool", () => { const exe = await resourceRental.getExeUnit(); return new Promise(async (res) => { resourceRental.events.on("finalized", async () => res(true)); - setTimeout(() => resourceRental.finalize(), 8_000); + setTimeout(() => resourceRental.stopAndFinalize(), 8_000); await expect(exe.run("sleep 10 && echo Hello World")).rejects.toThrow( new GolemAbortError("Execution of script has been aborted"), ); From 1c1d6ae9d930c47cac526f01a2c9ca1f3571be28 Mon Sep 17 00:00:00 2001 From: Seweryn Kras Date: Wed, 19 Jun 2024 13:57:11 +0200 Subject: [PATCH 10/10] test: update cypress test to include new wording --- tests/cypress/ui/hello-world.cy.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/cypress/ui/hello-world.cy.ts b/tests/cypress/ui/hello-world.cy.ts index d09d10eea..31597c681 100644 --- a/tests/cypress/ui/hello-world.cy.ts +++ b/tests/cypress/ui/hello-world.cy.ts @@ -7,6 +7,6 @@ describe("Test TaskExecutor API", () => { cy.get("#PAYMENT_NETWORK").clear().type(Cypress.env("PAYMENT_NETWORK")); cy.get("#echo").click(); cy.get("#results").should("include.text", "Hello Golem", { timeout: 60000 }); - cy.get("#results").should("include.text", "Finalized leasing process", { timeout: 10000 }); + cy.get("#results").should("include.text", "Finalized renting process", { timeout: 10000 }); }); });