Skip to content

Commit

Permalink
Merge pull request #15 from nautls/blockchain-monitor-component
Browse files Browse the repository at this point in the history
Blockchain monitor component
  • Loading branch information
ross-weir authored Aug 22, 2023
2 parents ef39490 + 174cd42 commit cfe47f4
Show file tree
Hide file tree
Showing 17 changed files with 541 additions and 85 deletions.
4 changes: 3 additions & 1 deletion plugins/example_plugin/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ export class ExamplePlugin extends Plugin<ExamplePluginConfig> {
const { tokenId, exitAtPage } = this.config;
let currentPage = 0;

for await (const page of this.blockchainClient.getBoxesByTokenId(tokenId)) {
for await (
const page of this.blockchainProvider.getBoxesByTokenId(tokenId)
) {
currentPage++;

this.logger.info(
Expand Down
10 changes: 10 additions & 0 deletions src/blockchain/_testing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { ErgomaticConfig } from "../config.ts";
import { BlockchainMonitor } from "./blockchain_monitor.ts";
import { BlockchainClient } from "./clients/mod.ts";

export function mkTestBlockchainMonitor(
config: ErgomaticConfig,
blockchainClient: BlockchainClient,
) {
return new BlockchainMonitor(config, blockchainClient);
}
15 changes: 0 additions & 15 deletions src/blockchain/blockchain_client.ts

This file was deleted.

160 changes: 160 additions & 0 deletions src/blockchain/blockchain_monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import { SignedTransaction, TransactionId } from "@fleet-sdk/common";
import { Component } from "../component.ts";
import { ErgomaticConfig } from "../config.ts";
import { BlockchainClient } from "./clients/mod.ts";

export interface BlockchainSnapshot {
height: number;
mempool: SignedTransaction[];
}

interface MonitorState {
currentHeight: number;
/** map of txid -> bool indicating if mempool tx has been passed to plugins */
mempoolTxDelivery: Record<TransactionId, boolean>;
/** map of txid -> int indicating the number of re-checks */
mempoolTxChecks: Record<TransactionId, number>;
pastMempoolTxIds: TransactionId[];
lastPeerMsgTimestamp: number;
}

type MonitorEvent<T> = CustomEvent<[T, Readonly<BlockchainSnapshot>]>;

interface BlockchainMonitorEvent {
"monitor:mempool-tx": MonitorEvent<SignedTransaction>;
"monitor:mempool-tx-drop": MonitorEvent<SignedTransaction>;
"monitor:included-tx": MonitorEvent<SignedTransaction>;
"monitor:new-block": MonitorEvent<unknown>;
}

export class BlockchainMonitor extends Component<BlockchainMonitorEvent> {
readonly #blockchainClient: BlockchainClient;
readonly #pollIntervalMs: number;
readonly #maxMempoolTxChecks: number;
readonly #state: MonitorState;
#taskHandle?: number;

constructor(
config: ErgomaticConfig,
blockchainClient: BlockchainClient,
pollIntervalMs: number = 500,
maxMempoolTxChecks: number = 10,
) {
super(config, "BlockchainMonitor");

this.#pollIntervalMs = pollIntervalMs;
this.#maxMempoolTxChecks = maxMempoolTxChecks;
this.#blockchainClient = blockchainClient;
this.#state = {
currentHeight: 0,
mempoolTxDelivery: {},
mempoolTxChecks: {},
pastMempoolTxIds: [],
lastPeerMsgTimestamp: 0,
};
}

start(): Promise<void> {
// TODO: raise component:error event if monitor throws exception
this.#taskHandle = setInterval(() => this.#monitor(), this.#pollIntervalMs);

return super.start();
}

stop(): Promise<void> {
clearInterval(this.#taskHandle);

return super.stop();
}

async #monitor() {
this.logger.debug("Gathering blockchain state");

const { currentHeight, lastPeerMsgTimestamp } = await this.#blockchainClient
.getInfo();

if (lastPeerMsgTimestamp === this.#state.lastPeerMsgTimestamp) {
return;
}

this.#state.lastPeerMsgTimestamp = lastPeerMsgTimestamp!;

const mempool = [];
// the loop following this where we go through all the mempool txs could
// go in here as well but each time the `monitor:mempool-tx` event is raised
// it could provide a different mempool snapshot to plugins.
//
// instead, collect the full mempool first so the plugins can receive a full consistent snapshot.
for await (const page of this.#blockchainClient.getMempool()) {
mempool.push(...page);
}

const snapshot: Readonly<BlockchainSnapshot> = Object.freeze({
height: currentHeight,
mempool,
});

for (const tx of mempool) {
if (!this.#state.mempoolTxDelivery[tx.id]) {
this.#state.mempoolTxDelivery[tx.id] = true;

this.dispatchEvent(
new CustomEvent("monitor:mempool-tx", { detail: [tx, snapshot] }),
);
}

this.#state.pastMempoolTxIds = this.#state.pastMempoolTxIds.filter((
txId,
) => txId !== tx.id);

// remove txid from undefined state transactions map if present
// this resets the mempool drop detection counter for this txid
delete this.#state.mempoolTxChecks[tx.id];
}

// if tx was present in previous mempool, but not in the
// current, it may have been dropped or included in a block
for (const txId of this.#state.pastMempoolTxIds) {
this.#state.mempoolTxChecks[txId] =
(this.#state.mempoolTxChecks[txId] ?? 0) + 1;
}

this.#state.pastMempoolTxIds = mempool.map((tx) => tx.id);

if (currentHeight > this.#state.currentHeight) {
const newBlock = await this.#blockchainClient.getBlock(
currentHeight,
) as any;

this.dispatchEvent(
new CustomEvent("monitor:new-block", { detail: [newBlock, snapshot] }),
);

this.#state.currentHeight = currentHeight;

for (const tx of newBlock.blockTransactions) {
this.dispatchEvent(
new CustomEvent("monitor:included-tx", { detail: [tx, snapshot] }),
);

// stop tracking mempool delivery for this txid
delete this.#state.mempoolTxDelivery[tx.id];

// prevent `onMempoolTxDrop` event for this txid as
// it is now included in a block
delete this.#state.mempoolTxChecks[tx.id];
}
}

for (const txId of Object.keys(this.#state.mempoolTxChecks)) {
// if a tx is not included in a block in dropChecks * `n` seconds,
// then it's probably dropped from the mempool
if (this.#state.mempoolTxChecks[txId] > this.#maxMempoolTxChecks) {
// TODO raise mempool dropped, so we still need to keep track of txns not just the id?
delete this.#state.mempoolTxChecks[txId];
} else {
this.#state.mempoolTxChecks[txId] += 1;
}
}
}
}
31 changes: 0 additions & 31 deletions src/blockchain/blockchain_provider.ts

This file was deleted.

72 changes: 72 additions & 0 deletions src/blockchain/clients/blockchain_client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import {
AmountType,
Box,
SignedTransaction,
TokenId,
TransactionId,
} from "@fleet-sdk/common";
import { Component } from "../../component.ts";
import { ErgomaticConfig } from "../../config.ts";
import { ExplorerClient } from "./explorer.ts";
import { NodeClient } from "./node.ts";

/**
* RestrictedBlockchainClient is a blockchain client that is exclusively used by plugins.
* The API is intentionally restricted to prevent plugins issuing excessive API requests
* that are instead provided to plugins via a snapshot of the blockchain state.
*/
export interface RestrictedBlockchainClient {
submitTx(signedTx: SignedTransaction): Promise<TransactionId>;

getBoxesByTokenId<T extends AmountType = string>(
tokenId: TokenId,
): AsyncGenerator<Box<T>[]>;
}

export interface BlockchainInfo {
currentHeight: number;
lastPeerMsgTimestamp?: number;
}

export interface BlockchainClient extends RestrictedBlockchainClient {
getMempool(): AsyncGenerator<SignedTransaction[]>;

getInfo(): Promise<BlockchainInfo>;

getBlock(height: number): Promise<unknown>;
}

export class DefaultBlockchainClient extends Component
implements BlockchainClient {
readonly #explorer: BlockchainClient;
readonly #node: BlockchainClient;

constructor(config: ErgomaticConfig) {
super(config, "DefaultBlockchainProvider");

this.#explorer = new ExplorerClient(config);
this.#node = new NodeClient(config);
}

getBlock(height: number): Promise<unknown> {
return this.#node.getBlock(height);
}

getInfo(): Promise<BlockchainInfo> {
return this.#node.getInfo();
}

getMempool(): AsyncGenerator<SignedTransaction[]> {
return this.#node.getMempool();
}

submitTx(signedTx: SignedTransaction): Promise<TransactionId> {
return this.#node.submitTx(signedTx);
}

getBoxesByTokenId<T extends AmountType = string>(
tokenId: TokenId,
): AsyncGenerator<Box<T>[]> {
return this.#explorer.getBoxesByTokenId(tokenId);
}
}
39 changes: 39 additions & 0 deletions src/blockchain/clients/blockchain_provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import {
AmountType,
Box,
SignedTransaction,
TokenId,
TransactionId,
} from "@fleet-sdk/common";
import { Component } from "../../component.ts";
import { ErgomaticConfig } from "../../config.ts";
import {
BlockchainClient,
RestrictedBlockchainClient,
} from "./blockchain_client.ts";
import { ExplorerClient } from "./explorer.ts";
import { NodeClient } from "./node.ts";

/** Restricted blockchain client used by plugins */
export class BlockchainProvider extends Component
implements RestrictedBlockchainClient {
readonly #explorer: BlockchainClient;
readonly #node: BlockchainClient;

constructor(config: ErgomaticConfig) {
super(config, "BlockchainProvider");

this.#explorer = new ExplorerClient(config);
this.#node = new NodeClient(config);
}

submitTx(signedTx: SignedTransaction): Promise<TransactionId> {
return this.#node.submitTx(signedTx);
}

getBoxesByTokenId<T extends AmountType = string>(
tokenId: TokenId,
): AsyncGenerator<Box<T>[]> {
return this.#explorer.getBoxesByTokenId(tokenId);
}
}
19 changes: 17 additions & 2 deletions src/blockchain/clients/explorer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ import {
TransactionId,
} from "@fleet-sdk/common";
import axios, { AxiosInstance } from "axios";
import { BlockchainClient } from "../blockchain_client.ts";
import { BlockchainClient, BlockchainInfo } from "./blockchain_client.ts";
import { ErgomaticConfig } from "../../config.ts";
import { Component } from "../../component.ts";

export class ExplorerClient implements BlockchainClient {
export class ExplorerClient extends Component implements BlockchainClient {
readonly #http: AxiosInstance;
#pageSize = 100;
#timeoutMs: number;

constructor(config: ErgomaticConfig, httpTimeoutMs: number = 10000) {
super(config, "ExplorerClient");

// axios timeout is incompatible with deno due to a missing nodejs API
// use signals for timeouts instead.
this.#timeoutMs = httpTimeoutMs;
Expand All @@ -24,6 +27,18 @@ export class ExplorerClient implements BlockchainClient {
});
}

getBlock(_height: number): Promise<unknown> {
throw new Error("Method not implemented.");
}

getInfo(): Promise<BlockchainInfo> {
throw new Error("Method not implemented.");
}

getMempool(): AsyncGenerator<SignedTransaction[]> {
throw new Error(`${this.name} does not support getMempool operation`);
}

async submitTx(signedTx: SignedTransaction): Promise<TransactionId> {
const response = await this.#http.post(
"/mempool/transactions/submit",
Expand Down
3 changes: 2 additions & 1 deletion src/blockchain/clients/mod.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from "./explorer.ts";
export * from "./blockchain_client.ts";
export * from "./blockchain_provider.ts";
Loading

0 comments on commit cfe47f4

Please sign in to comment.