diff --git a/src/system.ts b/src/system.ts
index fbe2728..c100bb4 100644
--- a/src/system.ts
+++ b/src/system.ts
@@ -50,16 +50,21 @@ import {
   NormalizedDataItem,
   PartialJsonTransaction,
 } from './types.js';
+import { createCron } from './utils/cron.js';
 import { Ans104DataIndexer } from './workers/ans104-data-indexer.js';
 import { Ans104Unbundler } from './workers/ans104-unbundler.js';
 import { BlockImporter } from './workers/block-importer.js';
 import { BundleRepairWorker } from './workers/bundle-repair-worker.js';
 import { DataItemIndexer } from './workers/data-item-indexer.js';
 import { FsCleanupWorker } from './workers/fs-cleanup-worker.js';
-import { DataPrefetcher } from './workers/prefetch-data.js';
+import {
+  PrefetchJobBody,
+  PrefetchJobReturnBody,
+} from './workers/prefetch-data.js';
 import { TransactionFetcher } from './workers/transaction-fetcher.js';
 import { TransactionImporter } from './workers/transaction-importer.js';
 import { TransactionRepairWorker } from './workers/transaction-repair-worker.js';
+import { WorkerThreadQueue } from './workers/worker-thead.js';
 
 process.on('uncaughtException', (error) => {
   metrics.uncaughtExceptionCounter.inc();
@@ -264,19 +269,46 @@ const ans104Unbundler = new Ans104Unbundler({
   workerCount: config.ANS104_UNBUNDLE_WORKERS,
 });
 
-const dataPrefetcher = new DataPrefetcher({
+const dataPrefetcher = new WorkerThreadQueue<
+  PrefetchJobBody,
+  PrefetchJobReturnBody
+>({
   log,
-  contiguousDataSource,
+  workerPath: './prefetch-data.js', // relative to ./workers
+  workerCount: 5,
+});
+
+const maxPendingPrefetchJobs = 200;
+
+createCron('Limit block imports', '*/10 * * * * *', async () => {
+  if (
+    dataPrefetcher.pendingJobs >= maxPendingPrefetchJobs &&
+    blockImporter.running
+  )
+    return await blockImporter
+      .stop()
+      .then((_) => void log.info('Pausing block import'));
+  // only resume once the prefetch queue has drained below the threshold
+  if (
+    dataPrefetcher.pendingJobs < maxPendingPrefetchJobs &&
+    !blockImporter.running
+  )
+    return await blockImporter
+      .start()
+      .then((_) => void log.info('Resuming block import'));
 });
 
 eventEmitter.on(
   events.TX_INDEXED,
   async (tx: MatchableItem & { data_size?: string }) => {
-    // download the tx
-    // await contiguousDataSource.getData(tx.id)
+    // check if the tx has any data
     const size = +(tx?.data_size ?? 0);
     if (!isNaN(size) && size !== 0) {
-      await dataPrefetcher.queuePrefetchData(tx);
+      // queue a worker to download the tx
+      const response = await dataPrefetcher
+        .queueWork({ id: tx.id })
+        .catch((_) => undefined);
+      if (response)
+        // insert into index
+        await contiguousDataIndex.saveDataContentAttributes(
+          response as PrefetchJobReturnBody,
+        );
     }
   },
 );
diff --git a/src/workers/block-importer.ts b/src/workers/block-importer.ts
index b91ae8f..f3c9f56 100644
--- a/src/workers/block-importer.ts
+++ b/src/workers/block-importer.ts
@@ -105,8 +105,9 @@ export class BlockImporter {
     if (height > this.startHeight) {
       // Retrieve expected previous block hash from the DB
       const previousHeight = height - 1;
-      const previousDbBlockHash =
-        await this.chainIndex.getBlockHashByHeight(previousHeight);
+      const previousDbBlockHash = await this.chainIndex.getBlockHashByHeight(
+        previousHeight,
+      );
 
       if (previousDbBlockHash === undefined) {
         // If a gap is found, rewind the index to the last known block
@@ -152,8 +153,9 @@ export class BlockImporter {
   }
 
   public async importBlock(height: number) {
-    const { block, txs, missingTxIds } =
-      await this.getBlockOrForkedBlock(height);
+    const { block, txs, missingTxIds } = await this.getBlockOrForkedBlock(
+      height,
+    );
 
     // Emit sucessful fetch events
     this.eventEmitter.emit(events.BLOCK_FETCHED, block);
@@ -245,6 +247,10 @@ export class BlockImporter {
     }
   }
 
+  get running() {
+    return this.shouldRun;
+  }
+
   public async stop() {
     this.shouldRun = false;
     metrics.blockImporterRunningGauge.set(0);
diff --git a/src/workers/prefetch-data.ts b/src/workers/prefetch-data.ts
index 54f701a..963c461 100644
--- a/src/workers/prefetch-data.ts
+++ b/src/workers/prefetch-data.ts
@@ -1,66 +1,69 @@
-/* eslint-disable header/header */
-import { default as fastq } from 'fastq';
-import type { queueAsPromised } from 'fastq';
-import * as winston from 'winston';
+import { createHash } from 'crypto';
+import { pipeline } from 'stream/promises';
+import { isMainThread, parentPort } from 'worker_threads';
 
-import { ContiguousDataSource, MatchableItem } from '../types.js';
+import { GatewayDataSource } from '../data/gateway-data-source.js';
+import { currentUnixTimestamp } from '../lib/time.js';
+import logger from '../log.js';
+import { FsDataStore } from '../store/fs-data-store.js';
+import { WorkerMessage, wrapWorkerFn } from './worker-thead.js';
 
-const DEFAULT_WORKER_COUNT = 4;
+export type PrefetchJobBody = {
+  id: string;
+};
 
-export class DataPrefetcher {
-  // Dependencies
-  private log: winston.Logger;
-  private contiguousDataSource: ContiguousDataSource;
-  // private indexWriter: DataItemIndexWriter;
+// separate deps, as we can't load the DB from a worker thread.
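+// Each worker thread therefore builds its own gateway data source and filesystem
+// store below; results are posted back to the main thread, which writes them to
+// the contiguous data index.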
+const dataStore = new FsDataStore({ log: logger, baseDir: 'data/contiguous' });
+const dataSource = new GatewayDataSource({
+  log: logger,
+  trustedGatewayUrl: 'https://arweave.net',
+});
 
-  // Data indexing queue
-  private queue: queueAsPromised;
+export async function prefetchTransaction(
+  job: PrefetchJobBody,
+): Promise<PrefetchJobReturnBody> {
+  const txId = job.id;
+  const log = logger.child({ worker: 'transaction-prefetch', txId: txId });
+  const then = performance.now();
+  log.verbose(`Prefetching ${txId}`);
 
-  constructor({
-    log,
-    // indexWriter,
-    contiguousDataSource,
-    workerCount = DEFAULT_WORKER_COUNT,
-  }: {
-    log: winston.Logger;
-    // indexWriter: DataItemIndexWriter;
-    contiguousDataSource: ContiguousDataSource;
-    workerCount?: number;
-  }) {
-    this.log = log.child({ class: 'DataPrefetcher' });
-    // this.indexWriter = indexWriter;
-    this.contiguousDataSource = contiguousDataSource;
-
-    this.queue = fastq.promise(this.prefetchData.bind(this), workerCount);
-  }
-
-  async queuePrefetchData(item: MatchableItem): Promise<void> {
-    const log = this.log.child({
-      method: 'queueDataItem',
-      id: item.id,
-    });
-    log.debug('Queueing item for prefetching...');
-    this.queue.push(item);
-    log.debug('Data item queued for prefetching.');
-  }
+  const data = await dataSource.getData(txId);
+  const hasher = createHash('sha256');
+  const cacheStream = await dataStore.createWriteStream();
+  const dataStream = data.stream;
+  // hash the bytes as they pass through; pause so nothing flows before pipeline() starts
+  data.stream.on('data', (chunk) => {
+    hasher.update(chunk);
+  });
+  data.stream.pause();
+  await pipeline(dataStream, cacheStream);
+  const hash = hasher.digest('base64url');
+  await dataStore.finalize(cacheStream, hash);
+  log.verbose(
+    `Prefetched ${txId} in ${((performance.now() - then) / 1000).toFixed(3)}s`,
+  );
+  return {
+    id: txId,
+    dataRoot: undefined,
+    hash,
+    dataSize: data.size,
+    contentType: data.sourceContentType,
+    cachedAt: currentUnixTimestamp(),
+  };
+}
 
-  async prefetchData(item: MatchableItem): Promise<void> {
-    const log = this.log.child({
-      method: 'indexDataItem',
-      id: item.id,
-    });
+export type PrefetchJobReturnBody = {
+  id: string;
+  dataRoot?: string;
+  hash: string;
+  dataSize: number;
+  contentType?: string;
+  cachedAt?: number;
+};
 
-    try {
-      log.debug('Prefetching data item...');
-      const res = await this.contiguousDataSource.getData(item.id);
-      const stream = res.stream;
-      // you have to consume the stream so it actually caches the item fully.
-      for await (const _ of stream) {
-        true; // void it
-      }
-      log.debug('Data item prefetched.');
-    } catch (error) {
-      log.error('Failed to prefetch data item data:', error);
-    }
-  }
+if (!isMainThread) {
+  parentPort?.on('message', (msg: WorkerMessage<PrefetchJobBody>) =>
+    wrapWorkerFn(prefetchTransaction, msg),
+  );
 }
+
+// export default prefetchTransaction;
diff --git a/src/workers/worker-thead.ts b/src/workers/worker-thead.ts
new file mode 100644
index 0000000..e30ec94
--- /dev/null
+++ b/src/workers/worker-thead.ts
@@ -0,0 +1,283 @@
+import { MessagePort, Worker, parentPort } from 'node:worker_threads';
+import * as winston from 'winston';
+
+export enum WorkerEventName {
+  LOG_MESSAGE,
+  RETURN_DATA,
+  ERROR,
+}
+
+export type WorkerMessage<JobData = any> = {
+  eventName: WorkerEventName;
+  id: number;
+  data: JobData;
+};
+
+type WorkerJob<JobData = any> = {
+  resolve: (value: any) => void;
+  reject: (reason?: any) => void;
+  data: JobData;
+  id: number;
+};
+
+export class WorkerThreadQueue<JobData = any, ReturnData = any> {
+  protected log: winston.Logger;
+
+  protected workers: { takeWork: () => void }[] = [];
+  protected workQueue: WorkerJob<JobData>[] = [];
+
+  constructor({
+    log,
+    workerCount = 1,
+    workerPath,
+    workerData = undefined,
+    tasksPerWorker = 5,
+  }: {
+    log: winston.Logger;
+    workerCount?: number;
+    workerData?: any;
+    workerPath: string;
+    tasksPerWorker?: number;
+  }) {
+    this.log = log;
+    const self = this;
+
+    function spawn() {
+      const workerUrl = new URL(workerPath, import.meta.url);
+      const worker = new Worker(workerUrl, {
+        workerData: workerData,
+      });
+
+      let jobId = 0; // Monotonic id used to match results to in-flight jobs
+      let jobs: WorkerJob<JobData>[] = []; // Jobs currently in flight on this worker
+      let error: any = null; // Error that caused the worker to crash
+      let job: WorkerJob<JobData> | null = null;
+
+      function takeWork() {
+        // send jobs to the worker
+        while (jobs.length < tasksPerWorker && self.workQueue.length) {
+          const job = self.workQueue.shift()!;
+          job.id = jobId++;
+          jobs.push(job);
+          // Job dispatch reuses LOG_MESSAGE as the event name; the worker side
+          // ignores eventName on incoming job messages.
+          const msg: WorkerMessage<JobData> = {
+            id: job.id,
+            eventName: WorkerEventName.LOG_MESSAGE,
+            data: job.data,
+          };
+          worker.postMessage(msg);
+        }
+      }
+
+      worker
+        .on('online', () => {
+          self.workers.push({ takeWork });
+          takeWork();
+        })
+        .on('message', (message: WorkerMessage) => {
+          switch (message?.eventName) {
+            case WorkerEventName.LOG_MESSAGE: {
+              const logData = message.data;
+              log.info(logData);
+              break;
+            }
+            case WorkerEventName.RETURN_DATA: {
+              const data = message.data as ReturnData;
+              const index = jobs.findIndex((j) => j.id === message.id);
+              job = index >= 0 ? jobs.splice(index, 1)[0] : null;
+              job?.resolve(data);
+              job = null;
+              break;
+            }
+            case WorkerEventName.ERROR: {
+              const index = jobs.findIndex((j) => j.id === message.id);
+              job = index >= 0 ? jobs.splice(index, 1)[0] : null;
+              job?.reject(message.data);
+              job = null;
+              break;
+            }
+            default:
+              if (message?.data?.id) {
+                const index = jobs.findIndex((j) => j.id === message.id);
+                job = index >= 0 ? jobs.splice(index, 1)[0] : null;
+                job?.reject(message.data);
+                job = null;
+                break;
+              }
+              // very problematic, so we reject anything in flight and terminate
+              for (const pending of jobs) {
+                pending.reject(new Error('unexpected worker message'));
+              }
+              jobs = [];
+              worker.terminate();
+              break;
+          }
+          takeWork(); // Check if there's more work to do
+        })
+        .on('error', (err) => {
+          self.log.error('Worker error', err);
+          error = err;
+        })
+        .on('exit', (code) => {
+          self.workers = self.workers.filter(
+            (w: any) => w.takeWork !== takeWork,
+          );
+          // Settle anything that was still in flight on this worker
+          for (const pending of jobs) {
+            pending.reject(error || new Error('worker died'));
+          }
+          jobs = [];
+          if (code !== 0) {
+            self.log.error('Worker stopped with exit code ' + code, {
+              exitCode: code,
+            });
+            spawn(); // Worker died, so spawn a new one
+          }
+        });
+    }
+
+    for (let i = 0; i < workerCount; i++) {
+      spawn();
+    }
+  }
+
+  get pendingJobs() {
+    return this.workQueue.length;
+  }
+
+  drainQueue() {
+    for (const worker of this.workers) {
+      worker.takeWork();
+    }
+  }
+
+  queueWork(data: JobData): Promise<ReturnData> {
+    return new Promise<ReturnData>((resolve, reject) => {
+      this.workQueue.push({
+        resolve,
+        reject,
+        data,
+        id: 0,
+      });
+      this.drainQueue();
+    });
+  }
+}
+
+export type ParentPort = MessagePort | null;
+
+export function returnData<ReturnData>(
+  parentPort: ParentPort,
+  id: number,
+  data: ReturnData,
+): void {
+  const returnData: WorkerMessage<ReturnData> = {
+    eventName: WorkerEventName.RETURN_DATA,
+    id,
+    data,
+  };
+  parentPort?.postMessage(returnData);
+}
+
+export function returnError<ErrorData>(
+  parentPort: ParentPort,
+  id: number,
+  data: ErrorData,
+): void {
+  const errorData: WorkerMessage<ErrorData> = {
+    eventName: WorkerEventName.ERROR,
+    id,
+    data,
+  };
+  parentPort?.postMessage(errorData);
+}
+
+export async function wrapWorkerFn<JobData, ReturnData>(
+  wrapped: (job: JobData) => ReturnData | Promise<ReturnData>,
+  msg: WorkerMessage<JobData>,
+): Promise<void> {
+  try {
+    returnData(parentPort, msg.id, await wrapped(msg.data));
+  } catch (e: any) {
+    returnError(parentPort, msg.id, e?.stack ?? e);
+  }
+}
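+
+// Message flow (for reference): the main thread posts a WorkerMessage containing
+// a numeric job id and the JobData payload; wrapWorkerFn runs the wrapped
+// function and replies with RETURN_DATA (resolving the queued promise) or ERROR
+// (rejecting it). LOG_MESSAGE replies are forwarded to the main-thread logger.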