diff --git a/src/system.ts b/src/system.ts index 26e8de6..fb0a09b 100644 --- a/src/system.ts +++ b/src/system.ts @@ -58,6 +58,7 @@ import { FsCleanupWorker } from './workers/fs-cleanup-worker.js'; import { TransactionFetcher } from './workers/transaction-fetcher.js'; import { TransactionImporter } from './workers/transaction-importer.js'; import { TransactionRepairWorker } from './workers/transaction-repair-worker.js'; +import { DataPrefetcher } from './workers/prefetch-data.js'; process.on('uncaughtException', (error) => { metrics.uncaughtExceptionCounter.inc(); @@ -233,6 +234,23 @@ const ans104Unbundler = new Ans104Unbundler({ workerCount: config.ANS104_UNBUNDLE_WORKERS, }); +const dataPrefetcher = new DataPrefetcher({ + log, + contiguousDataSource, +}); + +eventEmitter.on( + events.TX_INDEXED, + async (tx: MatchableItem & { data_size?: string }) => { + // download the tx + // await contiguousDataSource.getData(tx.id) + const size = +(tx?.data_size ?? 0); + if (!isNaN(size) && size !== 0) { + await dataPrefetcher.queuePrefetchData(tx); + } + }, +); + eventEmitter.on( events.ANS104_BUNDLE_INDEXED, async (item: NormalizedDataItem | PartialJsonTransaction) => { diff --git a/src/workers/prefetch-data.ts b/src/workers/prefetch-data.ts new file mode 100644 index 0000000..54f701a --- /dev/null +++ b/src/workers/prefetch-data.ts @@ -0,0 +1,66 @@ +/* eslint-disable header/header */ +import { default as fastq } from 'fastq'; +import type { queueAsPromised } from 'fastq'; +import * as winston from 'winston'; + +import { ContiguousDataSource, MatchableItem } from '../types.js'; + +const DEFAULT_WORKER_COUNT = 4; + +export class DataPrefetcher { + // Dependencies + private log: winston.Logger; + private contiguousDataSource: ContiguousDataSource; + // private indexWriter: DataItemIndexWriter; + + // Data indexing queue + private queue: queueAsPromised; + + constructor({ + log, + // indexWriter, + contiguousDataSource, + workerCount = DEFAULT_WORKER_COUNT, + }: { + log: winston.Logger; + // indexWriter: DataItemIndexWriter; + contiguousDataSource: ContiguousDataSource; + workerCount?: number; + }) { + this.log = log.child({ class: 'DataPrefetcher' }); + // this.indexWriter = indexWriter; + this.contiguousDataSource = contiguousDataSource; + + this.queue = fastq.promise(this.prefetchData.bind(this), workerCount); + } + + async queuePrefetchData(item: MatchableItem): Promise { + const log = this.log.child({ + method: 'queueDataItem', + id: item.id, + }); + log.debug('Queueing item for prefetching...'); + this.queue.push(item); + log.debug('Data item queued for prefetching.'); + } + + async prefetchData(item: MatchableItem): Promise { + const log = this.log.child({ + method: 'indexDataItem', + id: item.id, + }); + + try { + log.debug('Prefetching data item...'); + const res = await this.contiguousDataSource.getData(item.id); + const stream = res.stream; + // you have to consume the stream so it actually caches the item fully. + for await (const _ of stream) { + true; // void it + } + log.debug('Data item prefetched.'); + } catch (error) { + log.error('Failed to prefetch data item data:', error); + } + } +} diff --git a/wipe.sh b/wipe.sh new file mode 100755 index 0000000..6ff9e88 --- /dev/null +++ b/wipe.sh @@ -0,0 +1,8 @@ +for i in `find ./data -maxdepth 1 -mindepth 1 -type d ` # Finds all the subfolders and loop. +do + echo $i + sudo rm -rf ${i} + mkdir ${i} +done + +yarn db:migrate up \ No newline at end of file