Skip to content

Commit

Permalink
Merge pull request #4 from Irys-xyz/feat/worker-thread
Browse files Browse the repository at this point in the history
feat: ✨ Worker threads
  • Loading branch information
JesseTheRobot authored Dec 20, 2023
2 parents 25c15d8 + 9d9f537 commit 623590f
Show file tree
Hide file tree
Showing 4 changed files with 392 additions and 68 deletions.
44 changes: 38 additions & 6 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,21 @@ import {
NormalizedDataItem,
PartialJsonTransaction,
} from './types.js';
import { createCron } from './utils/cron.js';
import { Ans104DataIndexer } from './workers/ans104-data-indexer.js';
import { Ans104Unbundler } from './workers/ans104-unbundler.js';
import { BlockImporter } from './workers/block-importer.js';
import { BundleRepairWorker } from './workers/bundle-repair-worker.js';
import { DataItemIndexer } from './workers/data-item-indexer.js';
import { FsCleanupWorker } from './workers/fs-cleanup-worker.js';
import { DataPrefetcher } from './workers/prefetch-data.js';
import {
PrefetchJobBody,
PrefetchJobReturnBody,
} from './workers/prefetch-data.js';
import { TransactionFetcher } from './workers/transaction-fetcher.js';
import { TransactionImporter } from './workers/transaction-importer.js';
import { TransactionRepairWorker } from './workers/transaction-repair-worker.js';
import { WorkerThreadQueue } from './workers/worker-thead.js';

