From 70bec5464f4699d774a83f635f3f85208253bf2e Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Wed, 16 Aug 2023 20:59:12 +1000 Subject: [PATCH 01/15] start blockchain monitor component --- src/blockchain/blockchain_monitor.ts | 127 ++++++++++++++++++ .../{ => clients}/blockchain_client.ts | 2 + .../{ => clients}/blockchain_provider.ts | 15 ++- src/blockchain/clients/explorer.ts | 11 +- src/blockchain/clients/mod.ts | 3 +- src/blockchain/clients/node.ts | 56 ++++++++ src/blockchain/mod.ts | 3 +- src/cli_test.ts | 5 - 8 files changed, 208 insertions(+), 14 deletions(-) create mode 100644 src/blockchain/blockchain_monitor.ts rename src/blockchain/{ => clients}/blockchain_client.ts (87%) rename src/blockchain/{ => clients}/blockchain_provider.ts (62%) create mode 100644 src/blockchain/clients/node.ts delete mode 100644 src/cli_test.ts diff --git a/src/blockchain/blockchain_monitor.ts b/src/blockchain/blockchain_monitor.ts new file mode 100644 index 0000000..206990e --- /dev/null +++ b/src/blockchain/blockchain_monitor.ts @@ -0,0 +1,127 @@ +import { TransactionId } from "@fleet-sdk/common"; +import { Component } from "../component.ts"; +import { ErgomaticConfig } from "../config.ts"; +import { BlockchainClient, BlockchainProvider } from "./clients/mod.ts"; + +interface MonitorState { + currentHeight: number; + mempoolTxDelivery: Record; + mempoolTxChecks: Record; + maxMempoolTxChecks: number; + pastMempoolTxIds: TransactionId[]; +} + +export class BlockchainMonitor extends Component { + readonly #blockchainClient: BlockchainClient; + readonly #pollInterval: number; + readonly #state: MonitorState; + #taskHandle?: number; + + constructor( + config: ErgomaticConfig, + blockchainClient?: BlockchainClient, + pollInterval: number = 10000, + ) { + super(config, "BlockchainMonitor"); + + this.#pollInterval = pollInterval; + this.#blockchainClient = blockchainClient ?? new BlockchainProvider(config); + this.#state = { + currentHeight: 0, + mempoolTxDelivery: {}, + mempoolTxChecks: {}, + maxMempoolTxChecks: 10, + pastMempoolTxIds: [], + }; + } + + start(): Promise { + // TODO: raise component:error event if monitor throws exception + this.#taskHandle = setInterval(() => this.#monitor(), this.#pollInterval); + + return super.start(); + } + + stop(): Promise { + clearInterval(this.#taskHandle); + + return super.stop(); + } + + async #monitor() { + this.logger.debug("Gathering blockchain state"); + + // mempool = getMempool() + // for tx in mempool + // do + // if mempoolDelivery[tx.txid] == false + // do + // plugins.all.onMempoolTx(tx) + // mempoolDelivery[tx.txid] = true + // end + + // pastMempool.removeIfExists(tx.txid) + + // # remove txid from undefined state transactions map, + // # if present + // delete undefinedStateCheks[tx.txid] + // end + const mempool = await this.#blockchainClient.getMempool(); + + for (const tx of mempool) { + if (!this.#state.mempoolTxDelivery[tx.id]) { + this.#state.mempoolTxDelivery[tx.id] = true; + + // TODO: emit onMempoolTx event + } + + // pastMempool.removeIfExists(tx.txid) + + // # remove txid from undefined state transactions map, + // # if present + // delete undefinedStateCheks[tx.txid] + } + + // # if tx was present in previous mempool, but not in the + // # current, it may have been dropped or included in a block + // for txid in pastMempool + // do + // undefinedStateCheks[txid] = (undefinedStateCheks[txid] ?? 0) + 1 + // end + // pastMempool = mempool.map(tx => tx.txid); + + // height = getCurrentHeight() + // if height > currentHeight + // do + // newBlock = getBlock(height) + // plugins.all.onNewBlock(newBlock) + + // currentHeight = height + + // for tx in newBlock.txs + // do + // plugins.all.onIncludedTx(tx) + + // # stop tracking txid mempool delivery for this txid + // delete mempoolDelivery[tx.txid] + + // # prevent `onMempoolTxDrop` event for this txid as + // # it is now included in a block + // delete undefinedStateCheks[tx.txid] + // end + // end + + // for txid in undefinedStateCheks.keys + // do + // # if a tx is not included in a block in dropChecks * `n` seconds, + // # then it's probably dropped from the mempool + // if undefinedStateCheks[txid] > maxChecks + // do # consider dropped + // plugins.all.onMempoolTxDrop(txid) + // delete undefinedStateCheks[txid] + // else # one more inconclusive check + // undefinedStateCheks[txid] += 1 + // end + // end + } +} diff --git a/src/blockchain/blockchain_client.ts b/src/blockchain/clients/blockchain_client.ts similarity index 87% rename from src/blockchain/blockchain_client.ts rename to src/blockchain/clients/blockchain_client.ts index 97dd7ed..2f93529 100644 --- a/src/blockchain/blockchain_client.ts +++ b/src/blockchain/clients/blockchain_client.ts @@ -12,4 +12,6 @@ export interface BlockchainClient { getBoxesByTokenId( tokenId: TokenId, ): AsyncGenerator[]>; + + getMempool(): Promise; } diff --git a/src/blockchain/blockchain_provider.ts b/src/blockchain/clients/blockchain_provider.ts similarity index 62% rename from src/blockchain/blockchain_provider.ts rename to src/blockchain/clients/blockchain_provider.ts index cf4919e..1292671 100644 --- a/src/blockchain/blockchain_provider.ts +++ b/src/blockchain/clients/blockchain_provider.ts @@ -5,22 +5,29 @@ import { TokenId, TransactionId, } from "@fleet-sdk/common"; -import { Component } from "../component.ts"; -import { ErgomaticConfig } from "../config.ts"; +import { Component } from "../../component.ts"; +import { ErgomaticConfig } from "../../config.ts"; import { BlockchainClient } from "./blockchain_client.ts"; -import { ExplorerClient } from "./clients/mod.ts"; +import { ExplorerClient } from "./explorer.ts"; +import { NodeClient } from "./node.ts"; export class BlockchainProvider extends Component implements BlockchainClient { readonly #explorer: BlockchainClient; + readonly #node: BlockchainClient; constructor(config: ErgomaticConfig) { super(config, "BlockchainProvider"); this.#explorer = new ExplorerClient(config); + this.#node = new NodeClient(config); + } + + getMempool(): Promise { + return this.#node.getMempool(); } submitTx(signedTx: SignedTransaction): Promise { - return this.#explorer.submitTx(signedTx); + return this.#node.submitTx(signedTx); } getBoxesByTokenId( diff --git a/src/blockchain/clients/explorer.ts b/src/blockchain/clients/explorer.ts index 2c14e5f..78ec5a2 100644 --- a/src/blockchain/clients/explorer.ts +++ b/src/blockchain/clients/explorer.ts @@ -6,15 +6,18 @@ import { TransactionId, } from "@fleet-sdk/common"; import axios, { AxiosInstance } from "axios"; -import { BlockchainClient } from "../blockchain_client.ts"; +import { BlockchainClient } from "./blockchain_client.ts"; import { ErgomaticConfig } from "../../config.ts"; +import { Component } from "../../component.ts"; -export class ExplorerClient implements BlockchainClient { +export class ExplorerClient extends Component implements BlockchainClient { readonly #http: AxiosInstance; #pageSize = 100; #timeoutMs: number; constructor(config: ErgomaticConfig, httpTimeoutMs: number = 10000) { + super(config, "ExplorerClient"); + // axios timeout is incompatible with deno due to a missing nodejs API // use signals for timeouts instead. this.#timeoutMs = httpTimeoutMs; @@ -24,6 +27,10 @@ export class ExplorerClient implements BlockchainClient { }); } + getMempool(): Promise { + throw new Error(`${this.name} does not support getMempool operation`); + } + async submitTx(signedTx: SignedTransaction): Promise { const response = await this.#http.post( "/mempool/transactions/submit", diff --git a/src/blockchain/clients/mod.ts b/src/blockchain/clients/mod.ts index 3e99d9a..bb0c861 100644 --- a/src/blockchain/clients/mod.ts +++ b/src/blockchain/clients/mod.ts @@ -1 +1,2 @@ -export * from "./explorer.ts"; +export * from "./blockchain_client.ts"; +export * from "./blockchain_provider.ts"; diff --git a/src/blockchain/clients/node.ts b/src/blockchain/clients/node.ts new file mode 100644 index 0000000..e0fdc0b --- /dev/null +++ b/src/blockchain/clients/node.ts @@ -0,0 +1,56 @@ +import { + AmountType, + Box, + SignedTransaction, + TokenId, + TransactionId, +} from "@fleet-sdk/common"; +import { BlockchainClient } from "./blockchain_client.ts"; +import { Component } from "../../component.ts"; +import { ErgomaticConfig } from "../../config.ts"; +import axios, { AxiosInstance } from "axios"; + +export class NodeClient extends Component implements BlockchainClient { + readonly #http: AxiosInstance; + #timeoutMs: number; + + constructor(config: ErgomaticConfig, httpTimeoutMs: number = 10000) { + super(config, "NodeClient"); + + this.#timeoutMs = httpTimeoutMs; + this.#http = axios.create({ + baseURL: config.node.endpoint, + }); + } + + submitTx( + signedTx: SignedTransaction, + ): Promise { + return this.#http.post( + "/transactions", + signedTx, + this.#defaultRequestConfig, + ); + } + + // deno-lint-ignore require-yield + async *getBoxesByTokenId( + _tokenId: TokenId, + ): AsyncGenerator[]> { + throw new Error( + `${this.name} does not support getBoxesByTokenId operation`, + ); + } + + getMempool(): Promise { + // TODO: this might need pagination + return this.#http.get( + "/transactions/unconfirmed", + this.#defaultRequestConfig, + ); + } + + get #defaultRequestConfig() { + return { signal: AbortSignal.timeout(this.#timeoutMs) }; + } +} diff --git a/src/blockchain/mod.ts b/src/blockchain/mod.ts index 2902dce..293b0c3 100644 --- a/src/blockchain/mod.ts +++ b/src/blockchain/mod.ts @@ -1,2 +1 @@ -export * from "./blockchain_provider.ts"; -export * from "./blockchain_client.ts"; +export * from "./clients/mod.ts"; diff --git a/src/cli_test.ts b/src/cli_test.ts deleted file mode 100644 index b957d51..0000000 --- a/src/cli_test.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { assertEquals } from "std/testing/asserts.ts"; - -Deno.test("example", () => { - assertEquals(1, 1); -}); From 977ac6db0fe7378427a94250ad22ee1434cde93e Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Thu, 17 Aug 2023 14:09:10 +1000 Subject: [PATCH 02/15] more blockchain monitor work --- src/blockchain/blockchain_monitor.ts | 93 +++++++++---------- src/blockchain/clients/blockchain_client.ts | 4 + src/blockchain/clients/blockchain_provider.ts | 8 ++ src/blockchain/clients/explorer.ts | 8 ++ src/blockchain/clients/node.ts | 8 ++ 5 files changed, 74 insertions(+), 47 deletions(-) diff --git a/src/blockchain/blockchain_monitor.ts b/src/blockchain/blockchain_monitor.ts index 206990e..9947f1b 100644 --- a/src/blockchain/blockchain_monitor.ts +++ b/src/blockchain/blockchain_monitor.ts @@ -1,19 +1,26 @@ -import { TransactionId } from "@fleet-sdk/common"; +import { SignedTransaction, TransactionId } from "@fleet-sdk/common"; import { Component } from "../component.ts"; import { ErgomaticConfig } from "../config.ts"; import { BlockchainClient, BlockchainProvider } from "./clients/mod.ts"; interface MonitorState { currentHeight: number; + /** map of txid -> bool indicating if mempool tx has been passed to plugins */ mempoolTxDelivery: Record; + /** map of txid -> int indicating the number of re-checks */ mempoolTxChecks: Record; - maxMempoolTxChecks: number; pastMempoolTxIds: TransactionId[]; } -export class BlockchainMonitor extends Component { +interface BlockchainMonitorEvent { + "monitor:mempool-tx": CustomEvent; + "monitor:block": CustomEvent; +} + +export class BlockchainMonitor extends Component { readonly #blockchainClient: BlockchainClient; readonly #pollInterval: number; + readonly #maxMempoolTxChecks: number; readonly #state: MonitorState; #taskHandle?: number; @@ -21,16 +28,17 @@ export class BlockchainMonitor extends Component { config: ErgomaticConfig, blockchainClient?: BlockchainClient, pollInterval: number = 10000, + maxMempoolTxChecks: number = 10, ) { super(config, "BlockchainMonitor"); this.#pollInterval = pollInterval; + this.#maxMempoolTxChecks = maxMempoolTxChecks; this.#blockchainClient = blockchainClient ?? new BlockchainProvider(config); this.#state = { currentHeight: 0, mempoolTxDelivery: {}, mempoolTxChecks: {}, - maxMempoolTxChecks: 10, pastMempoolTxIds: [], }; } @@ -51,65 +59,56 @@ export class BlockchainMonitor extends Component { async #monitor() { this.logger.debug("Gathering blockchain state"); - // mempool = getMempool() - // for tx in mempool - // do - // if mempoolDelivery[tx.txid] == false - // do - // plugins.all.onMempoolTx(tx) - // mempoolDelivery[tx.txid] = true - // end - - // pastMempool.removeIfExists(tx.txid) - - // # remove txid from undefined state transactions map, - // # if present - // delete undefinedStateCheks[tx.txid] - // end const mempool = await this.#blockchainClient.getMempool(); for (const tx of mempool) { if (!this.#state.mempoolTxDelivery[tx.id]) { this.#state.mempoolTxDelivery[tx.id] = true; - // TODO: emit onMempoolTx event + this.dispatchEvent( + new CustomEvent("monitor:mempool-tx", { detail: tx }), + ); } - // pastMempool.removeIfExists(tx.txid) + this.#state.pastMempoolTxIds = this.#state.pastMempoolTxIds.filter(( + txId, + ) => txId !== tx.id); - // # remove txid from undefined state transactions map, - // # if present - // delete undefinedStateCheks[tx.txid] + delete this.#state.mempoolTxChecks[tx.id]; } - // # if tx was present in previous mempool, but not in the - // # current, it may have been dropped or included in a block - // for txid in pastMempool - // do - // undefinedStateCheks[txid] = (undefinedStateCheks[txid] ?? 0) + 1 - // end - // pastMempool = mempool.map(tx => tx.txid); + // if tx was present in previous mempool, but not in the + // current, it may have been dropped or included in a block + for (const txId of this.#state.pastMempoolTxIds) { + this.#state.mempoolTxChecks[txId] = + (this.#state.mempoolTxChecks[txId] ?? 0) + 1; + } - // height = getCurrentHeight() - // if height > currentHeight - // do - // newBlock = getBlock(height) - // plugins.all.onNewBlock(newBlock) + this.#state.pastMempoolTxIds = mempool.map((tx) => tx.id); - // currentHeight = height + const currentHeight = await this.#blockchainClient.getCurrentHeight(); - // for tx in newBlock.txs - // do - // plugins.all.onIncludedTx(tx) + if (currentHeight > this.#state.currentHeight) { + const newBlock = await this.#blockchainClient.getBlock(currentHeight); - // # stop tracking txid mempool delivery for this txid - // delete mempoolDelivery[tx.txid] + this.dispatchEvent( + new CustomEvent("monitor:block", { detail: newBlock }), + ); - // # prevent `onMempoolTxDrop` event for this txid as - // # it is now included in a block - // delete undefinedStateCheks[tx.txid] - // end - // end + this.#state.currentHeight = currentHeight; + + // for tx in newBlock.txs + // do + // plugins.all.onIncludedTx(tx) + + // # stop tracking txid mempool delivery for this txid + // delete mempoolDelivery[tx.txid] + + // # prevent `onMempoolTxDrop` event for this txid as + // # it is now included in a block + // delete undefinedStateCheks[tx.txid] + // end + } // for txid in undefinedStateCheks.keys // do diff --git a/src/blockchain/clients/blockchain_client.ts b/src/blockchain/clients/blockchain_client.ts index 2f93529..734a4fb 100644 --- a/src/blockchain/clients/blockchain_client.ts +++ b/src/blockchain/clients/blockchain_client.ts @@ -14,4 +14,8 @@ export interface BlockchainClient { ): AsyncGenerator[]>; getMempool(): Promise; + + getCurrentHeight(): Promise; + + getBlock(height: number): Promise; } diff --git a/src/blockchain/clients/blockchain_provider.ts b/src/blockchain/clients/blockchain_provider.ts index 1292671..dc0392a 100644 --- a/src/blockchain/clients/blockchain_provider.ts +++ b/src/blockchain/clients/blockchain_provider.ts @@ -22,6 +22,14 @@ export class BlockchainProvider extends Component implements BlockchainClient { this.#node = new NodeClient(config); } + getBlock(height: number): Promise { + return this.#node.getBlock(height); + } + + getCurrentHeight(): Promise { + return this.#node.getCurrentHeight(); + } + getMempool(): Promise { return this.#node.getMempool(); } diff --git a/src/blockchain/clients/explorer.ts b/src/blockchain/clients/explorer.ts index 78ec5a2..9481f3b 100644 --- a/src/blockchain/clients/explorer.ts +++ b/src/blockchain/clients/explorer.ts @@ -27,6 +27,14 @@ export class ExplorerClient extends Component implements BlockchainClient { }); } + getBlock(height: number): Promise { + throw new Error("Method not implemented."); + } + + getCurrentHeight(): Promise { + throw new Error("Method not implemented."); + } + getMempool(): Promise { throw new Error(`${this.name} does not support getMempool operation`); } diff --git a/src/blockchain/clients/node.ts b/src/blockchain/clients/node.ts index e0fdc0b..d2d0616 100644 --- a/src/blockchain/clients/node.ts +++ b/src/blockchain/clients/node.ts @@ -23,6 +23,14 @@ export class NodeClient extends Component implements BlockchainClient { }); } + getBlock(height: number): Promise { + throw new Error("Method not implemented."); + } + + getCurrentHeight(): Promise { + throw new Error("Method not implemented."); + } + submitTx( signedTx: SignedTransaction, ): Promise { From 43d5c4dfd02378888b24714579a19fb3240522b2 Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Thu, 17 Aug 2023 15:58:15 +1000 Subject: [PATCH 03/15] start wiring up monitor -> plugins --- src/blockchain/blockchain_monitor.ts | 25 +++++++++++++------------ src/blockchain/clients/explorer.ts | 2 +- src/blockchain/clients/node.ts | 2 +- src/blockchain/mod.ts | 1 + src/ergomatic.ts | 17 +++++++++++++++-- src/plugins/_testing.ts | 2 +- src/plugins/plugin.ts | 6 ++++++ src/plugins/plugin_manager.ts | 12 ++++++++---- src/plugins/plugin_manager_test.ts | 24 ++++++++++++++++++++++++ 9 files changed, 70 insertions(+), 21 deletions(-) diff --git a/src/blockchain/blockchain_monitor.ts b/src/blockchain/blockchain_monitor.ts index 9947f1b..0e14deb 100644 --- a/src/blockchain/blockchain_monitor.ts +++ b/src/blockchain/blockchain_monitor.ts @@ -14,6 +14,7 @@ interface MonitorState { interface BlockchainMonitorEvent { "monitor:mempool-tx": CustomEvent; + "monitor:mempool-tx-drop": CustomEvent; "monitor:block": CustomEvent; } @@ -74,6 +75,8 @@ export class BlockchainMonitor extends Component { txId, ) => txId !== tx.id); + // remove txid from undefined state transactions map if present + // this resets the mempool drop detection counter for this txid delete this.#state.mempoolTxChecks[tx.id]; } @@ -110,17 +113,15 @@ export class BlockchainMonitor extends Component { // end } - // for txid in undefinedStateCheks.keys - // do - // # if a tx is not included in a block in dropChecks * `n` seconds, - // # then it's probably dropped from the mempool - // if undefinedStateCheks[txid] > maxChecks - // do # consider dropped - // plugins.all.onMempoolTxDrop(txid) - // delete undefinedStateCheks[txid] - // else # one more inconclusive check - // undefinedStateCheks[txid] += 1 - // end - // end + for (const txId of Object.keys(this.#state.mempoolTxChecks)) { + // if a tx is not included in a block in dropChecks * `n` seconds, + // then it's probably dropped from the mempool + if (this.#state.mempoolTxChecks[txId] > this.#maxMempoolTxChecks) { + // TODO raise mempool dropped, so we still need to keep track of txns not just the id? + delete this.#state.mempoolTxChecks[txId]; + } else { + this.#state.mempoolTxChecks[txId] += 1; + } + } } } diff --git a/src/blockchain/clients/explorer.ts b/src/blockchain/clients/explorer.ts index 9481f3b..18a4494 100644 --- a/src/blockchain/clients/explorer.ts +++ b/src/blockchain/clients/explorer.ts @@ -27,7 +27,7 @@ export class ExplorerClient extends Component implements BlockchainClient { }); } - getBlock(height: number): Promise { + getBlock(_height: number): Promise { throw new Error("Method not implemented."); } diff --git a/src/blockchain/clients/node.ts b/src/blockchain/clients/node.ts index d2d0616..943c9b0 100644 --- a/src/blockchain/clients/node.ts +++ b/src/blockchain/clients/node.ts @@ -23,7 +23,7 @@ export class NodeClient extends Component implements BlockchainClient { }); } - getBlock(height: number): Promise { + getBlock(_height: number): Promise { throw new Error("Method not implemented."); } diff --git a/src/blockchain/mod.ts b/src/blockchain/mod.ts index 293b0c3..81ccc9a 100644 --- a/src/blockchain/mod.ts +++ b/src/blockchain/mod.ts @@ -1 +1,2 @@ export * from "./clients/mod.ts"; +export * from "./blockchain_monitor.ts"; diff --git a/src/ergomatic.ts b/src/ergomatic.ts index 4d06ba1..8ca26ba 100644 --- a/src/ergomatic.ts +++ b/src/ergomatic.ts @@ -1,7 +1,11 @@ import { PluginManager, PluginManagerEvent } from "./plugins/plugin_manager.ts"; import { ErgomaticConfig } from "./config.ts"; import { Component } from "./component.ts"; -import { BlockchainClient, BlockchainProvider } from "./blockchain/mod.ts"; +import { + BlockchainClient, + BlockchainMonitor, + BlockchainProvider, +} from "./blockchain/mod.ts"; interface ErgomaticEvent { "component:error": CustomEvent<{ component: Component; error: Error }>; @@ -12,6 +16,7 @@ interface ErgomaticOpts { config: ErgomaticConfig; pluginManager?: PluginManager; blockchainClient?: BlockchainClient; + blockchainMonitor?: BlockchainMonitor; } export class Ergomatic extends Component { @@ -25,10 +30,18 @@ export class Ergomatic extends Component { new BlockchainProvider(opts.config); const pluginManager = opts.pluginManager ?? new PluginManager(opts.config, blockchainClient); + const blockchainMonitor = opts.blockchainMonitor ?? + new BlockchainMonitor(opts.config, blockchainClient); pluginManager.addEventListener("plugin:error", (e) => this.#bubbleEvent(e)); + // TODO: handle errors in plugin handlers + blockchainMonitor.addEventListener( + "monitor:mempool-tx", + ({ detail }) => + pluginManager.activePlugins.forEach((p) => p.onMempoolTx(detail)), + ); - this.#components = [pluginManager]; + this.#components = [pluginManager, blockchainMonitor]; } public async start(): Promise { diff --git a/src/plugins/_testing.ts b/src/plugins/_testing.ts index 9b34ab2..c87ed8b 100644 --- a/src/plugins/_testing.ts +++ b/src/plugins/_testing.ts @@ -25,7 +25,7 @@ export function mkTestPluginManager( opts?: TestPluginManagerOpts, ) { const pluginsStub = opts?.plugins?.length - ? stub(_internals, "plugins", () => opts?.plugins!) + ? stub(_internals, "managedPlugins", () => opts?.plugins!) : null; const cleanup = () => { diff --git a/src/plugins/plugin.ts b/src/plugins/plugin.ts index 9e826ba..cf5082f 100644 --- a/src/plugins/plugin.ts +++ b/src/plugins/plugin.ts @@ -1,5 +1,6 @@ import { Logger } from "std/log/mod.ts"; import { BlockchainClient } from "../blockchain/mod.ts"; +import { SignedTransaction } from "@fleet-sdk/common"; export interface PluginDescriptor { /** User friendly name of the plugin. */ @@ -54,5 +55,10 @@ export abstract class Plugin { return Promise.resolve(); } + /** Called when a new transaction is added to mempool. */ + onMempoolTx(_tx: SignedTransaction): Promise { + return Promise.resolve(); + } + abstract get descriptor(): PluginDescriptor; } diff --git a/src/plugins/plugin_manager.ts b/src/plugins/plugin_manager.ts index 92a2d61..3702a8f 100644 --- a/src/plugins/plugin_manager.ts +++ b/src/plugins/plugin_manager.ts @@ -23,7 +23,7 @@ export interface ManagedPlugin { export const _internals = { /** Allow mocking managed plugins in tests. */ - plugins(plugins: ManagedPlugin[]) { + managedPlugins(plugins: ManagedPlugin[]) { return plugins; }, }; @@ -87,8 +87,12 @@ export class PluginManager extends Component { await Promise.allSettled(promises); } - get #plugins() { - return _internals.plugins(this.#_plugins); + get activePlugins(): ReadonlyArray { + return this.#pluginsByState(PluginState.Running).map((p) => p.plugin); + } + + get #managedPlugins() { + return _internals.managedPlugins(this.#_plugins); } #handlePluginError(managedPlugin: ManagedPlugin, error: Error): void { @@ -102,7 +106,7 @@ export class PluginManager extends Component { } #pluginsByState(state: PluginState): ManagedPlugin[] { - return this.#plugins.filter((p) => p.state === state); + return this.#managedPlugins.filter((p) => p.state === state); } #createPlugin( diff --git a/src/plugins/plugin_manager_test.ts b/src/plugins/plugin_manager_test.ts index 382af96..bb00a52 100644 --- a/src/plugins/plugin_manager_test.ts +++ b/src/plugins/plugin_manager_test.ts @@ -164,4 +164,28 @@ describe("PluginManager", () => { } }); }); + + describe("activePlugins", () => { + it("should only return running plugins", () => { + const startedPlugin = mkTestManagedPlugin(PluginState.Running); + const startedPlugin2 = mkTestManagedPlugin(PluginState.Running); + const { pluginManager, cleanup } = mkTestPluginManager({ + plugins: [ + startedPlugin, + mkTestManagedPlugin(PluginState.Error), + mkTestManagedPlugin(PluginState.Stopped), + startedPlugin2, + ], + }); + + try { + assertEquals(pluginManager.activePlugins, [ + startedPlugin.plugin, + startedPlugin2.plugin, + ]); + } finally { + cleanup(); + } + }); + }); }); From e5cc3bac5766809aa12c9fca0d011966249e13d4 Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Thu, 17 Aug 2023 18:26:06 +1000 Subject: [PATCH 04/15] remove default class creation --- src/blockchain/blockchain_monitor.ts | 6 +++--- src/plugins/plugin_manager.ts | 7 +++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/blockchain/blockchain_monitor.ts b/src/blockchain/blockchain_monitor.ts index 0e14deb..04c3c89 100644 --- a/src/blockchain/blockchain_monitor.ts +++ b/src/blockchain/blockchain_monitor.ts @@ -1,7 +1,7 @@ import { SignedTransaction, TransactionId } from "@fleet-sdk/common"; import { Component } from "../component.ts"; import { ErgomaticConfig } from "../config.ts"; -import { BlockchainClient, BlockchainProvider } from "./clients/mod.ts"; +import { BlockchainClient } from "./clients/mod.ts"; interface MonitorState { currentHeight: number; @@ -27,7 +27,7 @@ export class BlockchainMonitor extends Component { constructor( config: ErgomaticConfig, - blockchainClient?: BlockchainClient, + blockchainClient: BlockchainClient, pollInterval: number = 10000, maxMempoolTxChecks: number = 10, ) { @@ -35,7 +35,7 @@ export class BlockchainMonitor extends Component { this.#pollInterval = pollInterval; this.#maxMempoolTxChecks = maxMempoolTxChecks; - this.#blockchainClient = blockchainClient ?? new BlockchainProvider(config); + this.#blockchainClient = blockchainClient; this.#state = { currentHeight: 0, mempoolTxDelivery: {}, diff --git a/src/plugins/plugin_manager.ts b/src/plugins/plugin_manager.ts index 3702a8f..a67ac08 100644 --- a/src/plugins/plugin_manager.ts +++ b/src/plugins/plugin_manager.ts @@ -4,7 +4,7 @@ import { createLogger } from "../log.ts"; import { ErgomaticConfigError } from "../error.ts"; import { pluginConstructorMap } from "../../plugins/mod.ts"; import { Component } from "../component.ts"; -import { BlockchainClient, BlockchainProvider } from "../blockchain/mod.ts"; +import { BlockchainClient } from "../blockchain/mod.ts"; export interface PluginManagerEvent { "plugin:error": CustomEvent<{ plugin: Plugin; error: Error }>; @@ -35,13 +35,12 @@ export class PluginManager extends Component { constructor( config: ErgomaticConfig, - blockchainClient?: BlockchainClient, + blockchainClient: BlockchainClient, pluginCtorMap = pluginConstructorMap, ) { super(config, "PluginManager"); - this.#blockchainClient = blockchainClient ?? - new BlockchainProvider(config); + this.#blockchainClient = blockchainClient; this.#pluginConstructorMap = pluginCtorMap; this.#_plugins = config.plugins.filter((p) => p.enabled).map(( pluginEntry, From 0bec2addbf52ec0d103210d72e74cab5588b49c7 Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Thu, 17 Aug 2023 18:43:09 +1000 Subject: [PATCH 05/15] small restructure --- src/blockchain/_testing.ts | 10 ++++++ src/ergomatic.ts | 10 ++---- src/plugins/_testing.ts | 6 +++- src/plugins/plugin_manager.ts | 23 ++++++++++--- src/plugins/plugin_manager_test.ts | 55 +++++++++++++++--------------- 5 files changed, 62 insertions(+), 42 deletions(-) create mode 100644 src/blockchain/_testing.ts diff --git a/src/blockchain/_testing.ts b/src/blockchain/_testing.ts new file mode 100644 index 0000000..4873ee4 --- /dev/null +++ b/src/blockchain/_testing.ts @@ -0,0 +1,10 @@ +import { ErgomaticConfig } from "../config.ts"; +import { BlockchainMonitor } from "./blockchain_monitor.ts"; +import { BlockchainClient } from "./clients/mod.ts"; + +export function mkTestBlockchainMonitor( + config: ErgomaticConfig, + blockchainClient: BlockchainClient, +) { + return new BlockchainMonitor(config, blockchainClient); +} diff --git a/src/ergomatic.ts b/src/ergomatic.ts index 8ca26ba..9e7ee11 100644 --- a/src/ergomatic.ts +++ b/src/ergomatic.ts @@ -28,18 +28,12 @@ export class Ergomatic extends Component { const blockchainClient = opts.blockchainClient ?? new BlockchainProvider(opts.config); - const pluginManager = opts.pluginManager ?? - new PluginManager(opts.config, blockchainClient); const blockchainMonitor = opts.blockchainMonitor ?? new BlockchainMonitor(opts.config, blockchainClient); + const pluginManager = opts.pluginManager ?? + new PluginManager(opts.config, blockchainClient, blockchainMonitor); pluginManager.addEventListener("plugin:error", (e) => this.#bubbleEvent(e)); - // TODO: handle errors in plugin handlers - blockchainMonitor.addEventListener( - "monitor:mempool-tx", - ({ detail }) => - pluginManager.activePlugins.forEach((p) => p.onMempoolTx(detail)), - ); this.#components = [pluginManager, blockchainMonitor]; } diff --git a/src/plugins/_testing.ts b/src/plugins/_testing.ts index c87ed8b..ab21a78 100644 --- a/src/plugins/_testing.ts +++ b/src/plugins/_testing.ts @@ -6,6 +6,7 @@ import { PluginManager } from "./mod.ts"; import { stub } from "std/testing/mock.ts"; import { BlockchainClient, BlockchainProvider } from "../blockchain/mod.ts"; import { testConfig } from "../_testing.ts"; +import { mkTestBlockchainMonitor } from "../blockchain/_testing.ts"; export class TestPlugin extends Plugin { get descriptor(): PluginDescriptor { @@ -34,11 +35,14 @@ export function mkTestPluginManager( const config = opts?.config ?? testConfig(); const pluginMap = opts?.pluginMap ?? testPluginMap; + const client = new BlockchainProvider(config); + const monitor = mkTestBlockchainMonitor(config, client); return { pluginManager: new PluginManager( config, - new BlockchainProvider(config), + client, + monitor, pluginMap, ), cleanup, diff --git a/src/plugins/plugin_manager.ts b/src/plugins/plugin_manager.ts index a67ac08..3a51d8c 100644 --- a/src/plugins/plugin_manager.ts +++ b/src/plugins/plugin_manager.ts @@ -4,7 +4,7 @@ import { createLogger } from "../log.ts"; import { ErgomaticConfigError } from "../error.ts"; import { pluginConstructorMap } from "../../plugins/mod.ts"; import { Component } from "../component.ts"; -import { BlockchainClient } from "../blockchain/mod.ts"; +import { BlockchainClient, BlockchainMonitor } from "../blockchain/mod.ts"; export interface PluginManagerEvent { "plugin:error": CustomEvent<{ plugin: Plugin; error: Error }>; @@ -31,16 +31,19 @@ export const _internals = { export class PluginManager extends Component { readonly #pluginConstructorMap: Record; readonly #blockchainClient: BlockchainClient; + readonly #blockchainMonitor: BlockchainMonitor; #_plugins: ManagedPlugin[]; constructor( config: ErgomaticConfig, blockchainClient: BlockchainClient, + blockchainMonitor: BlockchainMonitor, pluginCtorMap = pluginConstructorMap, ) { super(config, "PluginManager"); this.#blockchainClient = blockchainClient; + this.#blockchainMonitor = blockchainMonitor; this.#pluginConstructorMap = pluginCtorMap; this.#_plugins = config.plugins.filter((p) => p.enabled).map(( pluginEntry, @@ -53,6 +56,20 @@ export class PluginManager extends Component { public async start(): Promise { this.logger.debug("Starting plugins"); + // these event listeners aren't cleaned up in stop() + // probably wont be a issue because starting/stopping ergomatic is unlikely to be a thing. + // But if it does happen for some reason then event handlers will be called twice after the 2nd start. + // TODO: fix this if it is a problem + this.#blockchainMonitor.addEventListener( + "monitor:mempool-tx", + ({ detail }) => + this.#pluginsByState(PluginState.Running).forEach((p) => + p.plugin.onMempoolTx(detail).catch((e) => + this.#handlePluginError(p, e) + ) + ), + ); + const promises = this.#pluginsByState(PluginState.Stopped).map(async ( managedPlugin, ) => { @@ -86,10 +103,6 @@ export class PluginManager extends Component { await Promise.allSettled(promises); } - get activePlugins(): ReadonlyArray { - return this.#pluginsByState(PluginState.Running).map((p) => p.plugin); - } - get #managedPlugins() { return _internals.managedPlugins(this.#_plugins); } diff --git a/src/plugins/plugin_manager_test.ts b/src/plugins/plugin_manager_test.ts index bb00a52..0047fe3 100644 --- a/src/plugins/plugin_manager_test.ts +++ b/src/plugins/plugin_manager_test.ts @@ -10,16 +10,23 @@ import { mkTestPluginManager, testPluginMap, } from "./_testing.ts"; -import { BlockchainClient, BlockchainProvider } from "../blockchain/mod.ts"; +import { + BlockchainClient, + BlockchainMonitor, + BlockchainProvider, +} from "../blockchain/mod.ts"; import { testConfig } from "../_testing.ts"; +import { mkTestBlockchainMonitor } from "../blockchain/_testing.ts"; describe("PluginManager", () => { let config: ErgomaticConfig; let blockchainClient: BlockchainClient; + let blockchainMonitor: BlockchainMonitor; beforeEach(() => { config = testConfig(); blockchainClient = new BlockchainProvider(config); + blockchainMonitor = mkTestBlockchainMonitor(config, blockchainClient); }); describe("constructor", () => { @@ -27,7 +34,13 @@ describe("PluginManager", () => { config.plugins[0]!.id = "invalid"; assertThrows( - () => new PluginManager(config, blockchainClient, testPluginMap), + () => + new PluginManager( + config, + blockchainClient, + blockchainMonitor, + testPluginMap, + ), ErgomaticConfigError, "Unknown plugin ID", ); @@ -36,10 +49,20 @@ describe("PluginManager", () => { config.plugins[0]!.id = "invalid"; config.plugins[0]!.enabled = false; - new PluginManager(config, blockchainClient, testPluginMap); + new PluginManager( + config, + blockchainClient, + blockchainMonitor, + testPluginMap, + ); }); it("should create PluginManager instance", () => { - new PluginManager(config, blockchainClient, testPluginMap); + new PluginManager( + config, + blockchainClient, + blockchainMonitor, + testPluginMap, + ); }); }); @@ -164,28 +187,4 @@ describe("PluginManager", () => { } }); }); - - describe("activePlugins", () => { - it("should only return running plugins", () => { - const startedPlugin = mkTestManagedPlugin(PluginState.Running); - const startedPlugin2 = mkTestManagedPlugin(PluginState.Running); - const { pluginManager, cleanup } = mkTestPluginManager({ - plugins: [ - startedPlugin, - mkTestManagedPlugin(PluginState.Error), - mkTestManagedPlugin(PluginState.Stopped), - startedPlugin2, - ], - }); - - try { - assertEquals(pluginManager.activePlugins, [ - startedPlugin.plugin, - startedPlugin2.plugin, - ]); - } finally { - cleanup(); - } - }); - }); }); From 9c464d485d6425b9596cc4920f831cb930a05060 Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Thu, 17 Aug 2023 18:54:08 +1000 Subject: [PATCH 06/15] setup event listeners once --- src/plugins/plugin.ts | 5 +++++ src/plugins/plugin_manager.ts | 40 +++++++++++++++++++++-------------- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/src/plugins/plugin.ts b/src/plugins/plugin.ts index cf5082f..0f92ad6 100644 --- a/src/plugins/plugin.ts +++ b/src/plugins/plugin.ts @@ -60,5 +60,10 @@ export abstract class Plugin { return Promise.resolve(); } + /** Called when a transaction is dropped from mempool. */ + onMempoolTxDrop(_tx: SignedTransaction): Promise { + return Promise.resolve(); + } + abstract get descriptor(): PluginDescriptor; } diff --git a/src/plugins/plugin_manager.ts b/src/plugins/plugin_manager.ts index 3a51d8c..570822b 100644 --- a/src/plugins/plugin_manager.ts +++ b/src/plugins/plugin_manager.ts @@ -31,7 +31,6 @@ export const _internals = { export class PluginManager extends Component { readonly #pluginConstructorMap: Record; readonly #blockchainClient: BlockchainClient; - readonly #blockchainMonitor: BlockchainMonitor; #_plugins: ManagedPlugin[]; constructor( @@ -43,7 +42,6 @@ export class PluginManager extends Component { super(config, "PluginManager"); this.#blockchainClient = blockchainClient; - this.#blockchainMonitor = blockchainMonitor; this.#pluginConstructorMap = pluginCtorMap; this.#_plugins = config.plugins.filter((p) => p.enabled).map(( pluginEntry, @@ -51,25 +49,13 @@ export class PluginManager extends Component { plugin: this.#createPlugin(config, pluginEntry), state: PluginState.Stopped, })); + + this.#wireEvents(blockchainMonitor); } public async start(): Promise { this.logger.debug("Starting plugins"); - // these event listeners aren't cleaned up in stop() - // probably wont be a issue because starting/stopping ergomatic is unlikely to be a thing. - // But if it does happen for some reason then event handlers will be called twice after the 2nd start. - // TODO: fix this if it is a problem - this.#blockchainMonitor.addEventListener( - "monitor:mempool-tx", - ({ detail }) => - this.#pluginsByState(PluginState.Running).forEach((p) => - p.plugin.onMempoolTx(detail).catch((e) => - this.#handlePluginError(p, e) - ) - ), - ); - const promises = this.#pluginsByState(PluginState.Stopped).map(async ( managedPlugin, ) => { @@ -141,4 +127,26 @@ export class PluginManager extends Component { logger: createLogger(pluginEntry.id, config.logLevel), }); } + + #wireEvents(blockchainMonitor: BlockchainMonitor) { + blockchainMonitor.addEventListener( + "monitor:mempool-tx", + ({ detail }) => + this.#pluginsByState(PluginState.Running).forEach((p) => + p.plugin.onMempoolTx(detail).catch((e) => + this.#handlePluginError(p, e) + ) + ), + ); + + blockchainMonitor.addEventListener( + "monitor:mempool-tx-drop", + ({ detail }) => + this.#pluginsByState(PluginState.Running).forEach((p) => + p.plugin.onMempoolTxDrop(detail).catch((e) => + this.#handlePluginError(p, e) + ) + ), + ); + } } From 23684d2e34930122e11c645ffcac300929d5cbd7 Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Thu, 17 Aug 2023 18:54:41 +1000 Subject: [PATCH 07/15] rename method --- src/plugins/plugin_manager.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/plugins/plugin_manager.ts b/src/plugins/plugin_manager.ts index 570822b..6ddd85a 100644 --- a/src/plugins/plugin_manager.ts +++ b/src/plugins/plugin_manager.ts @@ -50,7 +50,7 @@ export class PluginManager extends Component { state: PluginState.Stopped, })); - this.#wireEvents(blockchainMonitor); + this.#setupEventHandlers(blockchainMonitor); } public async start(): Promise { @@ -128,7 +128,7 @@ export class PluginManager extends Component { }); } - #wireEvents(blockchainMonitor: BlockchainMonitor) { + #setupEventHandlers(blockchainMonitor: BlockchainMonitor) { blockchainMonitor.addEventListener( "monitor:mempool-tx", ({ detail }) => From b711da4ae234d2bc7da8d5116837f83d1730f15e Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Thu, 17 Aug 2023 19:11:08 +1000 Subject: [PATCH 08/15] add remaining events --- src/blockchain/blockchain_monitor.ts | 5 +++-- src/plugins/plugin.ts | 10 ++++++++++ src/plugins/plugin_manager.ts | 19 ++++++++++++++++++- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/src/blockchain/blockchain_monitor.ts b/src/blockchain/blockchain_monitor.ts index 04c3c89..d6d246c 100644 --- a/src/blockchain/blockchain_monitor.ts +++ b/src/blockchain/blockchain_monitor.ts @@ -15,7 +15,8 @@ interface MonitorState { interface BlockchainMonitorEvent { "monitor:mempool-tx": CustomEvent; "monitor:mempool-tx-drop": CustomEvent; - "monitor:block": CustomEvent; + "monitor:included-tx": CustomEvent; + "monitor:new-block": CustomEvent; } export class BlockchainMonitor extends Component { @@ -95,7 +96,7 @@ export class BlockchainMonitor extends Component { const newBlock = await this.#blockchainClient.getBlock(currentHeight); this.dispatchEvent( - new CustomEvent("monitor:block", { detail: newBlock }), + new CustomEvent("monitor:new-block", { detail: newBlock }), ); this.#state.currentHeight = currentHeight; diff --git a/src/plugins/plugin.ts b/src/plugins/plugin.ts index 0f92ad6..e8f2256 100644 --- a/src/plugins/plugin.ts +++ b/src/plugins/plugin.ts @@ -65,5 +65,15 @@ export abstract class Plugin { return Promise.resolve(); } + /** Called when a transaction has been included in a block. */ + onIncludedTx(_tx: SignedTransaction): Promise { + return Promise.resolve(); + } + + /** Called when a new block is added to the blockchain. */ + onNewBlock(block: any): Promise { + return Promise.resolve(); + } + abstract get descriptor(): PluginDescriptor; } diff --git a/src/plugins/plugin_manager.ts b/src/plugins/plugin_manager.ts index 6ddd85a..d777784 100644 --- a/src/plugins/plugin_manager.ts +++ b/src/plugins/plugin_manager.ts @@ -138,7 +138,6 @@ export class PluginManager extends Component { ) ), ); - blockchainMonitor.addEventListener( "monitor:mempool-tx-drop", ({ detail }) => @@ -148,5 +147,23 @@ export class PluginManager extends Component { ) ), ); + blockchainMonitor.addEventListener( + "monitor:included-tx", + ({ detail }) => + this.#pluginsByState(PluginState.Running).forEach((p) => + p.plugin.onIncludedTx(detail).catch((e) => + this.#handlePluginError(p, e) + ) + ), + ); + blockchainMonitor.addEventListener( + "monitor:new-block", + ({ detail }) => + this.#pluginsByState(PluginState.Running).forEach((p) => + p.plugin.onNewBlock(detail).catch((e) => + this.#handlePluginError(p, e) + ) + ), + ); } } From da2f2aa758376ceb794c0c5704140d33e375fc39 Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Thu, 17 Aug 2023 19:28:17 +1000 Subject: [PATCH 09/15] impl rest of monitor func --- src/blockchain/blockchain_monitor.ts | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/blockchain/blockchain_monitor.ts b/src/blockchain/blockchain_monitor.ts index d6d246c..0330ff9 100644 --- a/src/blockchain/blockchain_monitor.ts +++ b/src/blockchain/blockchain_monitor.ts @@ -93,7 +93,9 @@ export class BlockchainMonitor extends Component { const currentHeight = await this.#blockchainClient.getCurrentHeight(); if (currentHeight > this.#state.currentHeight) { - const newBlock = await this.#blockchainClient.getBlock(currentHeight); + const newBlock = await this.#blockchainClient.getBlock( + currentHeight, + ) as any; this.dispatchEvent( new CustomEvent("monitor:new-block", { detail: newBlock }), @@ -101,17 +103,18 @@ export class BlockchainMonitor extends Component { this.#state.currentHeight = currentHeight; - // for tx in newBlock.txs - // do - // plugins.all.onIncludedTx(tx) + for (const tx of newBlock.blockTransactions) { + this.dispatchEvent( + new CustomEvent("monitor:included-tx", { detail: tx }), + ); - // # stop tracking txid mempool delivery for this txid - // delete mempoolDelivery[tx.txid] + // stop tracking mempool delivery for this txid + delete this.#state.mempoolTxDelivery[tx.id]; - // # prevent `onMempoolTxDrop` event for this txid as - // # it is now included in a block - // delete undefinedStateCheks[tx.txid] - // end + // prevent `onMempoolTxDrop` event for this txid as + // it is now included in a block + delete this.#state.mempoolTxChecks[tx.id]; + } } for (const txId of Object.keys(this.#state.mempoolTxChecks)) { From 221243c6a9aa9977ec155d760ce6e17976f012c9 Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Fri, 18 Aug 2023 15:41:33 +1000 Subject: [PATCH 10/15] split out plugin blockchain client, make mempool func generator --- plugins/example_plugin/mod.ts | 4 +- src/blockchain/blockchain_monitor.ts | 10 ++++- src/blockchain/clients/blockchain_client.ts | 41 ++++++++++++++++++- src/blockchain/clients/blockchain_provider.ts | 30 +++++++------- src/blockchain/clients/explorer.ts | 2 +- src/blockchain/clients/node.ts | 26 +++++++++--- src/ergomatic.ts | 9 +++- src/plugins/_testing.ts | 13 ++++-- src/plugins/plugin.ts | 10 ++--- src/plugins/plugin_manager.ts | 10 ++--- src/plugins/plugin_manager_test.ts | 12 ++++-- 11 files changed, 123 insertions(+), 44 deletions(-) diff --git a/plugins/example_plugin/mod.ts b/plugins/example_plugin/mod.ts index 00380f6..dbfdea2 100644 --- a/plugins/example_plugin/mod.ts +++ b/plugins/example_plugin/mod.ts @@ -25,7 +25,9 @@ export class ExamplePlugin extends Plugin { const { tokenId, exitAtPage } = this.config; let currentPage = 0; - for await (const page of this.blockchainClient.getBoxesByTokenId(tokenId)) { + for await ( + const page of this.blockchainProvider.getBoxesByTokenId(tokenId) + ) { currentPage++; this.logger.info( diff --git a/src/blockchain/blockchain_monitor.ts b/src/blockchain/blockchain_monitor.ts index 0330ff9..f1d13c1 100644 --- a/src/blockchain/blockchain_monitor.ts +++ b/src/blockchain/blockchain_monitor.ts @@ -61,7 +61,15 @@ export class BlockchainMonitor extends Component { async #monitor() { this.logger.debug("Gathering blockchain state"); - const mempool = await this.#blockchainClient.getMempool(); + const mempool = []; + // the loop following this where we go through all the mempool txs could + // go in here as well but each time the `monitor:mempool-tx` event is raised + // it could provide a different mempool snapshot to plugins. + // + // instead, collect the full mempool first so the plugins can receive a full consistent snapshot. + for await (const page of this.#blockchainClient.getMempool()) { + mempool.push(...page); + } for (const tx of mempool) { if (!this.#state.mempoolTxDelivery[tx.id]) { diff --git a/src/blockchain/clients/blockchain_client.ts b/src/blockchain/clients/blockchain_client.ts index 734a4fb..4ad4e1a 100644 --- a/src/blockchain/clients/blockchain_client.ts +++ b/src/blockchain/clients/blockchain_client.ts @@ -5,6 +5,10 @@ import { TokenId, TransactionId, } from "@fleet-sdk/common"; +import { Component } from "../../component.ts"; +import { ErgomaticConfig } from "../../config.ts"; +import { ExplorerClient } from "./explorer.ts"; +import { NodeClient } from "./node.ts"; export interface BlockchainClient { submitTx(signedTx: SignedTransaction): Promise; @@ -13,9 +17,44 @@ export interface BlockchainClient { tokenId: TokenId, ): AsyncGenerator[]>; - getMempool(): Promise; + getMempool(): AsyncGenerator; getCurrentHeight(): Promise; getBlock(height: number): Promise; } + +export class DefaultBlockchainClient extends Component + implements BlockchainClient { + readonly #explorer: BlockchainClient; + readonly #node: BlockchainClient; + + constructor(config: ErgomaticConfig) { + super(config, "DefaultBlockchainProvider"); + + this.#explorer = new ExplorerClient(config); + this.#node = new NodeClient(config); + } + + getBlock(height: number): Promise { + return this.#node.getBlock(height); + } + + getCurrentHeight(): Promise { + return this.#node.getCurrentHeight(); + } + + getMempool(): AsyncGenerator { + return this.#node.getMempool(); + } + + submitTx(signedTx: SignedTransaction): Promise { + return this.#node.submitTx(signedTx); + } + + getBoxesByTokenId( + tokenId: TokenId, + ): AsyncGenerator[]> { + return this.#explorer.getBoxesByTokenId(tokenId); + } +} diff --git a/src/blockchain/clients/blockchain_provider.ts b/src/blockchain/clients/blockchain_provider.ts index dc0392a..01c9547 100644 --- a/src/blockchain/clients/blockchain_provider.ts +++ b/src/blockchain/clients/blockchain_provider.ts @@ -11,29 +11,31 @@ import { BlockchainClient } from "./blockchain_client.ts"; import { ExplorerClient } from "./explorer.ts"; import { NodeClient } from "./node.ts"; -export class BlockchainProvider extends Component implements BlockchainClient { +/** + * BlockchainProvider is a blockchain client that is exclusively used by plugins. + * The API is intentionally restricted to prevent plugins issuing excessive API requests + * that are instead provided to plugins via a snapshot of the blockchain state. + */ +export interface BlockchainProvider { + submitTx(signedTx: SignedTransaction): Promise; + + getBoxesByTokenId( + tokenId: TokenId, + ): AsyncGenerator[]>; +} + +export class DefaultBlockchainProvider extends Component + implements BlockchainProvider { readonly #explorer: BlockchainClient; readonly #node: BlockchainClient; constructor(config: ErgomaticConfig) { - super(config, "BlockchainProvider"); + super(config, "DefaultBlockchainProvider"); this.#explorer = new ExplorerClient(config); this.#node = new NodeClient(config); } - getBlock(height: number): Promise { - return this.#node.getBlock(height); - } - - getCurrentHeight(): Promise { - return this.#node.getCurrentHeight(); - } - - getMempool(): Promise { - return this.#node.getMempool(); - } - submitTx(signedTx: SignedTransaction): Promise { return this.#node.submitTx(signedTx); } diff --git a/src/blockchain/clients/explorer.ts b/src/blockchain/clients/explorer.ts index 18a4494..6b05410 100644 --- a/src/blockchain/clients/explorer.ts +++ b/src/blockchain/clients/explorer.ts @@ -35,7 +35,7 @@ export class ExplorerClient extends Component implements BlockchainClient { throw new Error("Method not implemented."); } - getMempool(): Promise { + getMempool(): AsyncGenerator { throw new Error(`${this.name} does not support getMempool operation`); } diff --git a/src/blockchain/clients/node.ts b/src/blockchain/clients/node.ts index 943c9b0..ee61fd3 100644 --- a/src/blockchain/clients/node.ts +++ b/src/blockchain/clients/node.ts @@ -50,12 +50,26 @@ export class NodeClient extends Component implements BlockchainClient { ); } - getMempool(): Promise { - // TODO: this might need pagination - return this.#http.get( - "/transactions/unconfirmed", - this.#defaultRequestConfig, - ); + async *getMempool(): AsyncGenerator { + let offset = 0; + const limit = 100; // highest value supported by node + + while (true) { + const { data } = await this.#http.get("/transactions/unconfirmed", { + params: { offset, limit }, + ...this.#defaultRequestConfig, + }); + + if (data.length) { + yield data; + } + + if (data.length < limit) { + break; + } + + offset += limit; + } } get #defaultRequestConfig() { diff --git a/src/ergomatic.ts b/src/ergomatic.ts index 9e7ee11..45adcc2 100644 --- a/src/ergomatic.ts +++ b/src/ergomatic.ts @@ -5,6 +5,8 @@ import { BlockchainClient, BlockchainMonitor, BlockchainProvider, + DefaultBlockchainClient, + DefaultBlockchainProvider, } from "./blockchain/mod.ts"; interface ErgomaticEvent { @@ -16,6 +18,7 @@ interface ErgomaticOpts { config: ErgomaticConfig; pluginManager?: PluginManager; blockchainClient?: BlockchainClient; + blockchainProvider?: BlockchainProvider; blockchainMonitor?: BlockchainMonitor; } @@ -26,12 +29,14 @@ export class Ergomatic extends Component { constructor(opts: ErgomaticOpts) { super(opts.config, "Ergomatic"); + const blockchainProvider = opts.blockchainClient ?? + new DefaultBlockchainProvider(opts.config); const blockchainClient = opts.blockchainClient ?? - new BlockchainProvider(opts.config); + new DefaultBlockchainClient(opts.config); const blockchainMonitor = opts.blockchainMonitor ?? new BlockchainMonitor(opts.config, blockchainClient); const pluginManager = opts.pluginManager ?? - new PluginManager(opts.config, blockchainClient, blockchainMonitor); + new PluginManager(opts.config, blockchainProvider, blockchainMonitor); pluginManager.addEventListener("plugin:error", (e) => this.#bubbleEvent(e)); diff --git a/src/plugins/_testing.ts b/src/plugins/_testing.ts index ab21a78..bf23f5e 100644 --- a/src/plugins/_testing.ts +++ b/src/plugins/_testing.ts @@ -4,7 +4,11 @@ import { _internals, ManagedPlugin, PluginState } from "./plugin_manager.ts"; import { ErgomaticConfig } from "../config.ts"; import { PluginManager } from "./mod.ts"; import { stub } from "std/testing/mock.ts"; -import { BlockchainClient, BlockchainProvider } from "../blockchain/mod.ts"; +import { + BlockchainProvider, + DefaultBlockchainClient, + DefaultBlockchainProvider, +} from "../blockchain/mod.ts"; import { testConfig } from "../_testing.ts"; import { mkTestBlockchainMonitor } from "../blockchain/_testing.ts"; @@ -35,13 +39,14 @@ export function mkTestPluginManager( const config = opts?.config ?? testConfig(); const pluginMap = opts?.pluginMap ?? testPluginMap; - const client = new BlockchainProvider(config); + const provider = new DefaultBlockchainProvider(config); + const client = new DefaultBlockchainClient(config); const monitor = mkTestBlockchainMonitor(config, client); return { pluginManager: new PluginManager( config, - client, + provider, monitor, pluginMap, ), @@ -54,7 +59,7 @@ export function mkTestManagedPlugin( ): ManagedPlugin { const plugin = new TestPlugin({ logger: getLogger(), - blockchainClient: {} as BlockchainClient, // TODO + blockchainProvider: {} as BlockchainProvider, // TODO config: {}, }); diff --git a/src/plugins/plugin.ts b/src/plugins/plugin.ts index e8f2256..0c6a0e7 100644 --- a/src/plugins/plugin.ts +++ b/src/plugins/plugin.ts @@ -1,5 +1,5 @@ import { Logger } from "std/log/mod.ts"; -import { BlockchainClient } from "../blockchain/mod.ts"; +import { BlockchainProvider } from "../blockchain/mod.ts"; import { SignedTransaction } from "@fleet-sdk/common"; export interface PluginDescriptor { @@ -21,7 +21,7 @@ export type PluginConstructor = { export interface PluginArgs { config: T; logger: Logger; - blockchainClient: BlockchainClient; + blockchainProvider: BlockchainProvider; } export abstract class Plugin { @@ -31,12 +31,12 @@ export abstract class Plugin { /** Logger configured to log output of this plugin. */ protected readonly logger: Logger; - protected readonly blockchainClient: BlockchainClient; + protected readonly blockchainProvider: BlockchainProvider; - constructor({ config, logger, blockchainClient }: PluginArgs) { + constructor({ config, logger, blockchainProvider }: PluginArgs) { this.config = config; this.logger = logger; - this.blockchainClient = blockchainClient; + this.blockchainProvider = blockchainProvider; } /** diff --git a/src/plugins/plugin_manager.ts b/src/plugins/plugin_manager.ts index d777784..54ea72d 100644 --- a/src/plugins/plugin_manager.ts +++ b/src/plugins/plugin_manager.ts @@ -4,7 +4,7 @@ import { createLogger } from "../log.ts"; import { ErgomaticConfigError } from "../error.ts"; import { pluginConstructorMap } from "../../plugins/mod.ts"; import { Component } from "../component.ts"; -import { BlockchainClient, BlockchainMonitor } from "../blockchain/mod.ts"; +import { BlockchainMonitor, BlockchainProvider } from "../blockchain/mod.ts"; export interface PluginManagerEvent { "plugin:error": CustomEvent<{ plugin: Plugin; error: Error }>; @@ -30,18 +30,18 @@ export const _internals = { export class PluginManager extends Component { readonly #pluginConstructorMap: Record; - readonly #blockchainClient: BlockchainClient; + readonly #blockchainProvider: BlockchainProvider; #_plugins: ManagedPlugin[]; constructor( config: ErgomaticConfig, - blockchainClient: BlockchainClient, + blockchainProvider: BlockchainProvider, blockchainMonitor: BlockchainMonitor, pluginCtorMap = pluginConstructorMap, ) { super(config, "PluginManager"); - this.#blockchainClient = blockchainClient; + this.#blockchainProvider = blockchainProvider; this.#pluginConstructorMap = pluginCtorMap; this.#_plugins = config.plugins.filter((p) => p.enabled).map(( pluginEntry, @@ -123,7 +123,7 @@ export class PluginManager extends Component { return new pluginCtor({ config: pluginEntry.config, - blockchainClient: this.#blockchainClient, + blockchainProvider: this.#blockchainProvider, logger: createLogger(pluginEntry.id, config.logLevel), }); } diff --git a/src/plugins/plugin_manager_test.ts b/src/plugins/plugin_manager_test.ts index 0047fe3..93cba25 100644 --- a/src/plugins/plugin_manager_test.ts +++ b/src/plugins/plugin_manager_test.ts @@ -14,18 +14,22 @@ import { BlockchainClient, BlockchainMonitor, BlockchainProvider, + DefaultBlockchainClient, + DefaultBlockchainProvider, } from "../blockchain/mod.ts"; import { testConfig } from "../_testing.ts"; import { mkTestBlockchainMonitor } from "../blockchain/_testing.ts"; describe("PluginManager", () => { let config: ErgomaticConfig; + let blockchainProvider: BlockchainProvider; let blockchainClient: BlockchainClient; let blockchainMonitor: BlockchainMonitor; beforeEach(() => { config = testConfig(); - blockchainClient = new BlockchainProvider(config); + blockchainProvider = new DefaultBlockchainProvider(config); + blockchainClient = new DefaultBlockchainClient(config); blockchainMonitor = mkTestBlockchainMonitor(config, blockchainClient); }); @@ -37,7 +41,7 @@ describe("PluginManager", () => { () => new PluginManager( config, - blockchainClient, + blockchainProvider, blockchainMonitor, testPluginMap, ), @@ -51,7 +55,7 @@ describe("PluginManager", () => { new PluginManager( config, - blockchainClient, + blockchainProvider, blockchainMonitor, testPluginMap, ); @@ -59,7 +63,7 @@ describe("PluginManager", () => { it("should create PluginManager instance", () => { new PluginManager( config, - blockchainClient, + blockchainProvider, blockchainMonitor, testPluginMap, ); From 5d8709effaf44207a9d7b43aa0dbe571524bfe4d Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Fri, 18 Aug 2023 15:47:14 +1000 Subject: [PATCH 11/15] get all mempool txns in one request with high limit --- src/blockchain/blockchain_monitor.ts | 8 ++++---- src/blockchain/clients/node.ts | 6 +++++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/blockchain/blockchain_monitor.ts b/src/blockchain/blockchain_monitor.ts index f1d13c1..9c7d6db 100644 --- a/src/blockchain/blockchain_monitor.ts +++ b/src/blockchain/blockchain_monitor.ts @@ -21,7 +21,7 @@ interface BlockchainMonitorEvent { export class BlockchainMonitor extends Component { readonly #blockchainClient: BlockchainClient; - readonly #pollInterval: number; + readonly #pollIntervalMs: number; readonly #maxMempoolTxChecks: number; readonly #state: MonitorState; #taskHandle?: number; @@ -29,12 +29,12 @@ export class BlockchainMonitor extends Component { constructor( config: ErgomaticConfig, blockchainClient: BlockchainClient, - pollInterval: number = 10000, + pollIntervalMs: number = 500, maxMempoolTxChecks: number = 10, ) { super(config, "BlockchainMonitor"); - this.#pollInterval = pollInterval; + this.#pollIntervalMs = pollIntervalMs; this.#maxMempoolTxChecks = maxMempoolTxChecks; this.#blockchainClient = blockchainClient; this.#state = { @@ -47,7 +47,7 @@ export class BlockchainMonitor extends Component { start(): Promise { // TODO: raise component:error event if monitor throws exception - this.#taskHandle = setInterval(() => this.#monitor(), this.#pollInterval); + this.#taskHandle = setInterval(() => this.#monitor(), this.#pollIntervalMs); return super.start(); } diff --git a/src/blockchain/clients/node.ts b/src/blockchain/clients/node.ts index ee61fd3..5dcbfb4 100644 --- a/src/blockchain/clients/node.ts +++ b/src/blockchain/clients/node.ts @@ -52,7 +52,11 @@ export class NodeClient extends Component implements BlockchainClient { async *getMempool(): AsyncGenerator { let offset = 0; - const limit = 100; // highest value supported by node + // the max limit is defined as 100 in the node api spec but is not enforced + // take advantage of this to reduce the number of requests + // this function is still paginated with the expectation that this could + // be changed in the future. + const limit = 100000; while (true) { const { data } = await this.#http.get("/transactions/unconfirmed", { From 0f4c8bded91b8d7921dcb3e87e67c67fc6238d67 Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Fri, 18 Aug 2023 16:00:04 +1000 Subject: [PATCH 12/15] rename interface to restricted blockchain client --- src/blockchain/clients/blockchain_client.ts | 9 ++++++- src/blockchain/clients/blockchain_provider.ts | 24 ++++++------------- src/ergomatic.ts | 5 ++-- src/plugins/_testing.ts | 3 +-- src/plugins/plugin_manager_test.ts | 3 +-- 5 files changed, 19 insertions(+), 25 deletions(-) diff --git a/src/blockchain/clients/blockchain_client.ts b/src/blockchain/clients/blockchain_client.ts index 4ad4e1a..622998d 100644 --- a/src/blockchain/clients/blockchain_client.ts +++ b/src/blockchain/clients/blockchain_client.ts @@ -10,13 +10,20 @@ import { ErgomaticConfig } from "../../config.ts"; import { ExplorerClient } from "./explorer.ts"; import { NodeClient } from "./node.ts"; -export interface BlockchainClient { +/** + * RestrictedBlockchainClient is a blockchain client that is exclusively used by plugins. + * The API is intentionally restricted to prevent plugins issuing excessive API requests + * that are instead provided to plugins via a snapshot of the blockchain state. + */ +export interface RestrictedBlockchainClient { submitTx(signedTx: SignedTransaction): Promise; getBoxesByTokenId( tokenId: TokenId, ): AsyncGenerator[]>; +} +export interface BlockchainClient extends RestrictedBlockchainClient { getMempool(): AsyncGenerator; getCurrentHeight(): Promise; diff --git a/src/blockchain/clients/blockchain_provider.ts b/src/blockchain/clients/blockchain_provider.ts index 01c9547..4290ed2 100644 --- a/src/blockchain/clients/blockchain_provider.ts +++ b/src/blockchain/clients/blockchain_provider.ts @@ -7,30 +7,20 @@ import { } from "@fleet-sdk/common"; import { Component } from "../../component.ts"; import { ErgomaticConfig } from "../../config.ts"; -import { BlockchainClient } from "./blockchain_client.ts"; +import { + BlockchainClient, + RestrictedBlockchainClient, +} from "./blockchain_client.ts"; import { ExplorerClient } from "./explorer.ts"; import { NodeClient } from "./node.ts"; -/** - * BlockchainProvider is a blockchain client that is exclusively used by plugins. - * The API is intentionally restricted to prevent plugins issuing excessive API requests - * that are instead provided to plugins via a snapshot of the blockchain state. - */ -export interface BlockchainProvider { - submitTx(signedTx: SignedTransaction): Promise; - - getBoxesByTokenId( - tokenId: TokenId, - ): AsyncGenerator[]>; -} - -export class DefaultBlockchainProvider extends Component - implements BlockchainProvider { +export class BlockchainProvider extends Component + implements RestrictedBlockchainClient { readonly #explorer: BlockchainClient; readonly #node: BlockchainClient; constructor(config: ErgomaticConfig) { - super(config, "DefaultBlockchainProvider"); + super(config, "BlockchainProvider"); this.#explorer = new ExplorerClient(config); this.#node = new NodeClient(config); diff --git a/src/ergomatic.ts b/src/ergomatic.ts index 45adcc2..26adf28 100644 --- a/src/ergomatic.ts +++ b/src/ergomatic.ts @@ -6,7 +6,6 @@ import { BlockchainMonitor, BlockchainProvider, DefaultBlockchainClient, - DefaultBlockchainProvider, } from "./blockchain/mod.ts"; interface ErgomaticEvent { @@ -29,8 +28,8 @@ export class Ergomatic extends Component { constructor(opts: ErgomaticOpts) { super(opts.config, "Ergomatic"); - const blockchainProvider = opts.blockchainClient ?? - new DefaultBlockchainProvider(opts.config); + const blockchainProvider = opts.blockchainProvider ?? + new BlockchainProvider(opts.config); const blockchainClient = opts.blockchainClient ?? new DefaultBlockchainClient(opts.config); const blockchainMonitor = opts.blockchainMonitor ?? diff --git a/src/plugins/_testing.ts b/src/plugins/_testing.ts index bf23f5e..2a57e2a 100644 --- a/src/plugins/_testing.ts +++ b/src/plugins/_testing.ts @@ -7,7 +7,6 @@ import { stub } from "std/testing/mock.ts"; import { BlockchainProvider, DefaultBlockchainClient, - DefaultBlockchainProvider, } from "../blockchain/mod.ts"; import { testConfig } from "../_testing.ts"; import { mkTestBlockchainMonitor } from "../blockchain/_testing.ts"; @@ -39,7 +38,7 @@ export function mkTestPluginManager( const config = opts?.config ?? testConfig(); const pluginMap = opts?.pluginMap ?? testPluginMap; - const provider = new DefaultBlockchainProvider(config); + const provider = new BlockchainProvider(config); const client = new DefaultBlockchainClient(config); const monitor = mkTestBlockchainMonitor(config, client); diff --git a/src/plugins/plugin_manager_test.ts b/src/plugins/plugin_manager_test.ts index 93cba25..4d09004 100644 --- a/src/plugins/plugin_manager_test.ts +++ b/src/plugins/plugin_manager_test.ts @@ -15,7 +15,6 @@ import { BlockchainMonitor, BlockchainProvider, DefaultBlockchainClient, - DefaultBlockchainProvider, } from "../blockchain/mod.ts"; import { testConfig } from "../_testing.ts"; import { mkTestBlockchainMonitor } from "../blockchain/_testing.ts"; @@ -28,7 +27,7 @@ describe("PluginManager", () => { beforeEach(() => { config = testConfig(); - blockchainProvider = new DefaultBlockchainProvider(config); + blockchainProvider = new BlockchainProvider(config); blockchainClient = new DefaultBlockchainClient(config); blockchainMonitor = mkTestBlockchainMonitor(config, blockchainClient); }); From 437854076cafaa3cde693945bb3aa2b59972109c Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Fri, 18 Aug 2023 16:01:50 +1000 Subject: [PATCH 13/15] add comment --- src/blockchain/clients/blockchain_provider.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/blockchain/clients/blockchain_provider.ts b/src/blockchain/clients/blockchain_provider.ts index 4290ed2..8aebcd0 100644 --- a/src/blockchain/clients/blockchain_provider.ts +++ b/src/blockchain/clients/blockchain_provider.ts @@ -14,6 +14,7 @@ import { import { ExplorerClient } from "./explorer.ts"; import { NodeClient } from "./node.ts"; +/** Restricted blockchain client used by plugins */ export class BlockchainProvider extends Component implements RestrictedBlockchainClient { readonly #explorer: BlockchainClient; From 6656e9de1656f0bc088f8c457c080d65f393b507 Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Fri, 18 Aug 2023 18:53:34 +1000 Subject: [PATCH 14/15] pass chain snapshot to plugins --- src/blockchain/blockchain_monitor.ts | 37 ++++++++++++++++----- src/blockchain/clients/blockchain_client.ts | 11 ++++-- src/blockchain/clients/explorer.ts | 4 +-- src/blockchain/clients/node.ts | 17 +++++++--- src/plugins/plugin.ts | 22 +++++++++--- src/plugins/plugin_manager.ts | 8 ++--- 6 files changed, 71 insertions(+), 28 deletions(-) diff --git a/src/blockchain/blockchain_monitor.ts b/src/blockchain/blockchain_monitor.ts index 9c7d6db..16fffd3 100644 --- a/src/blockchain/blockchain_monitor.ts +++ b/src/blockchain/blockchain_monitor.ts @@ -3,6 +3,11 @@ import { Component } from "../component.ts"; import { ErgomaticConfig } from "../config.ts"; import { BlockchainClient } from "./clients/mod.ts"; +export interface BlockchainSnapshot { + height: number; + mempool: SignedTransaction[]; +} + interface MonitorState { currentHeight: number; /** map of txid -> bool indicating if mempool tx has been passed to plugins */ @@ -10,13 +15,16 @@ interface MonitorState { /** map of txid -> int indicating the number of re-checks */ mempoolTxChecks: Record; pastMempoolTxIds: TransactionId[]; + lastPeerMsgTimestamp: number; } +type MonitorEvent = CustomEvent<[T, Readonly]>; + interface BlockchainMonitorEvent { - "monitor:mempool-tx": CustomEvent; - "monitor:mempool-tx-drop": CustomEvent; - "monitor:included-tx": CustomEvent; - "monitor:new-block": CustomEvent; + "monitor:mempool-tx": MonitorEvent; + "monitor:mempool-tx-drop": MonitorEvent; + "monitor:included-tx": MonitorEvent; + "monitor:new-block": MonitorEvent; } export class BlockchainMonitor extends Component { @@ -42,6 +50,7 @@ export class BlockchainMonitor extends Component { mempoolTxDelivery: {}, mempoolTxChecks: {}, pastMempoolTxIds: [], + lastPeerMsgTimestamp: 0, }; } @@ -61,6 +70,13 @@ export class BlockchainMonitor extends Component { async #monitor() { this.logger.debug("Gathering blockchain state"); + const { currentHeight, lastPeerMsgTimestamp } = await this.#blockchainClient + .getInfo(); + + if (lastPeerMsgTimestamp === this.#state.lastPeerMsgTimestamp) { + return; + } + const mempool = []; // the loop following this where we go through all the mempool txs could // go in here as well but each time the `monitor:mempool-tx` event is raised @@ -71,12 +87,17 @@ export class BlockchainMonitor extends Component { mempool.push(...page); } + const snapshot: Readonly = Object.freeze({ + height: currentHeight, + mempool, + }); + for (const tx of mempool) { if (!this.#state.mempoolTxDelivery[tx.id]) { this.#state.mempoolTxDelivery[tx.id] = true; this.dispatchEvent( - new CustomEvent("monitor:mempool-tx", { detail: tx }), + new CustomEvent("monitor:mempool-tx", { detail: [tx, snapshot] }), ); } @@ -98,22 +119,20 @@ export class BlockchainMonitor extends Component { this.#state.pastMempoolTxIds = mempool.map((tx) => tx.id); - const currentHeight = await this.#blockchainClient.getCurrentHeight(); - if (currentHeight > this.#state.currentHeight) { const newBlock = await this.#blockchainClient.getBlock( currentHeight, ) as any; this.dispatchEvent( - new CustomEvent("monitor:new-block", { detail: newBlock }), + new CustomEvent("monitor:new-block", { detail: [newBlock, snapshot] }), ); this.#state.currentHeight = currentHeight; for (const tx of newBlock.blockTransactions) { this.dispatchEvent( - new CustomEvent("monitor:included-tx", { detail: tx }), + new CustomEvent("monitor:included-tx", { detail: [tx, snapshot] }), ); // stop tracking mempool delivery for this txid diff --git a/src/blockchain/clients/blockchain_client.ts b/src/blockchain/clients/blockchain_client.ts index 622998d..3fc7f4f 100644 --- a/src/blockchain/clients/blockchain_client.ts +++ b/src/blockchain/clients/blockchain_client.ts @@ -23,10 +23,15 @@ export interface RestrictedBlockchainClient { ): AsyncGenerator[]>; } +export interface BlockchainInfo { + currentHeight: number; + lastPeerMsgTimestamp?: number; +} + export interface BlockchainClient extends RestrictedBlockchainClient { getMempool(): AsyncGenerator; - getCurrentHeight(): Promise; + getInfo(): Promise; getBlock(height: number): Promise; } @@ -47,8 +52,8 @@ export class DefaultBlockchainClient extends Component return this.#node.getBlock(height); } - getCurrentHeight(): Promise { - return this.#node.getCurrentHeight(); + getInfo(): Promise { + return this.#node.getInfo(); } getMempool(): AsyncGenerator { diff --git a/src/blockchain/clients/explorer.ts b/src/blockchain/clients/explorer.ts index 6b05410..31f8418 100644 --- a/src/blockchain/clients/explorer.ts +++ b/src/blockchain/clients/explorer.ts @@ -6,7 +6,7 @@ import { TransactionId, } from "@fleet-sdk/common"; import axios, { AxiosInstance } from "axios"; -import { BlockchainClient } from "./blockchain_client.ts"; +import { BlockchainClient, BlockchainInfo } from "./blockchain_client.ts"; import { ErgomaticConfig } from "../../config.ts"; import { Component } from "../../component.ts"; @@ -31,7 +31,7 @@ export class ExplorerClient extends Component implements BlockchainClient { throw new Error("Method not implemented."); } - getCurrentHeight(): Promise { + getInfo(): Promise { throw new Error("Method not implemented."); } diff --git a/src/blockchain/clients/node.ts b/src/blockchain/clients/node.ts index 5dcbfb4..9dfa320 100644 --- a/src/blockchain/clients/node.ts +++ b/src/blockchain/clients/node.ts @@ -5,7 +5,7 @@ import { TokenId, TransactionId, } from "@fleet-sdk/common"; -import { BlockchainClient } from "./blockchain_client.ts"; +import { BlockchainClient, BlockchainInfo } from "./blockchain_client.ts"; import { Component } from "../../component.ts"; import { ErgomaticConfig } from "../../config.ts"; import axios, { AxiosInstance } from "axios"; @@ -27,18 +27,25 @@ export class NodeClient extends Component implements BlockchainClient { throw new Error("Method not implemented."); } - getCurrentHeight(): Promise { - throw new Error("Method not implemented."); + async getInfo(): Promise { + const response = await this.#http.get( + "/info", + this.#defaultRequestConfig, + ); + + return response.data; } - submitTx( + async submitTx( signedTx: SignedTransaction, ): Promise { - return this.#http.post( + const response = await this.#http.post( "/transactions", signedTx, this.#defaultRequestConfig, ); + + return response.data; } // deno-lint-ignore require-yield diff --git a/src/plugins/plugin.ts b/src/plugins/plugin.ts index 0c6a0e7..342d90c 100644 --- a/src/plugins/plugin.ts +++ b/src/plugins/plugin.ts @@ -1,5 +1,5 @@ import { Logger } from "std/log/mod.ts"; -import { BlockchainProvider } from "../blockchain/mod.ts"; +import { BlockchainProvider, BlockchainSnapshot } from "../blockchain/mod.ts"; import { SignedTransaction } from "@fleet-sdk/common"; export interface PluginDescriptor { @@ -56,22 +56,34 @@ export abstract class Plugin { } /** Called when a new transaction is added to mempool. */ - onMempoolTx(_tx: SignedTransaction): Promise { + onMempoolTx( + _tx: SignedTransaction, + _snapshot: Readonly, + ): Promise { return Promise.resolve(); } /** Called when a transaction is dropped from mempool. */ - onMempoolTxDrop(_tx: SignedTransaction): Promise { + onMempoolTxDrop( + _tx: SignedTransaction, + _snapshot: Readonly, + ): Promise { return Promise.resolve(); } /** Called when a transaction has been included in a block. */ - onIncludedTx(_tx: SignedTransaction): Promise { + onIncludedTx( + _tx: SignedTransaction, + _snapshot: Readonly, + ): Promise { return Promise.resolve(); } /** Called when a new block is added to the blockchain. */ - onNewBlock(block: any): Promise { + onNewBlock( + _block: any, + _snapshot: Readonly, + ): Promise { return Promise.resolve(); } diff --git a/src/plugins/plugin_manager.ts b/src/plugins/plugin_manager.ts index 54ea72d..a4bae0e 100644 --- a/src/plugins/plugin_manager.ts +++ b/src/plugins/plugin_manager.ts @@ -133,7 +133,7 @@ export class PluginManager extends Component { "monitor:mempool-tx", ({ detail }) => this.#pluginsByState(PluginState.Running).forEach((p) => - p.plugin.onMempoolTx(detail).catch((e) => + p.plugin.onMempoolTx(...detail).catch((e) => this.#handlePluginError(p, e) ) ), @@ -142,7 +142,7 @@ export class PluginManager extends Component { "monitor:mempool-tx-drop", ({ detail }) => this.#pluginsByState(PluginState.Running).forEach((p) => - p.plugin.onMempoolTxDrop(detail).catch((e) => + p.plugin.onMempoolTxDrop(...detail).catch((e) => this.#handlePluginError(p, e) ) ), @@ -151,7 +151,7 @@ export class PluginManager extends Component { "monitor:included-tx", ({ detail }) => this.#pluginsByState(PluginState.Running).forEach((p) => - p.plugin.onIncludedTx(detail).catch((e) => + p.plugin.onIncludedTx(...detail).catch((e) => this.#handlePluginError(p, e) ) ), @@ -160,7 +160,7 @@ export class PluginManager extends Component { "monitor:new-block", ({ detail }) => this.#pluginsByState(PluginState.Running).forEach((p) => - p.plugin.onNewBlock(detail).catch((e) => + p.plugin.onNewBlock(...detail).catch((e) => this.#handlePluginError(p, e) ) ), From 174cd42a599d63181147495e3c7e741b33370bd6 Mon Sep 17 00:00:00 2001 From: ross-weir <29697678+ross-weir@users.noreply.github.com> Date: Fri, 18 Aug 2023 19:18:13 +1000 Subject: [PATCH 15/15] update lastPeerMsgTimestamp state --- src/blockchain/blockchain_monitor.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/blockchain/blockchain_monitor.ts b/src/blockchain/blockchain_monitor.ts index 16fffd3..826c553 100644 --- a/src/blockchain/blockchain_monitor.ts +++ b/src/blockchain/blockchain_monitor.ts @@ -77,6 +77,8 @@ export class BlockchainMonitor extends Component { return; } + this.#state.lastPeerMsgTimestamp = lastPeerMsgTimestamp!; + const mempool = []; // the loop following this where we go through all the mempool txs could // go in here as well but each time the `monitor:mempool-tx` event is raised