Skip to content

Commit

Permalink
implement streamboxes, getboxes use streamboxes to fetch all unspent …
Browse files Browse the repository at this point in the history
…boxes, remove the get node helper function
  • Loading branch information
ThierryM1212 committed Dec 8, 2023
1 parent 9613f03 commit 17e851e
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ import {
mockUTXOByAddress
} from "_test-vectors";
import { afterEach, describe, expect, it, vi, vitest } from "vitest";
import { mockChunkedResponse } from "../utils";
import * as rest from "../utils/rest";
import { ErgoNodeProvider, getErgoNodeProvider } from "./ergoNodeProvider";
import { ErgoNodeProvider } from "./ergoNodeProvider";

describe("Test node client", async () => {
afterEach(() => {
vi.restoreAllMocks();
});

const nodeClient = getErgoNodeProvider("https://test0.com:9053/");
const nodeClient = new ErgoNodeProvider({
url: "https://test0.com:9053/"
});

const testOptions = {
url: "https://test0.com:9053/"
Expand Down Expand Up @@ -57,8 +60,6 @@ describe("Test node client", async () => {
let boxes = await nodeClient.getBoxes({
where: { address: "9g16ZMPo22b3qaRL7HezyQt2HSW2ZBF6YR3WW9cYQjgQwYKxxoT" },
from: "blockchain",
limit: 5,
offset: 5,
sort: "asc"
});
expect(boxes.map((b) => new ErgoBox(b))).toBeInstanceOf(Array<ErgoBox>);
Expand Down Expand Up @@ -96,20 +97,19 @@ describe("Test node client", async () => {
where: {
ergoTree: "0008cd02c35a808c1c713fc1ae169e33da7492eee8f913a2045a7d56a3ca3103b5525ff3"
},
from: "mempool"
from: "mempool",
sort: "desc"
});
expect(boxes.map((b) => new ErgoBox(b))).toBeInstanceOf(Array<ErgoBox>);
const boxes2 = await nodeClient.getBoxes({
where: {
ergoTree: "0008cd02c35a808c1c713fc1ae169e33da7492eee8f913a2045a7d56a3ca3103b5525ff3"
},
from: "mempool",
sort: "asc",
offset: 1,
limit: 1
sort: "asc"
});
expect(boxes2.map((b) => new ErgoBox(b))).toBeInstanceOf(Array<ErgoBox>);
expect(boxes[boxes.length - 2].boxId).toBe(boxes2[0].boxId);
expect(boxes[boxes.length - 1].boxId).toBe(boxes2[0].boxId);
});

it("getBoxes - tokenId", async () => {
Expand All @@ -121,29 +121,28 @@ describe("Test node client", async () => {
expect(boxes.map((b) => new ErgoBox(b))).toBeInstanceOf(Array<ErgoBox>);
boxes = await nodeClient.getBoxes({
where: { tokenId: "fbbaac7337d051c10fc3da0ccb864f4d32d40027551e1c3ea3ce361f39b91e40" },
from: "mempool"
from: "blockchain+mempool"
});
expect(boxes.map((b) => new ErgoBox(b))).toBeInstanceOf(Array<ErgoBox>);
boxes = await nodeClient.getBoxes({
where: { tokenId: "fbbaac7337d051c10fc3da0ccb864f4d32d40027551e1c3ea3ce361f39b91e40" },
from: "blockchain+mempool"
from: "mempool",
sort: "desc"
});
expect(boxes.map((b) => new ErgoBox(b))).toBeInstanceOf(Array<ErgoBox>);
const boxes2 = await nodeClient.getBoxes({
where: {
tokenId: "fbbaac7337d051c10fc3da0ccb864f4d32d40027551e1c3ea3ce361f39b91e40"
},
from: "mempool",
sort: "asc",
offset: 1,
limit: 1
sort: "asc"
});
expect(boxes2.map((b) => new ErgoBox(b))).toBeInstanceOf(Array<ErgoBox>);
expect(boxes[boxes.length - 2].boxId).toBe(boxes2[0].boxId);
expect(boxes[boxes.length - 1].boxId).toBe(boxes2[0].boxId);
});

it("Should throw not supported error when streamBoxes is called", async () => {
expect(nodeClient.streamBoxes).to.throw(NotSupportedError);
expect(true).toBe(true);
});

it("Should throw not supported error when reduceTransaction is called", async () => {
Expand Down Expand Up @@ -322,6 +321,14 @@ describe("Test node client", async () => {
);
expect(utxos.map((b) => new ErgoBox(b))).toBeInstanceOf(Array<ErgoBox>);
});
it("getUnspentMempoolBoxesByErgotree", async () => {
vi.spyOn(rest, "post").mockImplementation(() => Promise.resolve(mockTransactionList.items));
const utxos = await nodeClient.getUnspentMempoolBoxesByErgotree(
"0008cd03b4cf5eb18d1f45f73472bc96578a87f6d967015c59c636c7a0b139348ce826b0",
0
);
expect(utxos.map((b) => new ErgoBox(b))).toBeInstanceOf(Array<ErgoBox>);
});

it("sendTransaction - success", async () => {
vi.spyOn(rest, "post").mockImplementation(() => Promise.resolve(mockPostTxSuccess));
Expand Down Expand Up @@ -414,4 +421,47 @@ describe("Test node client", async () => {
);
expect(utxos.map((b) => new ErgoBox(b))).toBeInstanceOf(Array<ErgoBox>);
});

it("Should stream boxes with default params", async () => {
const fetchSpy = vi
.spyOn(global, "fetch")
.mockResolvedValue(
mockChunkedResponse([
JSON.stringify(mockUTXOByAddress.splice(0, 2)),
JSON.stringify(mockUTXOByAddress),
JSON.stringify([])
])
);

let boxesCount = 0;
for await (const boxes of nodeClient.streamBoxes({
where: {
ergoTree: "0008cd02c35a808c1c713fc1ae169e33da7492eee8f913a2045a7d56a3ca3103b5525ff3"
},
from: "blockchain"
})) {
boxesCount += boxes.length;
}

expect(boxesCount).toBe(3);
expect(fetchSpy).toBeCalledTimes(3);

const [firstCallBody, secondCallBody, thirdCallBody] = fetchSpy.mock.calls.map((call) =>
JSON.parse(call[1]!.body as string)
);
expect(firstCallBody).to.be.equal(
"0008cd02c35a808c1c713fc1ae169e33da7492eee8f913a2045a7d56a3ca3103b5525ff3"
);
expect(secondCallBody).to.be.equal(
"0008cd02c35a808c1c713fc1ae169e33da7492eee8f913a2045a7d56a3ca3103b5525ff3"
);
expect(thirdCallBody).to.be.equal(
"0008cd02c35a808c1c713fc1ae169e33da7492eee8f913a2045a7d56a3ca3103b5525ff3"
);

const [firstURL, secondURL, thirdURL] = fetchSpy.mock.calls.map((call) => call[0]);
expect(firstURL).to.include("offset=0&limit=50");
expect(secondURL).to.include("offset=50&limit=50");
expect(thirdURL).to.include("offset=100&limit=50");
});
});
115 changes: 60 additions & 55 deletions packages/blockchain-providers/src/ergo-node/ergoNodeProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import {
NewToken,
NonMandatoryRegisters,
NotSupportedError,
orderBy,
SignedTransaction,
some,
SortingDirection,
TokenId,
TransactionId
TransactionId,
uniqBy
} from "@fleet-sdk/common";
import { ErgoAddress, ErgoBox } from "@fleet-sdk/core";
import { RequireExactlyOne } from "type-fest";
Expand All @@ -27,32 +30,7 @@ import {
TransactionEvaluationSuccess,
TransactionReductionResult
} from "../types";
import { DEFAULT_HEADERS, Fetcher, get, post, RequestOptions, ResponseParser } from "../utils/rest";

/**
* Get a node client
* @param url : url of the node including the port
* @param parser : Default JSON, parse the response text and stringify the body for post. Use json-bigint
* to avoid overflow of javascript number.
* @param fetcher : Default fetch, fetcher to reteive the data
* @param headers : Headers for the get and post request
* @returns ErgoNodeProvider
*/
export function getErgoNodeProvider(
url: URL | string,
parser: ResponseParser = JSON,
fetcher: Fetcher = fetch,
headers: Headers = DEFAULT_HEADERS
): ErgoNodeProvider {
const nodeOptions: RequestOptions = {
url: url,
parser: parser,
fetcher: fetcher,
headers: headers
};

return new ErgoNodeProvider(nodeOptions);
}
import { DEFAULT_HEADERS, get, post, RequestOptions } from "../utils/rest";

export type TokenInfo = {
id: TokenId;
Expand Down Expand Up @@ -99,18 +77,6 @@ export type NodeBoxQuery<W extends NodeBoxWhere> = BoxQuery<W> & {
/** The query to filter boxes. Only one filter can be provided to node client */
where: RequireExactlyOne<W>;

/**
* Limit the number of outputs when applicable.
* @default 0
*/
limit?: number;

/**
* Amount of result to skip from the begining.
* @default 0
*/
offset?: number;

/**
* Since an amount of result from the begining.
* @default 'desc'
Expand All @@ -122,6 +88,15 @@ export class ErgoNodeProvider implements IBlockchainProvider<BoxWhere> {
private _nodeOptions: RequestOptions;

constructor(nodeOptions: RequestOptions) {
if (!nodeOptions.fetcher) {
nodeOptions.fetcher = fetch;
}
if (!nodeOptions.parser) {
nodeOptions.parser = JSON;
}
if (!nodeOptions.headers) {
nodeOptions.headers = DEFAULT_HEADERS;
}
this._nodeOptions = nodeOptions;
}

Expand All @@ -132,16 +107,53 @@ export class ErgoNodeProvider implements IBlockchainProvider<BoxWhere> {
* @returns {ChainProviderBox[]}
*/
async getBoxes(query: NodeBoxQuery<NodeBoxWhere>): Promise<ChainProviderBox[]> {
let limit = 0,
offset = 0,
sort: SortingDirection = "desc",
output: ChainProviderBox[] = [];
if (query.limit) {
limit = query.limit;
}
if (query.offset) {
offset = query.offset;
let boxes: ChainProviderBox[] = [];
for await (const chunk of this.streamBoxes(query)) {
boxes = boxes.concat(chunk);
}

return orderBy(boxes, (box) => box.creationHeight, query.sort);
}

/**
* Stream the unspent boxes matching the query by chunk
* @param {NodeBoxQuery} query
*/
async *streamBoxes(query: NodeBoxQuery<NodeBoxWhere>): AsyncIterable<ChainProviderBox[]> {
const returnedBoxIds = new Set<string>();
const CHUNK_SIZE = 50;
let offset = 0,
isEmpty = false;
do {
let boxes = await this.getBoxesChunk(query, CHUNK_SIZE, offset);

if (some(boxes)) {
// boxes can be moved from the mempool to the blockchain while streaming,
// so we need to filter out boxes that have already been returned.
if (boxes.some((box) => returnedBoxIds.has(box.boxId))) {
boxes = boxes.filter((b) => !returnedBoxIds.has(b.boxId));
}

if (some(boxes)) {
boxes = uniqBy(boxes, (box) => box.boxId);
boxes.forEach((box) => returnedBoxIds.add(box.boxId));

yield boxes;
}
}

isEmpty = boxes.length === 0;
offset += CHUNK_SIZE;
} while (!isEmpty);
}

async getBoxesChunk(
query: NodeBoxQuery<NodeBoxWhere>,
limit: number,
offset: number
): Promise<ChainProviderBox[]> {
let sort: SortingDirection = "desc",
output: ChainProviderBox[] = [];
if (query.sort) {
sort = query.sort;
}
Expand Down Expand Up @@ -242,13 +254,6 @@ export class ErgoNodeProvider implements IBlockchainProvider<BoxWhere> {
return output;
}

/**
* Not supported operation ny node client
*/
streamBoxes(query: BoxQuery<BoxWhere>): AsyncIterable<ChainProviderBox[]> {
throw new NotSupportedError("Method not implemented." + JSON.stringify(query));
}

/**
* Get the last headers objects
* @param query HeaderQuery
Expand Down

0 comments on commit 17e851e

Please sign in to comment.