Skip to content

Commit

Permalink
Merge pull request #1 from Irys-xyz/feat/tx-data-prefetch
Browse files Browse the repository at this point in the history
feat: ✨ workers: add prefetch-data worker
  • Loading branch information
joshbenaron authored Dec 17, 2023
2 parents 4617281 + 2287a9e commit f58da8c
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 0 deletions.
18 changes: 18 additions & 0 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import { FsCleanupWorker } from './workers/fs-cleanup-worker.js';
import { TransactionFetcher } from './workers/transaction-fetcher.js';
import { TransactionImporter } from './workers/transaction-importer.js';
import { TransactionRepairWorker } from './workers/transaction-repair-worker.js';
import { DataPrefetcher } from './workers/prefetch-data.js';

process.on('uncaughtException', (error) => {
metrics.uncaughtExceptionCounter.inc();
Expand Down Expand Up @@ -233,6 +234,23 @@ const ans104Unbundler = new Ans104Unbundler({
workerCount: config.ANS104_UNBUNDLE_WORKERS,
});

const dataPrefetcher = new DataPrefetcher({
log,
contiguousDataSource,
});

eventEmitter.on(
events.TX_INDEXED,
async (tx: MatchableItem & { data_size?: string }) => {
// download the tx
// await contiguousDataSource.getData(tx.id)
const size = +(tx?.data_size ?? 0);
if (!isNaN(size) && size !== 0) {
await dataPrefetcher.queuePrefetchData(tx);
}
},
);

eventEmitter.on(
events.ANS104_BUNDLE_INDEXED,
async (item: NormalizedDataItem | PartialJsonTransaction) => {
Expand Down
66 changes: 66 additions & 0 deletions src/workers/prefetch-data.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/* eslint-disable header/header */
import { default as fastq } from 'fastq';
import type { queueAsPromised } from 'fastq';
import * as winston from 'winston';

import { ContiguousDataSource, MatchableItem } from '../types.js';

const DEFAULT_WORKER_COUNT = 4;

export class DataPrefetcher {
// Dependencies
private log: winston.Logger;
private contiguousDataSource: ContiguousDataSource;
// private indexWriter: DataItemIndexWriter;

// Data indexing queue
private queue: queueAsPromised<MatchableItem, void>;

constructor({
log,
// indexWriter,
contiguousDataSource,
workerCount = DEFAULT_WORKER_COUNT,
}: {
log: winston.Logger;
// indexWriter: DataItemIndexWriter;
contiguousDataSource: ContiguousDataSource;
workerCount?: number;
}) {
this.log = log.child({ class: 'DataPrefetcher' });
// this.indexWriter = indexWriter;
this.contiguousDataSource = contiguousDataSource;

this.queue = fastq.promise(this.prefetchData.bind(this), workerCount);
}

async queuePrefetchData(item: MatchableItem): Promise<void> {
const log = this.log.child({
method: 'queueDataItem',
id: item.id,
});
log.debug('Queueing item for prefetching...');
this.queue.push(item);
log.debug('Data item queued for prefetching.');
}

async prefetchData(item: MatchableItem): Promise<void> {
const log = this.log.child({
method: 'indexDataItem',
id: item.id,
});

try {
log.debug('Prefetching data item...');
const res = await this.contiguousDataSource.getData(item.id);
const stream = res.stream;
// you have to consume the stream so it actually caches the item fully.
for await (const _ of stream) {
true; // void it
}
log.debug('Data item prefetched.');
} catch (error) {
log.error('Failed to prefetch data item data:', error);
}
}
}
8 changes: 8 additions & 0 deletions wipe.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
for i in `find ./data -maxdepth 1 -mindepth 1 -type d ` # Finds all the subfolders and loop.
do
echo $i
sudo rm -rf ${i}
mkdir ${i}
done

yarn db:migrate up

0 comments on commit f58da8c

Please sign in to comment.