Skip to content

Commit

Permalink
start wiring up monitor -> plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
ross-weir committed Aug 17, 2023
1 parent 977ac6d commit 43d5c4d
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 21 deletions.
25 changes: 13 additions & 12 deletions src/blockchain/blockchain_monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ interface MonitorState {

interface BlockchainMonitorEvent {
"monitor:mempool-tx": CustomEvent<SignedTransaction>;
"monitor:mempool-tx-drop": CustomEvent<SignedTransaction>;
"monitor:block": CustomEvent<unknown>;
}

Expand Down Expand Up @@ -74,6 +75,8 @@ export class BlockchainMonitor extends Component<BlockchainMonitorEvent> {
txId,
) => txId !== tx.id);

// remove txid from undefined state transactions map if present
// this resets the mempool drop detection counter for this txid
delete this.#state.mempoolTxChecks[tx.id];
}

Expand Down Expand Up @@ -110,17 +113,15 @@ export class BlockchainMonitor extends Component<BlockchainMonitorEvent> {
// end
}

// for txid in undefinedStateCheks.keys
// do
// # if a tx is not included in a block in dropChecks * `n` seconds,
// # then it's probably dropped from the mempool
// if undefinedStateCheks[txid] > maxChecks
// do # consider dropped
// plugins.all.onMempoolTxDrop(txid)
// delete undefinedStateCheks[txid]
// else # one more inconclusive check
// undefinedStateCheks[txid] += 1
// end
// end
for (const txId of Object.keys(this.#state.mempoolTxChecks)) {
// if a tx is not included in a block in dropChecks * `n` seconds,
// then it's probably dropped from the mempool
if (this.#state.mempoolTxChecks[txId] > this.#maxMempoolTxChecks) {
// TODO raise mempool dropped, so we still need to keep track of txns not just the id?
delete this.#state.mempoolTxChecks[txId];
} else {
this.#state.mempoolTxChecks[txId] += 1;
}
}
}
}
2 changes: 1 addition & 1 deletion src/blockchain/clients/explorer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class ExplorerClient extends Component implements BlockchainClient {
});
}

getBlock(height: number): Promise<unknown> {
getBlock(_height: number): Promise<unknown> {
throw new Error("Method not implemented.");
}

Expand Down
2 changes: 1 addition & 1 deletion src/blockchain/clients/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class NodeClient extends Component implements BlockchainClient {
});
}

getBlock(height: number): Promise<unknown> {
getBlock(_height: number): Promise<unknown> {
throw new Error("Method not implemented.");
}

Expand Down
1 change: 1 addition & 0 deletions src/blockchain/mod.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from "./clients/mod.ts";
export * from "./blockchain_monitor.ts";
17 changes: 15 additions & 2 deletions src/ergomatic.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { PluginManager, PluginManagerEvent } from "./plugins/plugin_manager.ts";
import { ErgomaticConfig } from "./config.ts";
import { Component } from "./component.ts";
import { BlockchainClient, BlockchainProvider } from "./blockchain/mod.ts";
import {
BlockchainClient,
BlockchainMonitor,
BlockchainProvider,
} from "./blockchain/mod.ts";

interface ErgomaticEvent {
"component:error": CustomEvent<{ component: Component; error: Error }>;
Expand All @@ -12,6 +16,7 @@ interface ErgomaticOpts {
config: ErgomaticConfig;
pluginManager?: PluginManager;
blockchainClient?: BlockchainClient;
blockchainMonitor?: BlockchainMonitor;
}

export class Ergomatic extends Component<ErgomaticEvent> {
Expand All @@ -25,10 +30,18 @@ export class Ergomatic extends Component<ErgomaticEvent> {
new BlockchainProvider(opts.config);
const pluginManager = opts.pluginManager ??
new PluginManager(opts.config, blockchainClient);
const blockchainMonitor = opts.blockchainMonitor ??
new BlockchainMonitor(opts.config, blockchainClient);

pluginManager.addEventListener("plugin:error", (e) => this.#bubbleEvent(e));
// TODO: handle errors in plugin handlers
blockchainMonitor.addEventListener(
"monitor:mempool-tx",
({ detail }) =>
pluginManager.activePlugins.forEach((p) => p.onMempoolTx(detail)),
);

this.#components = [pluginManager];
this.#components = [pluginManager, blockchainMonitor];
}

public async start(): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/_testing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export function mkTestPluginManager(
opts?: TestPluginManagerOpts,
) {
const pluginsStub = opts?.plugins?.length
? stub(_internals, "plugins", () => opts?.plugins!)
? stub(_internals, "managedPlugins", () => opts?.plugins!)
: null;

const cleanup = () => {
Expand Down
6 changes: 6 additions & 0 deletions src/plugins/plugin.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Logger } from "std/log/mod.ts";
import { BlockchainClient } from "../blockchain/mod.ts";
import { SignedTransaction } from "@fleet-sdk/common";

export interface PluginDescriptor {
/** User friendly name of the plugin. */
Expand Down Expand Up @@ -54,5 +55,10 @@ export abstract class Plugin<T = unknown> {
return Promise.resolve();
}

/** Called when a new transaction is added to mempool. */
onMempoolTx(_tx: SignedTransaction): Promise<void> {
return Promise.resolve();
}

abstract get descriptor(): PluginDescriptor;
}
12 changes: 8 additions & 4 deletions src/plugins/plugin_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export interface ManagedPlugin {

export const _internals = {
/** Allow mocking managed plugins in tests. */
plugins(plugins: ManagedPlugin[]) {
managedPlugins(plugins: ManagedPlugin[]) {
return plugins;
},
};
Expand Down Expand Up @@ -87,8 +87,12 @@ export class PluginManager extends Component<PluginManagerEvent> {
await Promise.allSettled(promises);
}

get #plugins() {
return _internals.plugins(this.#_plugins);
get activePlugins(): ReadonlyArray<Plugin> {
return this.#pluginsByState(PluginState.Running).map((p) => p.plugin);
}

get #managedPlugins() {
return _internals.managedPlugins(this.#_plugins);
}

#handlePluginError(managedPlugin: ManagedPlugin, error: Error): void {
Expand All @@ -102,7 +106,7 @@ export class PluginManager extends Component<PluginManagerEvent> {
}

#pluginsByState(state: PluginState): ManagedPlugin[] {
return this.#plugins.filter((p) => p.state === state);
return this.#managedPlugins.filter((p) => p.state === state);
}

#createPlugin(
Expand Down
24 changes: 24 additions & 0 deletions src/plugins/plugin_manager_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,28 @@ describe("PluginManager", () => {
}
});
});

describe("activePlugins", () => {
it("should only return running plugins", () => {
const startedPlugin = mkTestManagedPlugin(PluginState.Running);
const startedPlugin2 = mkTestManagedPlugin(PluginState.Running);
const { pluginManager, cleanup } = mkTestPluginManager({
plugins: [
startedPlugin,
mkTestManagedPlugin(PluginState.Error),
mkTestManagedPlugin(PluginState.Stopped),
startedPlugin2,
],
});

try {
assertEquals(pluginManager.activePlugins, [
startedPlugin.plugin,
startedPlugin2.plugin,
]);
} finally {
cleanup();
}
});
});
});

0 comments on commit 43d5c4d

Please sign in to comment.