diff --git a/.gitignore b/.gitignore index eadb0bb..0d1bd19 100644 --- a/.gitignore +++ b/.gitignore @@ -29,7 +29,6 @@ lerna-debug.log* # IDE - VSCode .vscode/* -!.vscode/settings.json !.vscode/tasks.json !.vscode/launch.json !.vscode/extensions.json diff --git a/config/dev.config.yaml b/config/dev.config.yaml index 5b885c0..b304d4a 100644 --- a/config/dev.config.yaml +++ b/config/dev.config.yaml @@ -3,7 +3,7 @@ db: synchronize: true app: port: 3000 - verbose: true + verbose: false debug: true network: regtest requestRetry: @@ -19,4 +19,3 @@ bitcoinCore: rpcPass: password rpcUser: admin rpcPort: 18443 - diff --git a/src/block-data-providers/base-block-data-provider.abstract.ts b/src/block-data-providers/base-block-data-provider.abstract.ts index a9894c0..f19be1b 100644 --- a/src/block-data-providers/base-block-data-provider.abstract.ts +++ b/src/block-data-providers/base-block-data-provider.abstract.ts @@ -1,51 +1,25 @@ import { OperationStateService } from '@/operation-state/operation-state.service'; -import { Logger, OnModuleInit } from '@nestjs/common'; +import { Logger } from '@nestjs/common'; import { IndexerService, TransactionInput, TransactionOutput, } from '@/indexer/indexer.service'; -import { TransactionsService } from '@/transactions/transactions.service'; -import { SchedulerRegistry } from '@nestjs/schedule'; -import { CronJob } from 'cron'; import { ConfigService } from '@nestjs/config'; +import { BlockStateService } from '@/block-state/block-state.service'; +import { BlockState } from '@/block-state/block-state.entity'; -export interface BaseOperationState { - indexedBlockHeight: number; - indexedBlockHash: string; -} - -export abstract class BaseBlockDataProvider< - OperationState extends BaseOperationState, -> implements OnModuleInit -{ +export abstract class BaseBlockDataProvider { protected abstract readonly logger: Logger; - protected readonly cronJobName = 'providerSync'; - protected readonly schedulerInterval = '*/10 * * * * *'; - protected emptyHash = - '0000000000000000000000000000000000000000000000000000000000000000'; + protected abstract readonly operationStateKey: string; protected constructor( protected readonly configService: ConfigService, private readonly indexerService: IndexerService, private readonly operationStateService: OperationStateService, - private readonly transactionService: TransactionsService, - private readonly schedulerRegistry: SchedulerRegistry, + protected readonly blockStateService: BlockStateService, ) {} - onModuleInit() { - this.initiateCronJob(); - } - - abstract sync(): void; - - private initiateCronJob() { - const job = new CronJob(this.schedulerInterval, () => this.sync()); - - this.schedulerRegistry.addCronJob(this.cronJobName, job); - job.start(); - } - async indexTransaction( txid: string, vin: TransactionInput[], @@ -63,42 +37,45 @@ export abstract class BaseBlockDataProvider< } async getState(): Promise { - const state = - await this.operationStateService.getCurrentOperationState(); - return state as unknown as Promise; + return ( + await this.operationStateService.getOperationState( + this.operationStateKey, + ) + )?.state; } - async setState(futureState: Partial): Promise { - await this.operationStateService.setOperationState(futureState); + async setState( + state: OperationState, + blockState: BlockState, + ): Promise { + await this.operationStateService.setOperationState( + this.operationStateKey, + state, + ); + + await this.blockStateService.addBlockState(blockState); } abstract getBlockHash(height: number): Promise; async traceReorg(): Promise { - let state = await this.operationStateService.getCurrentOperationState(); + let state = await this.blockStateService.getCurrentBlockState(); - if (state.indexedBlockHash === this.emptyHash) { - return state.indexedBlockHeight; - } + if (!state) return null; - while (true) { - if (state === null) { - throw new Error('Reorgs levels deep'); - } + while (state) { + const fetchedBlockHash = await this.getBlockHash(state.blockHeight); - const fetchedBlockHash = await this.getBlockHash( - state.indexedBlockHeight, - ); + if (state.blockHash === fetchedBlockHash) return state.blockHeight; - if (state.indexedBlockHash === fetchedBlockHash) { - return state.indexedBlockHeight; - } + await this.blockStateService.removeState(state); - await this.transactionService.deleteTransactionByBlockHash( - state.indexedBlockHash, + this.logger.log( + `Reorg found at height: ${state.blockHeight}, Wrong hash: ${state.blockHash}, Correct hash: ${fetchedBlockHash}`, ); - - state = await this.operationStateService.dequeue_operation_state(); + state = await this.blockStateService.getCurrentBlockState(); } + + throw new Error('Cannot Reorgs, blockchain state exhausted'); } } diff --git a/src/block-data-providers/bitcoin-core/interfaces.ts b/src/block-data-providers/bitcoin-core/interfaces.ts index 23b3ee2..e219fa4 100644 --- a/src/block-data-providers/bitcoin-core/interfaces.ts +++ b/src/block-data-providers/bitcoin-core/interfaces.ts @@ -1,5 +1,4 @@ import { TransactionInput, TransactionOutput } from '@/indexer/indexer.service'; -import { BaseOperationState } from '@/block-data-providers/base-block-data-provider.abstract'; export interface Block { height: number; @@ -40,7 +39,9 @@ export interface Output { }; } -export type BitcoinCoreOperationState = BaseOperationState; +export type BitcoinCoreOperationState = { + indexedBlockHeight: number; +}; export type Transaction = { txid: string; diff --git a/src/block-data-providers/bitcoin-core/provider.spec.ts b/src/block-data-providers/bitcoin-core/provider.spec.ts index 380c3d1..fa9ab7e 100644 --- a/src/block-data-providers/bitcoin-core/provider.spec.ts +++ b/src/block-data-providers/bitcoin-core/provider.spec.ts @@ -12,8 +12,7 @@ import { rawTransactions, } from '@/block-data-providers/bitcoin-core/provider-fixtures'; import { Test, TestingModule } from '@nestjs/testing'; -import { TransactionsService } from '@/transactions/transactions.service'; -import { SchedulerRegistry } from '@nestjs/schedule'; +import { BlockStateService } from '@/block-state/block-state.service'; describe('Bitcoin Core Provider', () => { let provider: BitcoinCoreProvider; @@ -49,11 +48,7 @@ describe('Bitcoin Core Provider', () => { }, }, { - provide: TransactionsService, - useClass: jest.fn(), - }, - { - provide: SchedulerRegistry, + provide: BlockStateService, useClass: jest.fn(), }, ], diff --git a/src/block-data-providers/bitcoin-core/provider.ts b/src/block-data-providers/bitcoin-core/provider.ts index b31d785..b44bd16 100644 --- a/src/block-data-providers/bitcoin-core/provider.ts +++ b/src/block-data-providers/bitcoin-core/provider.ts @@ -7,7 +7,7 @@ import { SATS_PER_BTC, TAPROOT_ACTIVATION_HEIGHT, } from '@/common/constants'; -import { SchedulerRegistry } from '@nestjs/schedule'; +import { Cron, CronExpression } from '@nestjs/schedule'; import { IndexerService, TransactionInput, @@ -28,7 +28,7 @@ import { import { AxiosRequestConfig } from 'axios'; import * as currency from 'currency.js'; import { AxiosRetryConfig, makeRequest } from '@/common/request'; -import { TransactionsService } from '@/transactions/transactions.service'; +import { BlockStateService } from '@/block-state/block-state.service'; @Injectable() export class BitcoinCoreProvider @@ -45,15 +45,13 @@ export class BitcoinCoreProvider configService: ConfigService, indexerService: IndexerService, operationStateService: OperationStateService, - transactionService: TransactionsService, - schedulerRegistry: SchedulerRegistry, + blockStateService: BlockStateService, ) { super( configService, indexerService, operationStateService, - transactionService, - schedulerRegistry, + blockStateService, ); const { protocol, rpcPort, rpcHost } = @@ -75,19 +73,27 @@ export class BitcoinCoreProvider ); } else { this.logger.log('No previous state found. Starting from scratch.'); - const updatedState: BitcoinCoreOperationState = { - indexedBlockHash: this.emptyHash, - indexedBlockHeight: - this.configService.get('app.network') === - BitcoinNetwork.MAINNET - ? TAPROOT_ACTIVATION_HEIGHT - 1 - : 0, - }; - - await this.setState(updatedState); + + const blockHeight = + this.configService.get('app.network') === + BitcoinNetwork.MAINNET + ? TAPROOT_ACTIVATION_HEIGHT - 1 + : 0; + const blockHash = await this.getBlockHash(blockHeight); + + await this.setState( + { + indexedBlockHeight: blockHeight, + }, + { + blockHash, + blockHeight, + }, + ); } } + @Cron(CronExpression.EVERY_10_SECONDS) async sync() { if (this.isSyncing) return; this.isSyncing = true; @@ -112,7 +118,8 @@ export class BitcoinCoreProvider const networkInfo = await this.getNetworkInfo(); const verbosityLevel = this.versionToVerbosity(networkInfo.version); - let height = (await this.traceReorg()) + 1; + let height = + ((await this.traceReorg()) ?? state.indexedBlockHeight) + 1; for (height; height <= tipHeight; height++) { const [transactions, blockHash] = await this.processBlock( @@ -132,9 +139,10 @@ export class BitcoinCoreProvider ); } - await this.setState({ - indexedBlockHeight: height, - indexedBlockHash: blockHash, + state.indexedBlockHeight = height; + await this.setState(state, { + blockHash: blockHash, + blockHeight: height, }); } } finally { diff --git a/src/block-data-providers/block-provider.module.ts b/src/block-data-providers/block-provider.module.ts index f0b6f90..771ae1c 100644 --- a/src/block-data-providers/block-provider.module.ts +++ b/src/block-data-providers/block-provider.module.ts @@ -7,16 +7,15 @@ import { IndexerService } from '@/indexer/indexer.service'; import { ProviderType } from '@/common/enum'; import { BitcoinCoreProvider } from '@/block-data-providers/bitcoin-core/provider'; import { EsploraProvider } from '@/block-data-providers/esplora/provider'; -import { TransactionsService } from '@/transactions/transactions.service'; -import { TransactionsModule } from '@/transactions/transactions.module'; -import { SchedulerRegistry } from '@nestjs/schedule'; +import { BlockStateService } from '@/block-state/block-state.service'; +import { BlockStateModule } from '@/block-state/block-state.module'; @Module({ imports: [ OperationStateModule, IndexerModule, ConfigModule, - TransactionsModule, + BlockStateModule, ], controllers: [], providers: [ @@ -26,15 +25,13 @@ import { SchedulerRegistry } from '@nestjs/schedule'; ConfigService, IndexerService, OperationStateService, - TransactionsService, - SchedulerRegistry, + BlockStateService, ], useFactory: ( configService: ConfigService, indexerService: IndexerService, operationStateService: OperationStateService, - transactionService: TransactionsService, - schedulerRegistry: SchedulerRegistry, + blockStateService: BlockStateService, ) => { switch (configService.get('providerType')) { case ProviderType.ESPLORA: @@ -42,16 +39,14 @@ import { SchedulerRegistry } from '@nestjs/schedule'; configService, indexerService, operationStateService, - transactionService, - schedulerRegistry, + blockStateService, ); case ProviderType.BITCOIN_CORE_RPC: return new BitcoinCoreProvider( configService, indexerService, operationStateService, - transactionService, - schedulerRegistry, + blockStateService, ); default: throw Error('unrecognised provider type in config'); diff --git a/src/block-data-providers/esplora/interface.ts b/src/block-data-providers/esplora/interface.ts index 53c45eb..438ef4d 100644 --- a/src/block-data-providers/esplora/interface.ts +++ b/src/block-data-providers/esplora/interface.ts @@ -1,10 +1,7 @@ -import { BaseOperationState } from '@/block-data-providers/base-block-data-provider.abstract'; - -export interface EsploraOperationState extends BaseOperationState { - providerState: { - currentBlockHeight: number; - lastProcessedTxIndex: number; - }; +export interface EsploraOperationState { + currentBlockHeight: number; + indexedBlockHeight: number; + lastProcessedTxIndex: number; } type EsploraTransactionInput = { diff --git a/src/block-data-providers/esplora/provider.ts b/src/block-data-providers/esplora/provider.ts index 49b3f6e..5dc36f5 100644 --- a/src/block-data-providers/esplora/provider.ts +++ b/src/block-data-providers/esplora/provider.ts @@ -11,8 +11,8 @@ import { EsploraTransaction, } from '@/block-data-providers/esplora/interface'; import { TAPROOT_ACTIVATION_HEIGHT } from '@/common/constants'; -import { TransactionsService } from '@/transactions/transactions.service'; -import { SchedulerRegistry } from '@nestjs/schedule'; +import { BlockStateService } from '@/block-state/block-state.service'; +import { Cron, CronExpression } from '@nestjs/schedule'; @Injectable() export class EsploraProvider @@ -30,15 +30,13 @@ export class EsploraProvider configService: ConfigService, indexerService: IndexerService, operationStateService: OperationStateService, - transactionService: TransactionsService, - schedulerRegistry: SchedulerRegistry, + blockStateService: BlockStateService, ) { super( configService, indexerService, operationStateService, - transactionService, - schedulerRegistry, + blockStateService, ); this.batchSize = this.configService.get('esplora.batchSize'); @@ -73,22 +71,29 @@ export class EsploraProvider ); } else { this.logger.log('No previous state found. Starting from scratch.'); - const updatedState: EsploraOperationState = { - providerState: { + + const blockHeight = + this.configService.get('app.network') === + BitcoinNetwork.MAINNET + ? TAPROOT_ACTIVATION_HEIGHT - 1 + : 0; + const blockHash = await this.getBlockHash(blockHeight); + + await this.setState( + { currentBlockHeight: 0, - lastProcessedTxIndex: 0, + indexedBlockHeight: blockHeight, + lastProcessedTxIndex: 0, // we don't take coinbase txn into account }, - indexedBlockHash: this.emptyHash, - indexedBlockHeight: - this.configService.get('app.network') === - BitcoinNetwork.MAINNET - ? TAPROOT_ACTIVATION_HEIGHT - 1 - : 0, - }; - await this.setState(updatedState); + { + blockHash, + blockHeight, + }, + ); } } + @Cron(CronExpression.EVERY_10_SECONDS) async sync() { if (this.isSyncing) return; this.isSyncing = true; @@ -108,7 +113,8 @@ export class EsploraProvider return; } - let height = (await this.traceReorg()) + 1; + let height = + ((await this.traceReorg()) ?? state.indexedBlockHeight) + 1; for (height; height <= tipHeight; height++) { const blockHash = await this.getBlockHash(height); @@ -128,7 +134,7 @@ export class EsploraProvider const txids = await this.getTxidsForBlock(hash); for ( - let i = state.providerState.lastProcessedTxIndex + 1; + let i = state.lastProcessedTxIndex + 1; i < txids.length; i += this.batchSize ) { @@ -163,14 +169,11 @@ export class EsploraProvider }, this), ); - await this.setState({ - indexedBlockHeight: height, - indexedBlockHash: hash, - providerState: { - lastProcessedTxIndex: i + this.batchSize - 1, - currentBlockHeight: - state.providerState.currentBlockHeight, - }, + state.indexedBlockHeight = height; + state.lastProcessedTxIndex = i + this.batchSize - 1; + await this.setState(state, { + blockHeight: height, + blockHash: hash, }); } catch (error) { this.logger.error( diff --git a/src/block-state/block-state.entity.ts b/src/block-state/block-state.entity.ts new file mode 100644 index 0000000..f31a7d7 --- /dev/null +++ b/src/block-state/block-state.entity.ts @@ -0,0 +1,10 @@ +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +@Entity() +export class BlockState { + @PrimaryColumn('integer') + blockHeight: number; + + @Column('text') + blockHash: string; +} diff --git a/src/block-state/block-state.module.ts b/src/block-state/block-state.module.ts new file mode 100644 index 0000000..84baabe --- /dev/null +++ b/src/block-state/block-state.module.ts @@ -0,0 +1,13 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { BlockState } from '@/block-state/block-state.entity'; +import { BlockStateService } from '@/block-state/block-state.service'; +import { TransactionsModule } from '@/transactions/transactions.module'; + +@Module({ + imports: [TypeOrmModule.forFeature([BlockState]), TransactionsModule], + controllers: [], + providers: [BlockStateService], + exports: [BlockStateService], +}) +export class BlockStateModule {} diff --git a/src/block-state/block-state.service.ts b/src/block-state/block-state.service.ts new file mode 100644 index 0000000..5874649 --- /dev/null +++ b/src/block-state/block-state.service.ts @@ -0,0 +1,36 @@ +import { Injectable } from '@nestjs/common'; +import { Repository } from 'typeorm'; +import { BlockState } from '@/block-state/block-state.entity'; +import { InjectRepository } from '@nestjs/typeorm'; +import { TransactionsService } from '@/transactions/transactions.service'; + +@Injectable() +export class BlockStateService { + constructor( + @InjectRepository(BlockState) + private readonly blockStateRepository: Repository, + private readonly transactionService: TransactionsService, + ) {} + + async getCurrentBlockState(): Promise { + return ( + await this.blockStateRepository.find({ + order: { + blockHeight: 'DESC', + }, + take: 1, + }) + )[0]; + } + + async addBlockState(state: BlockState): Promise { + await this.blockStateRepository.save(state); + } + + async removeState(state: BlockState): Promise { + await this.blockStateRepository.delete(state.blockHeight); + await this.transactionService.deleteTransactionByBlockHash( + state.blockHash, + ); + } +} diff --git a/src/main.ts b/src/main.ts index 68f5ccf..6488ce1 100644 --- a/src/main.ts +++ b/src/main.ts @@ -12,18 +12,12 @@ async function bootstrap() { const port = configService.get('app.port'); const isVerbose = configService.get('app.verbose') ?? false; - const isDebug = configService.get('app.debug') ?? false; const loggerLevels: LogLevel[] = ['error', 'warn', 'log']; - if (isVerbose) { - loggerLevels.push('verbose'); - } - - if (isDebug) { - loggerLevels.push('debug'); - } + if (isVerbose) loggerLevels.push('verbose'); + if (isDebug) loggerLevels.push('debug'); app.useLogger(loggerLevels); diff --git a/src/operation-state/operation-state.entity.ts b/src/operation-state/operation-state.entity.ts index 8cfcf3d..837ed41 100644 --- a/src/operation-state/operation-state.entity.ts +++ b/src/operation-state/operation-state.entity.ts @@ -1,16 +1,10 @@ import { Column, Entity, PrimaryColumn } from 'typeorm'; -import configuration from '@/configuration'; -const operationTableName = `${configuration()['providerType']}_operation_state`; - -@Entity(operationTableName) +@Entity() export class OperationState { - @PrimaryColumn('integer') - indexedBlockHeight: number; - - @Column('text') - indexedBlockHash: string; + @PrimaryColumn('text') + id: string; - @Column({ type: 'simple-json', nullable: true }) - providerState: unknown; + @Column('simple-json') + state: any; } diff --git a/src/operation-state/operation-state.service.ts b/src/operation-state/operation-state.service.ts index 1f6a531..554debb 100644 --- a/src/operation-state/operation-state.service.ts +++ b/src/operation-state/operation-state.service.ts @@ -5,70 +5,19 @@ import { InjectRepository } from '@nestjs/typeorm'; @Injectable() export class OperationStateService { - private cacheSize = 2016; // Define the maximum cache size - constructor( @InjectRepository(OperationState) private readonly operationStateRepository: Repository, ) {} - async getCurrentOperationState(): Promise { - const state = ( - await this.operationStateRepository.find({ - order: { - indexedBlockHeight: 'DESC', - }, - take: 1, - }) - )[0]; - - return state; + async getOperationState(id: string): Promise { + return this.operationStateRepository.findOneBy({ id: id }); } - async setOperationState(state: any): Promise { + async setOperationState(id: string, state: any): Promise { const operationState = new OperationState(); - operationState.indexedBlockHash = state.indexedBlockHash; - operationState.indexedBlockHeight = state.indexedBlockHeight; - operationState.providerState = state.providerState; - this.operationStateRepository.save(operationState); - - // Ensure the cache size does not exceed 2016 - await this.trimState(); - } - - // Remove and return the oldest item in the state cache - async dequeue_operation_state(): Promise { - const latest_state = ( - await this.operationStateRepository.find({ - order: { - indexedBlockHeight: 'DESC', - }, - take: 1, - }) - )[0]; - - if (latest_state) { - await this.operationStateRepository.remove(latest_state); - return latest_state; - } - - return null; - } - - private async trimState(): Promise { - const queueCount = await this.operationStateRepository.count(); - - if (queueCount > this.cacheSize) { - // Delete the oldest entries from the cache - const old_states = await this.operationStateRepository.find({ - order: { - indexedBlockHeight: 'ASC', - }, - take: queueCount - this.cacheSize, - }); - await this.operationStateRepository.delete( - old_states.map((state) => state.indexedBlockHeight), - ); - } + operationState.id = id; + operationState.state = state; + return this.operationStateRepository.save(operationState); } }