From 17e851eaaa2623f48bc97c1b71800bd2c1311bd8 Mon Sep 17 00:00:00 2001 From: ThierryM1212 Date: Fri, 8 Dec 2023 15:53:59 +0100 Subject: [PATCH] implement streamboxes, getboxes use streamboxes to fetch all unspent boxes, remove the get node helper function --- .../src/ergo-node/ergoNodeProvider.spec.ts | 82 ++++++++++--- .../src/ergo-node/ergoNodeProvider.ts | 115 +++++++++--------- 2 files changed, 126 insertions(+), 71 deletions(-) diff --git a/packages/blockchain-providers/src/ergo-node/ergoNodeProvider.spec.ts b/packages/blockchain-providers/src/ergo-node/ergoNodeProvider.spec.ts index 3ccbc9d3..5a8cc6da 100644 --- a/packages/blockchain-providers/src/ergo-node/ergoNodeProvider.spec.ts +++ b/packages/blockchain-providers/src/ergo-node/ergoNodeProvider.spec.ts @@ -15,15 +15,18 @@ import { mockUTXOByAddress } from "_test-vectors"; import { afterEach, describe, expect, it, vi, vitest } from "vitest"; +import { mockChunkedResponse } from "../utils"; import * as rest from "../utils/rest"; -import { ErgoNodeProvider, getErgoNodeProvider } from "./ergoNodeProvider"; +import { ErgoNodeProvider } from "./ergoNodeProvider"; describe("Test node client", async () => { afterEach(() => { vi.restoreAllMocks(); }); - const nodeClient = getErgoNodeProvider("https://test0.com:9053/"); + const nodeClient = new ErgoNodeProvider({ + url: "https://test0.com:9053/" + }); const testOptions = { url: "https://test0.com:9053/" @@ -57,8 +60,6 @@ describe("Test node client", async () => { let boxes = await nodeClient.getBoxes({ where: { address: "9g16ZMPo22b3qaRL7HezyQt2HSW2ZBF6YR3WW9cYQjgQwYKxxoT" }, from: "blockchain", - limit: 5, - offset: 5, sort: "asc" }); expect(boxes.map((b) => new ErgoBox(b))).toBeInstanceOf(Array); @@ -96,7 +97,8 @@ describe("Test node client", async () => { where: { ergoTree: "0008cd02c35a808c1c713fc1ae169e33da7492eee8f913a2045a7d56a3ca3103b5525ff3" }, - from: "mempool" + from: "mempool", + sort: "desc" }); expect(boxes.map((b) => new ErgoBox(b))).toBeInstanceOf(Array); const boxes2 = await nodeClient.getBoxes({ @@ -104,12 +106,10 @@ describe("Test node client", async () => { ergoTree: "0008cd02c35a808c1c713fc1ae169e33da7492eee8f913a2045a7d56a3ca3103b5525ff3" }, from: "mempool", - sort: "asc", - offset: 1, - limit: 1 + sort: "asc" }); expect(boxes2.map((b) => new ErgoBox(b))).toBeInstanceOf(Array); - expect(boxes[boxes.length - 2].boxId).toBe(boxes2[0].boxId); + expect(boxes[boxes.length - 1].boxId).toBe(boxes2[0].boxId); }); it("getBoxes - tokenId", async () => { @@ -121,12 +121,13 @@ describe("Test node client", async () => { expect(boxes.map((b) => new ErgoBox(b))).toBeInstanceOf(Array); boxes = await nodeClient.getBoxes({ where: { tokenId: "fbbaac7337d051c10fc3da0ccb864f4d32d40027551e1c3ea3ce361f39b91e40" }, - from: "mempool" + from: "blockchain+mempool" }); expect(boxes.map((b) => new ErgoBox(b))).toBeInstanceOf(Array); boxes = await nodeClient.getBoxes({ where: { tokenId: "fbbaac7337d051c10fc3da0ccb864f4d32d40027551e1c3ea3ce361f39b91e40" }, - from: "blockchain+mempool" + from: "mempool", + sort: "desc" }); expect(boxes.map((b) => new ErgoBox(b))).toBeInstanceOf(Array); const boxes2 = await nodeClient.getBoxes({ @@ -134,16 +135,14 @@ describe("Test node client", async () => { tokenId: "fbbaac7337d051c10fc3da0ccb864f4d32d40027551e1c3ea3ce361f39b91e40" }, from: "mempool", - sort: "asc", - offset: 1, - limit: 1 + sort: "asc" }); expect(boxes2.map((b) => new ErgoBox(b))).toBeInstanceOf(Array); - expect(boxes[boxes.length - 2].boxId).toBe(boxes2[0].boxId); + expect(boxes[boxes.length - 1].boxId).toBe(boxes2[0].boxId); }); it("Should throw not supported error when streamBoxes is called", async () => { - expect(nodeClient.streamBoxes).to.throw(NotSupportedError); + expect(true).toBe(true); }); it("Should throw not supported error when reduceTransaction is called", async () => { @@ -322,6 +321,14 @@ describe("Test node client", async () => { ); expect(utxos.map((b) => new ErgoBox(b))).toBeInstanceOf(Array); }); + it("getUnspentMempoolBoxesByErgotree", async () => { + vi.spyOn(rest, "post").mockImplementation(() => Promise.resolve(mockTransactionList.items)); + const utxos = await nodeClient.getUnspentMempoolBoxesByErgotree( + "0008cd03b4cf5eb18d1f45f73472bc96578a87f6d967015c59c636c7a0b139348ce826b0", + 0 + ); + expect(utxos.map((b) => new ErgoBox(b))).toBeInstanceOf(Array); + }); it("sendTransaction - success", async () => { vi.spyOn(rest, "post").mockImplementation(() => Promise.resolve(mockPostTxSuccess)); @@ -414,4 +421,47 @@ describe("Test node client", async () => { ); expect(utxos.map((b) => new ErgoBox(b))).toBeInstanceOf(Array); }); + + it("Should stream boxes with default params", async () => { + const fetchSpy = vi + .spyOn(global, "fetch") + .mockResolvedValue( + mockChunkedResponse([ + JSON.stringify(mockUTXOByAddress.splice(0, 2)), + JSON.stringify(mockUTXOByAddress), + JSON.stringify([]) + ]) + ); + + let boxesCount = 0; + for await (const boxes of nodeClient.streamBoxes({ + where: { + ergoTree: "0008cd02c35a808c1c713fc1ae169e33da7492eee8f913a2045a7d56a3ca3103b5525ff3" + }, + from: "blockchain" + })) { + boxesCount += boxes.length; + } + + expect(boxesCount).toBe(3); + expect(fetchSpy).toBeCalledTimes(3); + + const [firstCallBody, secondCallBody, thirdCallBody] = fetchSpy.mock.calls.map((call) => + JSON.parse(call[1]!.body as string) + ); + expect(firstCallBody).to.be.equal( + "0008cd02c35a808c1c713fc1ae169e33da7492eee8f913a2045a7d56a3ca3103b5525ff3" + ); + expect(secondCallBody).to.be.equal( + "0008cd02c35a808c1c713fc1ae169e33da7492eee8f913a2045a7d56a3ca3103b5525ff3" + ); + expect(thirdCallBody).to.be.equal( + "0008cd02c35a808c1c713fc1ae169e33da7492eee8f913a2045a7d56a3ca3103b5525ff3" + ); + + const [firstURL, secondURL, thirdURL] = fetchSpy.mock.calls.map((call) => call[0]); + expect(firstURL).to.include("offset=0&limit=50"); + expect(secondURL).to.include("offset=50&limit=50"); + expect(thirdURL).to.include("offset=100&limit=50"); + }); }); diff --git a/packages/blockchain-providers/src/ergo-node/ergoNodeProvider.ts b/packages/blockchain-providers/src/ergo-node/ergoNodeProvider.ts index 90748dfa..55b60492 100644 --- a/packages/blockchain-providers/src/ergo-node/ergoNodeProvider.ts +++ b/packages/blockchain-providers/src/ergo-node/ergoNodeProvider.ts @@ -9,10 +9,13 @@ import { NewToken, NonMandatoryRegisters, NotSupportedError, + orderBy, SignedTransaction, + some, SortingDirection, TokenId, - TransactionId + TransactionId, + uniqBy } from "@fleet-sdk/common"; import { ErgoAddress, ErgoBox } from "@fleet-sdk/core"; import { RequireExactlyOne } from "type-fest"; @@ -27,32 +30,7 @@ import { TransactionEvaluationSuccess, TransactionReductionResult } from "../types"; -import { DEFAULT_HEADERS, Fetcher, get, post, RequestOptions, ResponseParser } from "../utils/rest"; - -/** - * Get a node client - * @param url : url of the node including the port - * @param parser : Default JSON, parse the response text and stringify the body for post. Use json-bigint - * to avoid overflow of javascript number. - * @param fetcher : Default fetch, fetcher to reteive the data - * @param headers : Headers for the get and post request - * @returns ErgoNodeProvider - */ -export function getErgoNodeProvider( - url: URL | string, - parser: ResponseParser = JSON, - fetcher: Fetcher = fetch, - headers: Headers = DEFAULT_HEADERS -): ErgoNodeProvider { - const nodeOptions: RequestOptions = { - url: url, - parser: parser, - fetcher: fetcher, - headers: headers - }; - - return new ErgoNodeProvider(nodeOptions); -} +import { DEFAULT_HEADERS, get, post, RequestOptions } from "../utils/rest"; export type TokenInfo = { id: TokenId; @@ -99,18 +77,6 @@ export type NodeBoxQuery = BoxQuery & { /** The query to filter boxes. Only one filter can be provided to node client */ where: RequireExactlyOne; - /** - * Limit the number of outputs when applicable. - * @default 0 - */ - limit?: number; - - /** - * Amount of result to skip from the begining. - * @default 0 - */ - offset?: number; - /** * Since an amount of result from the begining. * @default 'desc' @@ -122,6 +88,15 @@ export class ErgoNodeProvider implements IBlockchainProvider { private _nodeOptions: RequestOptions; constructor(nodeOptions: RequestOptions) { + if (!nodeOptions.fetcher) { + nodeOptions.fetcher = fetch; + } + if (!nodeOptions.parser) { + nodeOptions.parser = JSON; + } + if (!nodeOptions.headers) { + nodeOptions.headers = DEFAULT_HEADERS; + } this._nodeOptions = nodeOptions; } @@ -132,16 +107,53 @@ export class ErgoNodeProvider implements IBlockchainProvider { * @returns {ChainProviderBox[]} */ async getBoxes(query: NodeBoxQuery): Promise { - let limit = 0, - offset = 0, - sort: SortingDirection = "desc", - output: ChainProviderBox[] = []; - if (query.limit) { - limit = query.limit; - } - if (query.offset) { - offset = query.offset; + let boxes: ChainProviderBox[] = []; + for await (const chunk of this.streamBoxes(query)) { + boxes = boxes.concat(chunk); } + + return orderBy(boxes, (box) => box.creationHeight, query.sort); + } + + /** + * Stream the unspent boxes matching the query by chunk + * @param {NodeBoxQuery} query + */ + async *streamBoxes(query: NodeBoxQuery): AsyncIterable { + const returnedBoxIds = new Set(); + const CHUNK_SIZE = 50; + let offset = 0, + isEmpty = false; + do { + let boxes = await this.getBoxesChunk(query, CHUNK_SIZE, offset); + + if (some(boxes)) { + // boxes can be moved from the mempool to the blockchain while streaming, + // so we need to filter out boxes that have already been returned. + if (boxes.some((box) => returnedBoxIds.has(box.boxId))) { + boxes = boxes.filter((b) => !returnedBoxIds.has(b.boxId)); + } + + if (some(boxes)) { + boxes = uniqBy(boxes, (box) => box.boxId); + boxes.forEach((box) => returnedBoxIds.add(box.boxId)); + + yield boxes; + } + } + + isEmpty = boxes.length === 0; + offset += CHUNK_SIZE; + } while (!isEmpty); + } + + async getBoxesChunk( + query: NodeBoxQuery, + limit: number, + offset: number + ): Promise { + let sort: SortingDirection = "desc", + output: ChainProviderBox[] = []; if (query.sort) { sort = query.sort; } @@ -242,13 +254,6 @@ export class ErgoNodeProvider implements IBlockchainProvider { return output; } - /** - * Not supported operation ny node client - */ - streamBoxes(query: BoxQuery): AsyncIterable { - throw new NotSupportedError("Method not implemented." + JSON.stringify(query)); - } - /** * Get the last headers objects * @param query HeaderQuery