Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ✨ Worker threads #4

Merged
merged 1 commit into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,21 @@ import {
NormalizedDataItem,
PartialJsonTransaction,
} from './types.js';
import { createCron } from './utils/cron.js';
import { Ans104DataIndexer } from './workers/ans104-data-indexer.js';
import { Ans104Unbundler } from './workers/ans104-unbundler.js';
import { BlockImporter } from './workers/block-importer.js';
import { BundleRepairWorker } from './workers/bundle-repair-worker.js';
import { DataItemIndexer } from './workers/data-item-indexer.js';
import { FsCleanupWorker } from './workers/fs-cleanup-worker.js';
import { DataPrefetcher } from './workers/prefetch-data.js';
import {
PrefetchJobBody,
PrefetchJobReturnBody,
} from './workers/prefetch-data.js';
import { TransactionFetcher } from './workers/transaction-fetcher.js';
import { TransactionImporter } from './workers/transaction-importer.js';
import { TransactionRepairWorker } from './workers/transaction-repair-worker.js';
import { WorkerThreadQueue } from './workers/worker-thead.js';

process.on('uncaughtException', (error) => {
metrics.uncaughtExceptionCounter.inc();
Expand Down Expand Up @@ -264,19 +269,46 @@ const ans104Unbundler = new Ans104Unbundler({
workerCount: config.ANS104_UNBUNDLE_WORKERS,
});

const dataPrefetcher = new DataPrefetcher({
const dataPrefetcher = new WorkerThreadQueue<
PrefetchJobBody,
PrefetchJobReturnBody
>({
log,
contiguousDataSource,
workerPath: './prefetch-data.js', // relative to ./workers
workerCount: 5,
});

const maxPendingPrefetchJobs = 200;

createCron('Limit block imports', '*/10 * * * * *', async () => {
if (
dataPrefetcher.pendingJobs >= maxPendingPrefetchJobs &&
blockImporter.running
)
return await blockImporter
.stop()
.then((_) => void log.info('Pausing block import'));
if (!blockImporter.running)
return await blockImporter
.start()
.then((_) => void log.info('Resuming block import'));
});

eventEmitter.on(
events.TX_INDEXED,
async (tx: MatchableItem & { data_size?: string }) => {
// download the tx
// await contiguousDataSource.getData(tx.id)
// check if the tx has any data
const size = +(tx?.data_size ?? 0);
if (!isNaN(size) && size !== 0) {
await dataPrefetcher.queuePrefetchData(tx);
// queue a worker to download the tx
const response = await dataPrefetcher
.queueWork({ id: tx.id })
.catch((_) => undefined);
if (response)
// insert into index
await contiguousDataIndex.saveDataContentAttributes(
response as PrefetchJobReturnBody,
);
}
},
);
Expand Down
14 changes: 10 additions & 4 deletions src/workers/block-importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ export class BlockImporter {
if (height > this.startHeight) {
// Retrieve expected previous block hash from the DB
const previousHeight = height - 1;
const previousDbBlockHash =
await this.chainIndex.getBlockHashByHeight(previousHeight);
const previousDbBlockHash = await this.chainIndex.getBlockHashByHeight(
previousHeight,
);

if (previousDbBlockHash === undefined) {
// If a gap is found, rewind the index to the last known block
Expand Down Expand Up @@ -152,8 +153,9 @@ export class BlockImporter {
}

public async importBlock(height: number) {
const { block, txs, missingTxIds } =
await this.getBlockOrForkedBlock(height);
const { block, txs, missingTxIds } = await this.getBlockOrForkedBlock(
height,
);

// Emit successful fetch events
this.eventEmitter.emit(events.BLOCK_FETCHED, block);
Expand Down Expand Up @@ -245,6 +247,10 @@ export class BlockImporter {
}
}

get running() {
return this.shouldRun;
}

public async stop() {
this.shouldRun = false;
metrics.blockImporterRunningGauge.set(0);
Expand Down
119 changes: 61 additions & 58 deletions src/workers/prefetch-data.ts
Original file line number Diff line number Diff line change
@@ -1,66 +1,69 @@
/* eslint-disable header/header */
import { default as fastq } from 'fastq';
import type { queueAsPromised } from 'fastq';
import * as winston from 'winston';
import { createHash } from 'crypto';
import { pipeline } from 'stream/promises';
import { isMainThread, parentPort } from 'worker_threads';

import { ContiguousDataSource, MatchableItem } from '../types.js';
import { GatewayDataSource } from '../data/gateway-data-source.js';
import { currentUnixTimestamp } from '../lib/time.js';
import logger from '../log.js';
import { FsDataStore } from '../store/fs-data-store.js';
import { WorkerMessage, wrapWorkerFn } from './worker-thead.js';

const DEFAULT_WORKER_COUNT = 4;
export type PrefetchJobBody = {
id: string;
};

export class DataPrefetcher {
// Dependencies
private log: winston.Logger;
private contiguousDataSource: ContiguousDataSource;
// private indexWriter: DataItemIndexWriter;
// seperate deps, as we can't load the DB.
const dataStore = new FsDataStore({ log: logger, baseDir: 'data/contiguous' });
const dataSource = new GatewayDataSource({
log: logger,
trustedGatewayUrl: 'https://arweave.net',
});

// Data indexing queue
private queue: queueAsPromised<MatchableItem, void>;
export async function prefetchTransaction(
job: PrefetchJobBody,
): Promise<PrefetchJobReturnBody> {
const txId = job.id;
const log = logger.child({ worker: 'transaction-prefetch', txId: txId });
const then = performance.now();
log.verbose(`Prefetching ${txId}`);

constructor({
log,
// indexWriter,
contiguousDataSource,
workerCount = DEFAULT_WORKER_COUNT,
}: {
log: winston.Logger;
// indexWriter: DataItemIndexWriter;
contiguousDataSource: ContiguousDataSource;
workerCount?: number;
}) {
this.log = log.child({ class: 'DataPrefetcher' });
// this.indexWriter = indexWriter;
this.contiguousDataSource = contiguousDataSource;

this.queue = fastq.promise(this.prefetchData.bind(this), workerCount);
}

async queuePrefetchData(item: MatchableItem): Promise<void> {
const log = this.log.child({
method: 'queueDataItem',
id: item.id,
});
log.debug('Queueing item for prefetching...');
this.queue.push(item);
log.debug('Data item queued for prefetching.');
}
const data = await dataSource.getData(txId);
const hasher = createHash('sha256');
const cacheStream = await dataStore.createWriteStream();
const dataStream = data.stream;
data.stream.on('data', (chunk) => {
hasher.update(chunk);
});
data.stream.pause();
await pipeline(dataStream, cacheStream);
const hash = hasher.digest('base64url');
await dataStore.finalize(cacheStream, hash);
log.verbose(
`Prefetched ${txId} in ${((performance.now() - then) / 1000).toFixed(3)}s`,
);
return {
id: txId,
dataRoot: undefined,
hash,
dataSize: data.size,
contentType: data.sourceContentType,
cachedAt: currentUnixTimestamp(),
};
}

async prefetchData(item: MatchableItem): Promise<void> {
const log = this.log.child({
method: 'indexDataItem',
id: item.id,
});
export type PrefetchJobReturnBody = {
id: string;
dataRoot?: string;
hash: string;
dataSize: number;
contentType?: string;
cachedAt?: number;
};

try {
log.debug('Prefetching data item...');
const res = await this.contiguousDataSource.getData(item.id);
const stream = res.stream;
// you have to consume the stream so it actually caches the item fully.
for await (const _ of stream) {
true; // void it
}
log.debug('Data item prefetched.');
} catch (error) {
log.error('Failed to prefetch data item data:', error);
}
}
if (!isMainThread) {
parentPort?.on('message', (msg: WorkerMessage<PrefetchJobBody>) =>
wrapWorkerFn(prefetchTransaction, msg),
);
}

// export default prefetchTransaction;
Loading