Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blockchain monitor component #15

Merged
merged 15 commits into from
Aug 22, 2023
4 changes: 3 additions & 1 deletion plugins/example_plugin/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ export class ExamplePlugin extends Plugin<ExamplePluginConfig> {
const { tokenId, exitAtPage } = this.config;
let currentPage = 0;

for await (const page of this.blockchainClient.getBoxesByTokenId(tokenId)) {
for await (
const page of this.blockchainProvider.getBoxesByTokenId(tokenId)
) {
currentPage++;

this.logger.info(
Expand Down
10 changes: 10 additions & 0 deletions src/blockchain/_testing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { ErgomaticConfig } from "../config.ts";
import { BlockchainMonitor } from "./blockchain_monitor.ts";
import { BlockchainClient } from "./clients/mod.ts";

export function mkTestBlockchainMonitor(
config: ErgomaticConfig,
blockchainClient: BlockchainClient,
) {
return new BlockchainMonitor(config, blockchainClient);
}
15 changes: 0 additions & 15 deletions src/blockchain/blockchain_client.ts

This file was deleted.

160 changes: 160 additions & 0 deletions src/blockchain/blockchain_monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import { SignedTransaction, TransactionId } from "@fleet-sdk/common";
import { Component } from "../component.ts";
import { ErgomaticConfig } from "../config.ts";
import { BlockchainClient } from "./clients/mod.ts";

export interface BlockchainSnapshot {
height: number;
mempool: SignedTransaction[];
}

interface MonitorState {
currentHeight: number;
/** map of txid -> bool indicating if mempool tx has been passed to plugins */
mempoolTxDelivery: Record<TransactionId, boolean>;
/** map of txid -> int indicating the number of re-checks */
mempoolTxChecks: Record<TransactionId, number>;
pastMempoolTxIds: TransactionId[];
lastPeerMsgTimestamp: number;
}

type MonitorEvent<T> = CustomEvent<[T, Readonly<BlockchainSnapshot>]>;

interface BlockchainMonitorEvent {
"monitor:mempool-tx": MonitorEvent<SignedTransaction>;
"monitor:mempool-tx-drop": MonitorEvent<SignedTransaction>;
"monitor:included-tx": MonitorEvent<SignedTransaction>;
"monitor:new-block": MonitorEvent<unknown>;
}

export class BlockchainMonitor extends Component<BlockchainMonitorEvent> {
readonly #blockchainClient: BlockchainClient;
readonly #pollIntervalMs: number;
readonly #maxMempoolTxChecks: number;
readonly #state: MonitorState;
#taskHandle?: number;

constructor(
config: ErgomaticConfig,
blockchainClient: BlockchainClient,
pollIntervalMs: number = 500,
maxMempoolTxChecks: number = 10,
) {
super(config, "BlockchainMonitor");

this.#pollIntervalMs = pollIntervalMs;
this.#maxMempoolTxChecks = maxMempoolTxChecks;
this.#blockchainClient = blockchainClient;
this.#state = {
currentHeight: 0,
mempoolTxDelivery: {},
mempoolTxChecks: {},
pastMempoolTxIds: [],
lastPeerMsgTimestamp: 0,
};
}

start(): Promise<void> {
// TODO: raise component:error event if monitor throws exception
this.#taskHandle = setInterval(() => this.#monitor(), this.#pollIntervalMs);

return super.start();
}

stop(): Promise<void> {
clearInterval(this.#taskHandle);

return super.stop();
}

async #monitor() {
this.logger.debug("Gathering blockchain state");

const { currentHeight, lastPeerMsgTimestamp } = await this.#blockchainClient
.getInfo();

if (lastPeerMsgTimestamp === this.#state.lastPeerMsgTimestamp) {
return;
}

this.#state.lastPeerMsgTimestamp = lastPeerMsgTimestamp!;

const mempool = [];
// the loop following this where we go through all the mempool txs could
// go in here as well but each time the `monitor:mempool-tx` event is raised
// it could provide a different mempool snapshot to plugins.
//
// instead, collect the full mempool first so the plugins can receive a full consistent snapshot.
for await (const page of this.#blockchainClient.getMempool()) {
mempool.push(...page);
}

const snapshot: Readonly<BlockchainSnapshot> = Object.freeze({
height: currentHeight,
mempool,
});

for (const tx of mempool) {
if (!this.#state.mempoolTxDelivery[tx.id]) {
this.#state.mempoolTxDelivery[tx.id] = true;

this.dispatchEvent(
new CustomEvent("monitor:mempool-tx", { detail: [tx, snapshot] }),
);
}

this.#state.pastMempoolTxIds = this.#state.pastMempoolTxIds.filter((
txId,
) => txId !== tx.id);

// remove txid from undefined state transactions map if present
// this resets the mempool drop detection counter for this txid
delete this.#state.mempoolTxChecks[tx.id];
}

// if tx was present in previous mempool, but not in the
// current, it may have been dropped or included in a block
for (const txId of this.#state.pastMempoolTxIds) {
this.#state.mempoolTxChecks[txId] =
(this.#state.mempoolTxChecks[txId] ?? 0) + 1;
}

this.#state.pastMempoolTxIds = mempool.map((tx) => tx.id);

if (currentHeight > this.#state.currentHeight) {
const newBlock = await this.#blockchainClient.getBlock(
currentHeight,
) as any;

this.dispatchEvent(
new CustomEvent("monitor:new-block", { detail: [newBlock, snapshot] }),
);

this.#state.currentHeight = currentHeight;

for (const tx of newBlock.blockTransactions) {
this.dispatchEvent(
new CustomEvent("monitor:included-tx", { detail: [tx, snapshot] }),
);

// stop tracking mempool delivery for this txid
delete this.#state.mempoolTxDelivery[tx.id];

// prevent `onMempoolTxDrop` event for this txid as
// it is now included in a block
delete this.#state.mempoolTxChecks[tx.id];
}
}

for (const txId of Object.keys(this.#state.mempoolTxChecks)) {
// if a tx is not included in a block in dropChecks * `n` seconds,
// then it's probably dropped from the mempool
if (this.#state.mempoolTxChecks[txId] > this.#maxMempoolTxChecks) {
// TODO raise mempool dropped, so we still need to keep track of txns not just the id?
Copy link
Member Author

@ross-weir ross-weir Aug 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@arobsn when a tx is dropped from mempool should we include the transaction when raising the event for plugins or do you think txId be enough?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, I would go with the full transaction to keep hooks homogeneous and avoid more code on plugins side (IMO plugins must be as concise as possible to make auditing process easier).

delete this.#state.mempoolTxChecks[txId];
} else {
this.#state.mempoolTxChecks[txId] += 1;
}
}
}
}
31 changes: 0 additions & 31 deletions src/blockchain/blockchain_provider.ts

This file was deleted.

72 changes: 72 additions & 0 deletions src/blockchain/clients/blockchain_client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import {
AmountType,
Box,
SignedTransaction,
TokenId,
TransactionId,
} from "@fleet-sdk/common";
import { Component } from "../../component.ts";
import { ErgomaticConfig } from "../../config.ts";
import { ExplorerClient } from "./explorer.ts";
import { NodeClient } from "./node.ts";

/**
* RestrictedBlockchainClient is a blockchain client that is exclusively used by plugins.
* The API is intentionally restricted to prevent plugins issuing excessive API requests
* that are instead provided to plugins via a snapshot of the blockchain state.
*/
export interface RestrictedBlockchainClient {
submitTx(signedTx: SignedTransaction): Promise<TransactionId>;

getBoxesByTokenId<T extends AmountType = string>(
tokenId: TokenId,
): AsyncGenerator<Box<T>[]>;
}

export interface BlockchainInfo {
currentHeight: number;
lastPeerMsgTimestamp?: number;
}

export interface BlockchainClient extends RestrictedBlockchainClient {
getMempool(): AsyncGenerator<SignedTransaction[]>;

getInfo(): Promise<BlockchainInfo>;

getBlock(height: number): Promise<unknown>;
}

export class DefaultBlockchainClient extends Component
implements BlockchainClient {
readonly #explorer: BlockchainClient;
readonly #node: BlockchainClient;

constructor(config: ErgomaticConfig) {
super(config, "DefaultBlockchainProvider");

this.#explorer = new ExplorerClient(config);
this.#node = new NodeClient(config);
}

getBlock(height: number): Promise<unknown> {
return this.#node.getBlock(height);
}

getInfo(): Promise<BlockchainInfo> {
return this.#node.getInfo();
}

getMempool(): AsyncGenerator<SignedTransaction[]> {
return this.#node.getMempool();
}

submitTx(signedTx: SignedTransaction): Promise<TransactionId> {
return this.#node.submitTx(signedTx);
}

getBoxesByTokenId<T extends AmountType = string>(
tokenId: TokenId,
): AsyncGenerator<Box<T>[]> {
return this.#explorer.getBoxesByTokenId(tokenId);
}
}
39 changes: 39 additions & 0 deletions src/blockchain/clients/blockchain_provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import {
AmountType,
Box,
SignedTransaction,
TokenId,
TransactionId,
} from "@fleet-sdk/common";
import { Component } from "../../component.ts";
import { ErgomaticConfig } from "../../config.ts";
import {
BlockchainClient,
RestrictedBlockchainClient,
} from "./blockchain_client.ts";
import { ExplorerClient } from "./explorer.ts";
import { NodeClient } from "./node.ts";

/** Restricted blockchain client used by plugins */
export class BlockchainProvider extends Component
implements RestrictedBlockchainClient {
readonly #explorer: BlockchainClient;
readonly #node: BlockchainClient;

constructor(config: ErgomaticConfig) {
super(config, "BlockchainProvider");

this.#explorer = new ExplorerClient(config);
this.#node = new NodeClient(config);
}

submitTx(signedTx: SignedTransaction): Promise<TransactionId> {
return this.#node.submitTx(signedTx);
}

getBoxesByTokenId<T extends AmountType = string>(
tokenId: TokenId,
): AsyncGenerator<Box<T>[]> {
return this.#explorer.getBoxesByTokenId(tokenId);
}
}
19 changes: 17 additions & 2 deletions src/blockchain/clients/explorer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ import {
TransactionId,
} from "@fleet-sdk/common";
import axios, { AxiosInstance } from "axios";
import { BlockchainClient } from "../blockchain_client.ts";
import { BlockchainClient, BlockchainInfo } from "./blockchain_client.ts";
import { ErgomaticConfig } from "../../config.ts";
import { Component } from "../../component.ts";

export class ExplorerClient implements BlockchainClient {
export class ExplorerClient extends Component implements BlockchainClient {
readonly #http: AxiosInstance;
#pageSize = 100;
#timeoutMs: number;

constructor(config: ErgomaticConfig, httpTimeoutMs: number = 10000) {
super(config, "ExplorerClient");

// axios timeout is incompatible with deno due to a missing nodejs API
// use signals for timeouts instead.
this.#timeoutMs = httpTimeoutMs;
Expand All @@ -24,6 +27,18 @@ export class ExplorerClient implements BlockchainClient {
});
}

getBlock(_height: number): Promise<unknown> {
throw new Error("Method not implemented.");
}

getInfo(): Promise<BlockchainInfo> {
throw new Error("Method not implemented.");
}

getMempool(): AsyncGenerator<SignedTransaction[]> {
throw new Error(`${this.name} does not support getMempool operation`);
}

async submitTx(signedTx: SignedTransaction): Promise<TransactionId> {
const response = await this.#http.post(
"/mempool/transactions/submit",
Expand Down
3 changes: 2 additions & 1 deletion src/blockchain/clients/mod.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from "./explorer.ts";
export * from "./blockchain_client.ts";
export * from "./blockchain_provider.ts";
Loading
Loading