process.on('uncaughtException', (error) => {
metrics.uncaughtExceptionCounter.inc();
Expand Down Expand Up @@ -264,19 +269,46 @@ const ans104Unbundler = new Ans104Unbundler({
workerCount: config.ANS104_UNBUNDLE_WORKERS,
});

const dataPrefetcher = new DataPrefetcher({
const dataPrefetcher = new WorkerThreadQueue<
PrefetchJobBody,
PrefetchJobReturnBody
>({
log,
contiguousDataSource,
workerPath: './prefetch-data.js', // relative to ./workers,
workerCount: 5,
});

const maxPendingPrefetchJobs = 200;

createCron('Limit block imports', '*/10 * * * * *', async () => {
if (
dataPrefetcher.pendingJobs >= maxPendingPrefetchJobs &&
blockImporter.running
)
return await blockImporter
.stop()
.then((_) => void log.info('Pausing block import'));
if (!blockImporter.running)
return await blockImporter
.start()
.then((_) => void log.info('Resuming block import'));
});

eventEmitter.on(
events.TX_INDEXED,
async (tx: MatchableItem & { data_size?: string }) => {
// download the tx
// await contiguousDataSource.getData(tx.id)
// check if the tx has any data
const size = +(tx?.data_size ?? 0);
if (!isNaN(size) && size !== 0) {
await dataPrefetcher.queuePrefetchData(tx);
// queue a worker to download the tx
const response = await dataPrefetcher
.queueWork({ id: tx.id })
.catch((_) => undefined);
if (response)
// insert into index
await contiguousDataIndex.saveDataContentAttributes(
response as PrefetchJobReturnBody,
);
}
},
);
Expand Down
14 changes: 10 additions & 4 deletions src/workers/block-importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ export class BlockImporter {
if (height > this.startHeight) {
// Retrieve expected previous block hash from the DB
const previousHeight = height - 1;
const previousDbBlockHash =
await this.chainIndex.getBlockHashByHeight(previousHeight);
const previousDbBlockHash = await this.chainIndex.getBlockHashByHeight(
previousHeight,
);

if (previousDbBlockHash === undefined) {
// If a gap is found, rewind the index to the last known block
Expand Down Expand Up @@ -152,8 +153,9 @@ export class BlockImporter {
}

public async importBlock(height: number) {
const { block, txs, missingTxIds } =
await this.getBlockOrForkedBlock(height);
const { block, txs, missingTxIds } = await this.getBlockOrForkedBlock(
height,
);

// Emit sucessful fetch events
this.eventEmitter.emit(events.BLOCK_FETCHED, block);
Expand Down Expand Up @@ -245,6 +247,10 @@ export class BlockImporter {
}
}

get running() {
return this.shouldRun;
}

public async stop() {
this.shouldRun = false;
metrics.blockImporterRunningGauge.set(0);
Expand Down
119 changes: 61 additions & 58 deletions src/workers/prefetch-data.ts
Original file line number Diff line number Diff line change
@@ -1,66 +1,69 @@
/* eslint-disable header/header */
import { default as fastq } from 'fastq';
import type { queueAsPromised } from 'fastq';
import * as winston from 'winston';
import { createHash } from 'crypto';
import { pipeline } from 'stream/promises';
import { isMainThread, parentPort } from 'worker_threads';

import { ContiguousDataSource, MatchableItem } from '../types.js';
import { GatewayDataSource } from '../data/gateway-data-source.js';
import { currentUnixTimestamp } from '../lib/time.js';
import logger from '../log.js';
import { FsDataStore } from '../store/fs-data-store.js';
import { WorkerMessage, wrapWorkerFn } from './worker-thead.js';

const DEFAULT_WORKER_COUNT = 4;
export type PrefetchJobBody = {
id: string;
};

export class DataPrefetcher {
// Dependencies
private log: winston.Logger;
private contiguousDataSource: ContiguousDataSource;
// private indexWriter: DataItemIndexWriter;
// seperate deps, as we can't load the DB.
const dataStore = new FsDataStore({ log: logger, baseDir: 'data/contiguous' });
const dataSource = new GatewayDataSource({
log: logger,
trustedGatewayUrl: 'https://arweave.net',
});

// Data indexing queue
private queue: queueAsPromised<MatchableItem, void>;
export async function prefetchTransaction(
job: PrefetchJobBody,
): Promise<PrefetchJobReturnBody> {
const txId = job.id;
const log = logger.child({ worker: 'transaction-prefetch', txId: txId });
const then = performance.now();
log.verbose(`Prefetching ${txId}`);

constructor({
log,
// indexWriter,
contiguousDataSource,
workerCount = DEFAULT_WORKER_COUNT,
}: {
log: winston.Logger;
// indexWriter: DataItemIndexWriter;
contiguousDataSource: ContiguousDataSource;
workerCount?: number;
}) {
this.log = log.child({ class: 'DataPrefetcher' });
// this.indexWriter = indexWriter;
this.contiguousDataSource = contiguousDataSource;

this.queue = fastq.promise(this.prefetchData.bind(this), workerCount);
}

async queuePrefetchData(item: MatchableItem): Promise<void> {
const log = this.log.child({
method: 'queueDataItem',
id: item.id,
});
log.debug('Queueing item for prefetching...');
this.queue.push(item);
log.debug('Data item queued for prefetching.');
}
const data = await dataSource.getData(txId);
const hasher = createHash('sha256');
const cacheStream = await dataStore.createWriteStream();
const dataStream = data.stream;
data.stream.on('data', (chunk) => {
hasher.update(chunk);
});
data.stream.pause();
await pipeline(dataStream, cacheStream);
const hash = hasher.digest('base64url');
await dataStore.finalize(cacheStream, hash);
log.verbose(
`Prefetched ${txId} in ${((performance.now() - then) / 1000).toFixed(3)}s`,
);
return {
id: txId,
dataRoot: undefined,
hash,
dataSize: data.size,
contentType: data.sourceContentType,
cachedAt: currentUnixTimestamp(),
};
}

async prefetchData(item: MatchableItem): Promise<void> {
const log = this.log.child({
method: 'indexDataItem',
id: item.id,
});
export type PrefetchJobReturnBody = {
id: string;
dataRoot?: string;
hash: string;
dataSize: number;
contentType?: string;
cachedAt?: number;
};

try {
log.debug('Prefetching data item...');
const res = await this.contiguousDataSource.getData(item.id);
const stream = res.stream;
// you have to consume the stream so it actually caches the item fully.
for await (const _ of stream) {
true; // void it
}
log.debug('Data item prefetched.');
} catch (error) {
log.error('Failed to prefetch data item data:', error);
}
}
if (!isMainThread) {
parentPort?.on('message', (msg: WorkerMessage<PrefetchJobBody>) =>
wrapWorkerFn(prefetchTransaction, msg),
);
}

// export default prefetchTransaction;
Loading

0 comments on commit 623590f

Please sign in to comment.