Skip to content

Commit

Permalink
Merge pull request #2 from Irys-xyz/feat/observability
Browse files Browse the repository at this point in the history
Feat/observability
  • Loading branch information
JesseTheRobot authored Dec 18, 2023
2 parents fb110ab + 2299c54 commit b8eaebc
Show file tree
Hide file tree
Showing 14 changed files with 627 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
"eqeqeq": 2,
"jest-formatting/padding-around-describe-blocks": 2,
"jest-formatting/padding-around-test-blocks": 2,
"header/header": [2, "./resources/license.header.js"],
// "header/header": [2, "./resources/license.header.js"],
"mocha/max-top-level-suites": "off",
"mocha/no-exports": "off",
"mocha/no-mocha-arrows": "off",
Expand Down
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
"arweave": "^1.14.4",
"axios": "^1.6.2",
"better-sqlite3": "^7.5.1",
"check-disk-space": "^3.4.0",
"cors": "^2.8.5",
"cron": "^3.1.6",
"crypto": "^1.0.1",
"dotenv": "^16.3.1",
"express": "^4.18.1",
Expand Down Expand Up @@ -47,6 +49,7 @@
"devDependencies": {
"@swc/core": "^1.3.100",
"@trivago/prettier-plugin-sort-imports": "^4.3.0",
"@types/async-retry": "^1.4.8",
"@types/better-sqlite3": "^7.5.0",
"@types/chai": "^4.3.11",
"@types/chai-as-promised": "^7.1.8",
Expand All @@ -65,6 +68,7 @@
"@types/swagger-ui-express": "^4.1.3",
"@typescript-eslint/eslint-plugin": "^5.26.0",
"@typescript-eslint/parser": "^5.26.0",
"async-retry": "^1.3.3",
"c8": "^8.0.1",
"chai": "^4.3.10",
"chai-as-promised": "^7.1.1",
Expand All @@ -82,7 +86,7 @@
"msgpack-lite": "^0.1.26",
"nodemon": "^2.0.18",
"npm-check-updates": "^16.14.11",
"prettier": "^3.1.1",
"prettier": "^2.8.8",
"rimraf": "^5.0.5",
"sinon": "^17.0.1",
"sinon-chai": "^3.7.0",
Expand Down
4 changes: 4 additions & 0 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ import { dataRouter } from './routes/data/index.js';
import { apolloServer } from './routes/graphql/index.js';
import { openApiRouter } from './routes/openapi.js';
import * as system from './system.js';
import { createCron } from './utils/cron.js';
import { runHealthCheck } from './utils/healthcheck.js';

system.arweaveClient.refreshPeers();

system.headerFsCacheCleanupWorker?.start();

createCron('Healthchecks', '0 */1 * * * *', runHealthCheck);

// Allow starting without writers to support SQLite replication
if (config.START_WRITERS) {
system.blockImporter.start();
Expand Down
17 changes: 16 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import dotenv from 'dotenv';
import { canonicalize } from 'json-canonicalize';
import crypto from 'node:crypto';
import { hostname } from 'node:os';

import { ONE_DAY } from './constants.js';
import { createFilter } from './filters.js';
import * as env from './lib/env.js';
import log from './log.js';
Expand Down Expand Up @@ -83,7 +85,7 @@ export const CONTRACT_ID = env.varOrDefault(
'CONTRACT_ID',
'bLAgYxAdX2Ry-nt6aH2ixgvJXbpsEYm28NgJgyqfs-U',
);
export const CHAIN_CACHE_TYPE = env.varOrDefault('CHAIN_CACHE_TYPE', 'lmdb');
export const CHAIN_CACHE_TYPE = env.varOrDefault('CHAIN_CACHE_TYPE', 'fs');
export const REDIS_CACHE_URL = env.varOrDefault(
'REDIS_CACHE_URL',
'redis://localhost:6379',
Expand All @@ -94,3 +96,16 @@ export const REDIS_CACHE_TTL_SECONDS = +env.varOrDefault(
);
export const ENABLE_FS_HEADER_CACHE_CLEANUP =
env.varOrDefault('ENABLE_FS_HEADER_CACHE_CLEANUP', 'false') === 'true';

export const WEBHOOK_URL = env.varOrUndefined('WEBHOOK_URL');
export const PING_ID = env.varOrUndefined('PING_ID');
export const PDUTY_ROUTING_KEY = env.varOrUndefined('PDUTY_ROUTING_KEY');
export const NODE_NAME = env.varOrDefault('NODE_NAME', hostname());
export const FS_FREE_SPACE_MIN = +env.varOrDefault(
'FS_FREE_SPACE_MIN',
'200000000000', // 200GB
);
export const HIGH_LOAD_CRITICAL =
env.varOrDefault('HIGH_LOAD_CRITICAL', 'true') === 'true';

export const MAX_TMP_FS_AGE = +env.varOrDefault('MAX_TMP_FS_AGE', `${ONE_DAY}`);
2 changes: 2 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const ONE_HOUR = 60 * 60 * 1000;
export const ONE_DAY = 24 * ONE_HOUR;
2 changes: 1 addition & 1 deletion src/log.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import { createLogger, format, transports } from 'winston';

import * as env from './lib/env.js';

const LOG_LEVEL = env.varOrDefault('LOG_LEVEL', 'info');
const LOG_LEVEL = env.varOrDefault('LOG_LEVEL', 'debug');
const LOG_ALL_STACKTRACES =
env.varOrDefault('LOG_ALL_STACKTRACES', 'false') === 'true';
const LOG_FORMAT = env.varOrDefault('LOG_FORMAT', 'simple');
Expand Down
34 changes: 32 additions & 2 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
import { default as Arweave } from 'arweave';
import EventEmitter from 'node:events';
import { promises } from 'node:fs';

import { ArweaveCompositeClient } from './arweave/composite-client.js';
import * as config from './config.js';
Expand All @@ -28,8 +29,8 @@ import { TxChunksDataSource } from './data/tx-chunks-data-source.js';
import { StandaloneSqliteDatabase } from './database/standalone-sqlite.js';
import * as events from './events.js';
import { MatchTags } from './filters.js';
import { UniformFailureSimulator } from './lib/chaos.js';
import { makeBlockStore, makeTxStore } from './init/header-stores.js';
import { UniformFailureSimulator } from './lib/chaos.js';
import { currentUnixTimestamp } from './lib/time.js';
import log from './log.js';
import * as metrics from './metrics.js';
Expand All @@ -55,10 +56,10 @@ import { BlockImporter } from './workers/block-importer.js';
import { BundleRepairWorker } from './workers/bundle-repair-worker.js';
import { DataItemIndexer } from './workers/data-item-indexer.js';
import { FsCleanupWorker } from './workers/fs-cleanup-worker.js';
import { DataPrefetcher } from './workers/prefetch-data.js';
import { TransactionFetcher } from './workers/transaction-fetcher.js';
import { TransactionImporter } from './workers/transaction-importer.js';
import { TransactionRepairWorker } from './workers/transaction-repair-worker.js';
import { DataPrefetcher } from './workers/prefetch-data.js';

process.on('uncaughtException', (error) => {
metrics.uncaughtExceptionCounter.inc();
Expand Down Expand Up @@ -127,6 +128,35 @@ export const headerFsCacheCleanupWorker = config.ENABLE_FS_HEADER_CACHE_CLEANUP
})
: undefined;

// export const contiguousDataCacheDataCleanupWorker = new FsCleanupWorker({
// log,
// basePath: 'data/contiguous/data',
// shouldDelete: async (path) => {
// // stat it
// const stat = await promises.stat(path);
// if (stat.birthtimeMs) {
// }
// },
// });

export const contiguousDataCacheTmpCleanupWorker = new FsCleanupWorker({
log,
basePath: 'data/contiguous/tmp',
shouldDelete: async (path) => {
const stat = await promises.stat(path);
return Date.now() - stat.mtimeMs > config.MAX_TMP_FS_AGE;
},
});

export const generalTmpCleanupWorker = new FsCleanupWorker({
log,
basePath: 'data/tmp',
shouldDelete: async (path) => {
const stat = await promises.stat(path);
return Date.now() - stat.mtimeMs > config.MAX_TMP_FS_AGE;
},
});

const ans104TxMatcher = new MatchTags([
{ name: 'Bundle-Format', value: 'binary' },
{ name: 'Bundle-Version', valueStartsWith: '2.' },
Expand Down
5 changes: 5 additions & 0 deletions src/test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { contiguousDataCacheTmpCleanupWorker } from './system.js';

const batch = await contiguousDataCacheTmpCleanupWorker.getBatch('data', null);

console.log(batch);
30 changes: 30 additions & 0 deletions src/utils/cron.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { CronJob } from 'cron';

import logger from '../log.js';

export function createCron(
name: string,
time: string,
fn: () => Promise<void>,
): void {
let jobLocked = false;
try {
new CronJob(
time,
async function () {
if (!jobLocked) {
jobLocked = true;
await fn().catch((e) =>
logger.error(`[CRON] Error occurred while doing ${name} - ${e}`),
);
jobLocked = false;
}
},
null,
true,
);
} catch (e: any) {
logger.error(`[PROCESS] Error occurred while creating cron ${name} - ${e}`);
process.exit(1);
}
}
131 changes: 131 additions & 0 deletions src/utils/healthcheck.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import checkDiskSpace from 'check-disk-space';

import { FS_FREE_SPACE_MIN } from '../config.js';
import logger from '../log.js';
import { checkResourceUtilisation } from './resourceUtil.js';
import sendWebhookMessage from './webhook.js';

export async function runHealthCheck(): Promise<void> {
logger.info('Running health check');

// const lastNotified = +(await redisClient.get(REDIS_LAST_NOTIFIED_KEY));
// if (!isNaN(lastNotified) && Date.now() - lastNotified < 1.8e6) return;

await checkFsFreeSpace();

await checkResourceUtilisation();
// const lastSeeded = +(await redisClient.get(REDIS_LAST_SEEDED_KEY));
// if (isNaN(lastSeeded)) return;
// const now = Date.now();

// if (
// lastSeeded &&
// now - lastSeeded > 1.44e7 &&
// (await bundleQueue.getDelayedCount()) !== 0
// ) {
// sendWebhookMessage({
// name: 'No Seeding',
// value: 'Nothing has seeded for last 4 hours',
// ping: true,
// critical: true,
// });
// }

// const balance = new BigNumber(
// await arweave.wallets.getBalance(currencies.arweave.account.address),
// );
// const arReserveBytes = AR_RESERVE_BYTES;
// const required = new BigNumber(
// await redisClient.get(REDIS_PRICE_KEY),
// ).multipliedBy(arReserveBytes);
// logger.verbose(`[runHealthCheck:balance] Current Balance: ${balance}`);
// const percentOfRequired = balance.dividedBy(required);
// if (balance.isLessThanOrEqualTo(required))
// sendWebhookMessage({
// name: 'Running Low on AR',
// value: `Account: ${currencies.arweave.account.address}\n Current: ${balance} - cost for ${arReserveBytes} Bytes - ${required}`,
// ping: true,
// critical: percentOfRequired.isLessThanOrEqualTo(0.7),
// });

// const completedJobs = await bundleQueue.getJobs(['completed'], 0, 1000, true);
// const averageAttempts =
// completedJobs.reduce((total, next) => total + next.attemptsMade, 0) /
// completedJobs.length;

// if (averageAttempts > 10)
// sendWebhookMessage({
// name: 'Average Bundlr Queue Attempts High',
// value: `Current Average Attempts : ${averageAttempts}`,
// ping: true,
// critical: false,
// });

// const executorQueueWaitingCount = await bundleExecutorQueue.getWaitingCount();
// if (executorQueueWaitingCount >= 50) {
// sendWebhookMessage({
// name: 'Bundle Executor queue waiting count high',
// value: `Current executor jobs waiting: ${executorQueueWaitingCount}`,
// ping: true,
// critical: executorQueueWaitingCount >= 100,
// });
// }

// checkOtherQueue();

// await redisClient.set(REDIS_LAST_NOTIFIED_KEY, Date.now().toString());
return;
}

// export async function devnetHealthCheck(): Promise<void> {
// logger.info('[runHealthCheck] Running health check');

// const lastNotified = +(await redisClient.get(REDIS_LAST_NOTIFIED_KEY));
// if (!isNaN(lastNotified) && Date.now() - lastNotified < 1.8e6) return;

// await checkFsFreeSpace();
// await checkS3Queue();

// await redisClient.set(REDIS_LAST_NOTIFIED_KEY, Date.now().toString());
// return;
// }

// export async function checkS3Queue(): Promise<void> {
// const s3QueueWaitingCount = await s3Queue.getWaitingCount();
// logger.verbose(
// `[checkS3Queue] Queue has ${s3QueueWaitingCount} waiting jobs.`,
// );
// if (s3QueueWaitingCount > 1000)
// sendWebhookMessage({
// name: 'S3 queue waiting count high',
// value: `Current S3 jobs waiting ${s3QueueWaitingCount}`,
// ping: true,
// critical: s3QueueWaitingCount >= 2000,
// });
// }

// export async function checkOtherQueue(): Promise<void> {
// const otherQueueWaitingCount = await otherQueue.getWaitingCount();
// if (otherQueueWaitingCount > 10_000)
// sendWebhookMessage({
// name: 'Other queue waiting count high',
// value: `Current other queue jobs waiting ${otherQueueWaitingCount}`,
// ping: true,
// critical: otherQueueWaitingCount >= 50_000,
// });
// }

export async function checkFsFreeSpace(): Promise<void> {
const free = await checkDiskSpace('/')
.then((r) => r.free)
.catch((e) => logger.error(`Error checking free space:\n ${e}`));
const threshold = FS_FREE_SPACE_MIN;
if (typeof free === 'number' && free < threshold) {
sendWebhookMessage({
name: 'Running low on free space',
value: `Free space (${free}) < threshold (${threshold})`,
ping: true,
critical: true,
});
}
}
Loading

0 comments on commit b8eaebc

Please sign in to comment.