diff --git a/plugins/example_plugin/mod.ts b/plugins/example_plugin/mod.ts index 00380f6..dbfdea2 100644 --- a/plugins/example_plugin/mod.ts +++ b/plugins/example_plugin/mod.ts @@ -25,7 +25,9 @@ export class ExamplePlugin extends Plugin { const { tokenId, exitAtPage } = this.config; let currentPage = 0; - for await (const page of this.blockchainClient.getBoxesByTokenId(tokenId)) { + for await ( + const page of this.blockchainProvider.getBoxesByTokenId(tokenId) + ) { currentPage++; this.logger.info( diff --git a/src/blockchain/_testing.ts b/src/blockchain/_testing.ts new file mode 100644 index 0000000..4873ee4 --- /dev/null +++ b/src/blockchain/_testing.ts @@ -0,0 +1,10 @@ +import { ErgomaticConfig } from "../config.ts"; +import { BlockchainMonitor } from "./blockchain_monitor.ts"; +import { BlockchainClient } from "./clients/mod.ts"; + +export function mkTestBlockchainMonitor( + config: ErgomaticConfig, + blockchainClient: BlockchainClient, +) { + return new BlockchainMonitor(config, blockchainClient); +} diff --git a/src/blockchain/blockchain_client.ts b/src/blockchain/blockchain_client.ts deleted file mode 100644 index 97dd7ed..0000000 --- a/src/blockchain/blockchain_client.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { - AmountType, - Box, - SignedTransaction, - TokenId, - TransactionId, -} from "@fleet-sdk/common"; - -export interface BlockchainClient { - submitTx(signedTx: SignedTransaction): Promise; - - getBoxesByTokenId( - tokenId: TokenId, - ): AsyncGenerator[]>; -} diff --git a/src/blockchain/blockchain_monitor.ts b/src/blockchain/blockchain_monitor.ts new file mode 100644 index 0000000..826c553 --- /dev/null +++ b/src/blockchain/blockchain_monitor.ts @@ -0,0 +1,160 @@ +import { SignedTransaction, TransactionId } from "@fleet-sdk/common"; +import { Component } from "../component.ts"; +import { ErgomaticConfig } from "../config.ts"; +import { BlockchainClient } from "./clients/mod.ts"; + +export interface BlockchainSnapshot { + height: number; + mempool: SignedTransaction[]; +} + +interface MonitorState { + currentHeight: number; + /** map of txid -> bool indicating if mempool tx has been passed to plugins */ + mempoolTxDelivery: Record; + /** map of txid -> int indicating the number of re-checks */ + mempoolTxChecks: Record; + pastMempoolTxIds: TransactionId[]; + lastPeerMsgTimestamp: number; +} + +type MonitorEvent = CustomEvent<[T, Readonly]>; + +interface BlockchainMonitorEvent { + "monitor:mempool-tx": MonitorEvent; + "monitor:mempool-tx-drop": MonitorEvent; + "monitor:included-tx": MonitorEvent; + "monitor:new-block": MonitorEvent; +} + +export class BlockchainMonitor extends Component { + readonly #blockchainClient: BlockchainClient; + readonly #pollIntervalMs: number; + readonly #maxMempoolTxChecks: number; + readonly #state: MonitorState; + #taskHandle?: number; + + constructor( + config: ErgomaticConfig, + blockchainClient: BlockchainClient, + pollIntervalMs: number = 500, + maxMempoolTxChecks: number = 10, + ) { + super(config, "BlockchainMonitor"); + + this.#pollIntervalMs = pollIntervalMs; + this.#maxMempoolTxChecks = maxMempoolTxChecks; + this.#blockchainClient = blockchainClient; + this.#state = { + currentHeight: 0, + mempoolTxDelivery: {}, + mempoolTxChecks: {}, + pastMempoolTxIds: [], + lastPeerMsgTimestamp: 0, + }; + } + + start(): Promise { + // TODO: raise component:error event if monitor throws exception + this.#taskHandle = setInterval(() => this.#monitor(), this.#pollIntervalMs); + + return super.start(); + } + + stop(): Promise { + clearInterval(this.#taskHandle); + + return super.stop(); + } + + async #monitor() { + this.logger.debug("Gathering blockchain state"); + + const { currentHeight, lastPeerMsgTimestamp } = await this.#blockchainClient + .getInfo(); + + if (lastPeerMsgTimestamp === this.#state.lastPeerMsgTimestamp) { + return; + } + + this.#state.lastPeerMsgTimestamp = lastPeerMsgTimestamp!; + + const mempool = []; + // the loop following this where we go through all the mempool txs could + // go in here as well but each time the `monitor:mempool-tx` event is raised + // it could provide a different mempool snapshot to plugins. + // + // instead, collect the full mempool first so the plugins can receive a full consistent snapshot. + for await (const page of this.#blockchainClient.getMempool()) { + mempool.push(...page); + } + + const snapshot: Readonly = Object.freeze({ + height: currentHeight, + mempool, + }); + + for (const tx of mempool) { + if (!this.#state.mempoolTxDelivery[tx.id]) { + this.#state.mempoolTxDelivery[tx.id] = true; + + this.dispatchEvent( + new CustomEvent("monitor:mempool-tx", { detail: [tx, snapshot] }), + ); + } + + this.#state.pastMempoolTxIds = this.#state.pastMempoolTxIds.filter(( + txId, + ) => txId !== tx.id); + + // remove txid from undefined state transactions map if present + // this resets the mempool drop detection counter for this txid + delete this.#state.mempoolTxChecks[tx.id]; + } + + // if tx was present in previous mempool, but not in the + // current, it may have been dropped or included in a block + for (const txId of this.#state.pastMempoolTxIds) { + this.#state.mempoolTxChecks[txId] = + (this.#state.mempoolTxChecks[txId] ?? 0) + 1; + } + + this.#state.pastMempoolTxIds = mempool.map((tx) => tx.id); + + if (currentHeight > this.#state.currentHeight) { + const newBlock = await this.#blockchainClient.getBlock( + currentHeight, + ) as any; + + this.dispatchEvent( + new CustomEvent("monitor:new-block", { detail: [newBlock, snapshot] }), + ); + + this.#state.currentHeight = currentHeight; + + for (const tx of newBlock.blockTransactions) { + this.dispatchEvent( + new CustomEvent("monitor:included-tx", { detail: [tx, snapshot] }), + ); + + // stop tracking mempool delivery for this txid + delete this.#state.mempoolTxDelivery[tx.id]; + + // prevent `onMempoolTxDrop` event for this txid as + // it is now included in a block + delete this.#state.mempoolTxChecks[tx.id]; + } + } + + for (const txId of Object.keys(this.#state.mempoolTxChecks)) { + // if a tx is not included in a block in dropChecks * `n` seconds, + // then it's probably dropped from the mempool + if (this.#state.mempoolTxChecks[txId] > this.#maxMempoolTxChecks) { + // TODO raise mempool dropped, so we still need to keep track of txns not just the id? + delete this.#state.mempoolTxChecks[txId]; + } else { + this.#state.mempoolTxChecks[txId] += 1; + } + } + } +} diff --git a/src/blockchain/blockchain_provider.ts b/src/blockchain/blockchain_provider.ts deleted file mode 100644 index cf4919e..0000000 --- a/src/blockchain/blockchain_provider.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { - AmountType, - Box, - SignedTransaction, - TokenId, - TransactionId, -} from "@fleet-sdk/common"; -import { Component } from "../component.ts"; -import { ErgomaticConfig } from "../config.ts"; -import { BlockchainClient } from "./blockchain_client.ts"; -import { ExplorerClient } from "./clients/mod.ts"; - -export class BlockchainProvider extends Component implements BlockchainClient { - readonly #explorer: BlockchainClient; - - constructor(config: ErgomaticConfig) { - super(config, "BlockchainProvider"); - - this.#explorer = new ExplorerClient(config); - } - - submitTx(signedTx: SignedTransaction): Promise { - return this.#explorer.submitTx(signedTx); - } - - getBoxesByTokenId( - tokenId: TokenId, - ): AsyncGenerator[]> { - return this.#explorer.getBoxesByTokenId(tokenId); - } -} diff --git a/src/blockchain/clients/blockchain_client.ts b/src/blockchain/clients/blockchain_client.ts new file mode 100644 index 0000000..3fc7f4f --- /dev/null +++ b/src/blockchain/clients/blockchain_client.ts @@ -0,0 +1,72 @@ +import { + AmountType, + Box, + SignedTransaction, + TokenId, + TransactionId, +} from "@fleet-sdk/common"; +import { Component } from "../../component.ts"; +import { ErgomaticConfig } from "../../config.ts"; +import { ExplorerClient } from "./explorer.ts"; +import { NodeClient } from "./node.ts"; + +/** + * RestrictedBlockchainClient is a blockchain client that is exclusively used by plugins. + * The API is intentionally restricted to prevent plugins issuing excessive API requests + * that are instead provided to plugins via a snapshot of the blockchain state. + */ +export interface RestrictedBlockchainClient { + submitTx(signedTx: SignedTransaction): Promise; + + getBoxesByTokenId( + tokenId: TokenId, + ): AsyncGenerator[]>; +} + +export interface BlockchainInfo { + currentHeight: number; + lastPeerMsgTimestamp?: number; +} + +export interface BlockchainClient extends RestrictedBlockchainClient { + getMempool(): AsyncGenerator; + + getInfo(): Promise; + + getBlock(height: number): Promise; +} + +export class DefaultBlockchainClient extends Component + implements BlockchainClient { + readonly #explorer: BlockchainClient; + readonly #node: BlockchainClient; + + constructor(config: ErgomaticConfig) { + super(config, "DefaultBlockchainProvider"); + + this.#explorer = new ExplorerClient(config); + this.#node = new NodeClient(config); + } + + getBlock(height: number): Promise { + return this.#node.getBlock(height); + } + + getInfo(): Promise { + return this.#node.getInfo(); + } + + getMempool(): AsyncGenerator { + return this.#node.getMempool(); + } + + submitTx(signedTx: SignedTransaction): Promise { + return this.#node.submitTx(signedTx); + } + + getBoxesByTokenId( + tokenId: TokenId, + ): AsyncGenerator[]> { + return this.#explorer.getBoxesByTokenId(tokenId); + } +} diff --git a/src/blockchain/clients/blockchain_provider.ts b/src/blockchain/clients/blockchain_provider.ts new file mode 100644 index 0000000..8aebcd0 --- /dev/null +++ b/src/blockchain/clients/blockchain_provider.ts @@ -0,0 +1,39 @@ +import { + AmountType, + Box, + SignedTransaction, + TokenId, + TransactionId, +} from "@fleet-sdk/common"; +import { Component } from "../../component.ts"; +import { ErgomaticConfig } from "../../config.ts"; +import { + BlockchainClient, + RestrictedBlockchainClient, +} from "./blockchain_client.ts"; +import { ExplorerClient } from "./explorer.ts"; +import { NodeClient } from "./node.ts"; + +/** Restricted blockchain client used by plugins */ +export class BlockchainProvider extends Component + implements RestrictedBlockchainClient { + readonly #explorer: BlockchainClient; + readonly #node: BlockchainClient; + + constructor(config: ErgomaticConfig) { + super(config, "BlockchainProvider"); + + this.#explorer = new ExplorerClient(config); + this.#node = new NodeClient(config); + } + + submitTx(signedTx: SignedTransaction): Promise { + return this.#node.submitTx(signedTx); + } + + getBoxesByTokenId( + tokenId: TokenId, + ): AsyncGenerator[]> { + return this.#explorer.getBoxesByTokenId(tokenId); + } +} diff --git a/src/blockchain/clients/explorer.ts b/src/blockchain/clients/explorer.ts index 2c14e5f..31f8418 100644 --- a/src/blockchain/clients/explorer.ts +++ b/src/blockchain/clients/explorer.ts @@ -6,15 +6,18 @@ import { TransactionId, } from "@fleet-sdk/common"; import axios, { AxiosInstance } from "axios"; -import { BlockchainClient } from "../blockchain_client.ts"; +import { BlockchainClient, BlockchainInfo } from "./blockchain_client.ts"; import { ErgomaticConfig } from "../../config.ts"; +import { Component } from "../../component.ts"; -export class ExplorerClient implements BlockchainClient { +export class ExplorerClient extends Component implements BlockchainClient { readonly #http: AxiosInstance; #pageSize = 100; #timeoutMs: number; constructor(config: ErgomaticConfig, httpTimeoutMs: number = 10000) { + super(config, "ExplorerClient"); + // axios timeout is incompatible with deno due to a missing nodejs API // use signals for timeouts instead. this.#timeoutMs = httpTimeoutMs; @@ -24,6 +27,18 @@ export class ExplorerClient implements BlockchainClient { }); } + getBlock(_height: number): Promise { + throw new Error("Method not implemented."); + } + + getInfo(): Promise { + throw new Error("Method not implemented."); + } + + getMempool(): AsyncGenerator { + throw new Error(`${this.name} does not support getMempool operation`); + } + async submitTx(signedTx: SignedTransaction): Promise { const response = await this.#http.post( "/mempool/transactions/submit", diff --git a/src/blockchain/clients/mod.ts b/src/blockchain/clients/mod.ts index 3e99d9a..bb0c861 100644 --- a/src/blockchain/clients/mod.ts +++ b/src/blockchain/clients/mod.ts @@ -1 +1,2 @@ -export * from "./explorer.ts"; +export * from "./blockchain_client.ts"; +export * from "./blockchain_provider.ts"; diff --git a/src/blockchain/clients/node.ts b/src/blockchain/clients/node.ts new file mode 100644 index 0000000..9dfa320 --- /dev/null +++ b/src/blockchain/clients/node.ts @@ -0,0 +1,89 @@ +import { + AmountType, + Box, + SignedTransaction, + TokenId, + TransactionId, +} from "@fleet-sdk/common"; +import { BlockchainClient, BlockchainInfo } from "./blockchain_client.ts"; +import { Component } from "../../component.ts"; +import { ErgomaticConfig } from "../../config.ts"; +import axios, { AxiosInstance } from "axios"; + +export class NodeClient extends Component implements BlockchainClient { + readonly #http: AxiosInstance; + #timeoutMs: number; + + constructor(config: ErgomaticConfig, httpTimeoutMs: number = 10000) { + super(config, "NodeClient"); + + this.#timeoutMs = httpTimeoutMs; + this.#http = axios.create({ + baseURL: config.node.endpoint, + }); + } + + getBlock(_height: number): Promise { + throw new Error("Method not implemented."); + } + + async getInfo(): Promise { + const response = await this.#http.get( + "/info", + this.#defaultRequestConfig, + ); + + return response.data; + } + + async submitTx( + signedTx: SignedTransaction, + ): Promise { + const response = await this.#http.post( + "/transactions", + signedTx, + this.#defaultRequestConfig, + ); + + return response.data; + } + + // deno-lint-ignore require-yield + async *getBoxesByTokenId( + _tokenId: TokenId, + ): AsyncGenerator[]> { + throw new Error( + `${this.name} does not support getBoxesByTokenId operation`, + ); + } + + async *getMempool(): AsyncGenerator { + let offset = 0; + // the max limit is defined as 100 in the node api spec but is not enforced + // take advantage of this to reduce the number of requests + // this function is still paginated with the expectation that this could + // be changed in the future. + const limit = 100000; + + while (true) { + const { data } = await this.#http.get("/transactions/unconfirmed", { + params: { offset, limit }, + ...this.#defaultRequestConfig, + }); + + if (data.length) { + yield data; + } + + if (data.length < limit) { + break; + } + + offset += limit; + } + } + + get #defaultRequestConfig() { + return { signal: AbortSignal.timeout(this.#timeoutMs) }; + } +} diff --git a/src/blockchain/mod.ts b/src/blockchain/mod.ts index 2902dce..81ccc9a 100644 --- a/src/blockchain/mod.ts +++ b/src/blockchain/mod.ts @@ -1,2 +1,2 @@ -export * from "./blockchain_provider.ts"; -export * from "./blockchain_client.ts"; +export * from "./clients/mod.ts"; +export * from "./blockchain_monitor.ts"; diff --git a/src/cli_test.ts b/src/cli_test.ts deleted file mode 100644 index b957d51..0000000 --- a/src/cli_test.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { assertEquals } from "std/testing/asserts.ts"; - -Deno.test("example", () => { - assertEquals(1, 1); -}); diff --git a/src/ergomatic.ts b/src/ergomatic.ts index 4d06ba1..26adf28 100644 --- a/src/ergomatic.ts +++ b/src/ergomatic.ts @@ -1,7 +1,12 @@ import { PluginManager, PluginManagerEvent } from "./plugins/plugin_manager.ts"; import { ErgomaticConfig } from "./config.ts"; import { Component } from "./component.ts"; -import { BlockchainClient, BlockchainProvider } from "./blockchain/mod.ts"; +import { + BlockchainClient, + BlockchainMonitor, + BlockchainProvider, + DefaultBlockchainClient, +} from "./blockchain/mod.ts"; interface ErgomaticEvent { "component:error": CustomEvent<{ component: Component; error: Error }>; @@ -12,6 +17,8 @@ interface ErgomaticOpts { config: ErgomaticConfig; pluginManager?: PluginManager; blockchainClient?: BlockchainClient; + blockchainProvider?: BlockchainProvider; + blockchainMonitor?: BlockchainMonitor; } export class Ergomatic extends Component { @@ -21,14 +28,18 @@ export class Ergomatic extends Component { constructor(opts: ErgomaticOpts) { super(opts.config, "Ergomatic"); - const blockchainClient = opts.blockchainClient ?? + const blockchainProvider = opts.blockchainProvider ?? new BlockchainProvider(opts.config); + const blockchainClient = opts.blockchainClient ?? + new DefaultBlockchainClient(opts.config); + const blockchainMonitor = opts.blockchainMonitor ?? + new BlockchainMonitor(opts.config, blockchainClient); const pluginManager = opts.pluginManager ?? - new PluginManager(opts.config, blockchainClient); + new PluginManager(opts.config, blockchainProvider, blockchainMonitor); pluginManager.addEventListener("plugin:error", (e) => this.#bubbleEvent(e)); - this.#components = [pluginManager]; + this.#components = [pluginManager, blockchainMonitor]; } public async start(): Promise { diff --git a/src/plugins/_testing.ts b/src/plugins/_testing.ts index 9b34ab2..2a57e2a 100644 --- a/src/plugins/_testing.ts +++ b/src/plugins/_testing.ts @@ -4,8 +4,12 @@ import { _internals, ManagedPlugin, PluginState } from "./plugin_manager.ts"; import { ErgomaticConfig } from "../config.ts"; import { PluginManager } from "./mod.ts"; import { stub } from "std/testing/mock.ts"; -import { BlockchainClient, BlockchainProvider } from "../blockchain/mod.ts"; +import { + BlockchainProvider, + DefaultBlockchainClient, +} from "../blockchain/mod.ts"; import { testConfig } from "../_testing.ts"; +import { mkTestBlockchainMonitor } from "../blockchain/_testing.ts"; export class TestPlugin extends Plugin { get descriptor(): PluginDescriptor { @@ -25,7 +29,7 @@ export function mkTestPluginManager( opts?: TestPluginManagerOpts, ) { const pluginsStub = opts?.plugins?.length - ? stub(_internals, "plugins", () => opts?.plugins!) + ? stub(_internals, "managedPlugins", () => opts?.plugins!) : null; const cleanup = () => { @@ -34,11 +38,15 @@ export function mkTestPluginManager( const config = opts?.config ?? testConfig(); const pluginMap = opts?.pluginMap ?? testPluginMap; + const provider = new BlockchainProvider(config); + const client = new DefaultBlockchainClient(config); + const monitor = mkTestBlockchainMonitor(config, client); return { pluginManager: new PluginManager( config, - new BlockchainProvider(config), + provider, + monitor, pluginMap, ), cleanup, @@ -50,7 +58,7 @@ export function mkTestManagedPlugin( ): ManagedPlugin { const plugin = new TestPlugin({ logger: getLogger(), - blockchainClient: {} as BlockchainClient, // TODO + blockchainProvider: {} as BlockchainProvider, // TODO config: {}, }); diff --git a/src/plugins/plugin.ts b/src/plugins/plugin.ts index 9e826ba..342d90c 100644 --- a/src/plugins/plugin.ts +++ b/src/plugins/plugin.ts @@ -1,5 +1,6 @@ import { Logger } from "std/log/mod.ts"; -import { BlockchainClient } from "../blockchain/mod.ts"; +import { BlockchainProvider, BlockchainSnapshot } from "../blockchain/mod.ts"; +import { SignedTransaction } from "@fleet-sdk/common"; export interface PluginDescriptor { /** User friendly name of the plugin. */ @@ -20,7 +21,7 @@ export type PluginConstructor = { export interface PluginArgs { config: T; logger: Logger; - blockchainClient: BlockchainClient; + blockchainProvider: BlockchainProvider; } export abstract class Plugin { @@ -30,12 +31,12 @@ export abstract class Plugin { /** Logger configured to log output of this plugin. */ protected readonly logger: Logger; - protected readonly blockchainClient: BlockchainClient; + protected readonly blockchainProvider: BlockchainProvider; - constructor({ config, logger, blockchainClient }: PluginArgs) { + constructor({ config, logger, blockchainProvider }: PluginArgs) { this.config = config; this.logger = logger; - this.blockchainClient = blockchainClient; + this.blockchainProvider = blockchainProvider; } /** @@ -54,5 +55,37 @@ export abstract class Plugin { return Promise.resolve(); } + /** Called when a new transaction is added to mempool. */ + onMempoolTx( + _tx: SignedTransaction, + _snapshot: Readonly, + ): Promise { + return Promise.resolve(); + } + + /** Called when a transaction is dropped from mempool. */ + onMempoolTxDrop( + _tx: SignedTransaction, + _snapshot: Readonly, + ): Promise { + return Promise.resolve(); + } + + /** Called when a transaction has been included in a block. */ + onIncludedTx( + _tx: SignedTransaction, + _snapshot: Readonly, + ): Promise { + return Promise.resolve(); + } + + /** Called when a new block is added to the blockchain. */ + onNewBlock( + _block: any, + _snapshot: Readonly, + ): Promise { + return Promise.resolve(); + } + abstract get descriptor(): PluginDescriptor; } diff --git a/src/plugins/plugin_manager.ts b/src/plugins/plugin_manager.ts index 92a2d61..a4bae0e 100644 --- a/src/plugins/plugin_manager.ts +++ b/src/plugins/plugin_manager.ts @@ -4,7 +4,7 @@ import { createLogger } from "../log.ts"; import { ErgomaticConfigError } from "../error.ts"; import { pluginConstructorMap } from "../../plugins/mod.ts"; import { Component } from "../component.ts"; -import { BlockchainClient, BlockchainProvider } from "../blockchain/mod.ts"; +import { BlockchainMonitor, BlockchainProvider } from "../blockchain/mod.ts"; export interface PluginManagerEvent { "plugin:error": CustomEvent<{ plugin: Plugin; error: Error }>; @@ -23,25 +23,25 @@ export interface ManagedPlugin { export const _internals = { /** Allow mocking managed plugins in tests. */ - plugins(plugins: ManagedPlugin[]) { + managedPlugins(plugins: ManagedPlugin[]) { return plugins; }, }; export class PluginManager extends Component { readonly #pluginConstructorMap: Record; - readonly #blockchainClient: BlockchainClient; + readonly #blockchainProvider: BlockchainProvider; #_plugins: ManagedPlugin[]; constructor( config: ErgomaticConfig, - blockchainClient?: BlockchainClient, + blockchainProvider: BlockchainProvider, + blockchainMonitor: BlockchainMonitor, pluginCtorMap = pluginConstructorMap, ) { super(config, "PluginManager"); - this.#blockchainClient = blockchainClient ?? - new BlockchainProvider(config); + this.#blockchainProvider = blockchainProvider; this.#pluginConstructorMap = pluginCtorMap; this.#_plugins = config.plugins.filter((p) => p.enabled).map(( pluginEntry, @@ -49,6 +49,8 @@ export class PluginManager extends Component { plugin: this.#createPlugin(config, pluginEntry), state: PluginState.Stopped, })); + + this.#setupEventHandlers(blockchainMonitor); } public async start(): Promise { @@ -87,8 +89,8 @@ export class PluginManager extends Component { await Promise.allSettled(promises); } - get #plugins() { - return _internals.plugins(this.#_plugins); + get #managedPlugins() { + return _internals.managedPlugins(this.#_plugins); } #handlePluginError(managedPlugin: ManagedPlugin, error: Error): void { @@ -102,7 +104,7 @@ export class PluginManager extends Component { } #pluginsByState(state: PluginState): ManagedPlugin[] { - return this.#plugins.filter((p) => p.state === state); + return this.#managedPlugins.filter((p) => p.state === state); } #createPlugin( @@ -121,8 +123,47 @@ export class PluginManager extends Component { return new pluginCtor({ config: pluginEntry.config, - blockchainClient: this.#blockchainClient, + blockchainProvider: this.#blockchainProvider, logger: createLogger(pluginEntry.id, config.logLevel), }); } + + #setupEventHandlers(blockchainMonitor: BlockchainMonitor) { + blockchainMonitor.addEventListener( + "monitor:mempool-tx", + ({ detail }) => + this.#pluginsByState(PluginState.Running).forEach((p) => + p.plugin.onMempoolTx(...detail).catch((e) => + this.#handlePluginError(p, e) + ) + ), + ); + blockchainMonitor.addEventListener( + "monitor:mempool-tx-drop", + ({ detail }) => + this.#pluginsByState(PluginState.Running).forEach((p) => + p.plugin.onMempoolTxDrop(...detail).catch((e) => + this.#handlePluginError(p, e) + ) + ), + ); + blockchainMonitor.addEventListener( + "monitor:included-tx", + ({ detail }) => + this.#pluginsByState(PluginState.Running).forEach((p) => + p.plugin.onIncludedTx(...detail).catch((e) => + this.#handlePluginError(p, e) + ) + ), + ); + blockchainMonitor.addEventListener( + "monitor:new-block", + ({ detail }) => + this.#pluginsByState(PluginState.Running).forEach((p) => + p.plugin.onNewBlock(...detail).catch((e) => + this.#handlePluginError(p, e) + ) + ), + ); + } } diff --git a/src/plugins/plugin_manager_test.ts b/src/plugins/plugin_manager_test.ts index 382af96..4d09004 100644 --- a/src/plugins/plugin_manager_test.ts +++ b/src/plugins/plugin_manager_test.ts @@ -10,16 +10,26 @@ import { mkTestPluginManager, testPluginMap, } from "./_testing.ts"; -import { BlockchainClient, BlockchainProvider } from "../blockchain/mod.ts"; +import { + BlockchainClient, + BlockchainMonitor, + BlockchainProvider, + DefaultBlockchainClient, +} from "../blockchain/mod.ts"; import { testConfig } from "../_testing.ts"; +import { mkTestBlockchainMonitor } from "../blockchain/_testing.ts"; describe("PluginManager", () => { let config: ErgomaticConfig; + let blockchainProvider: BlockchainProvider; let blockchainClient: BlockchainClient; + let blockchainMonitor: BlockchainMonitor; beforeEach(() => { config = testConfig(); - blockchainClient = new BlockchainProvider(config); + blockchainProvider = new BlockchainProvider(config); + blockchainClient = new DefaultBlockchainClient(config); + blockchainMonitor = mkTestBlockchainMonitor(config, blockchainClient); }); describe("constructor", () => { @@ -27,7 +37,13 @@ describe("PluginManager", () => { config.plugins[0]!.id = "invalid"; assertThrows( - () => new PluginManager(config, blockchainClient, testPluginMap), + () => + new PluginManager( + config, + blockchainProvider, + blockchainMonitor, + testPluginMap, + ), ErgomaticConfigError, "Unknown plugin ID", ); @@ -36,10 +52,20 @@ describe("PluginManager", () => { config.plugins[0]!.id = "invalid"; config.plugins[0]!.enabled = false; - new PluginManager(config, blockchainClient, testPluginMap); + new PluginManager( + config, + blockchainProvider, + blockchainMonitor, + testPluginMap, + ); }); it("should create PluginManager instance", () => { - new PluginManager(config, blockchainClient, testPluginMap); + new PluginManager( + config, + blockchainProvider, + blockchainMonitor, + testPluginMap, + ); }); });