From 8f5a2fab9ef955dcd0b2e71a13119d51f2ae0e95 Mon Sep 17 00:00:00 2001 From: JesseTheRobot Date: Mon, 18 Dec 2023 01:18:34 +0000 Subject: [PATCH] feat: :sparkles: misc changes --- .eslintrc | 2 +- package.json | 6 +- src/config.ts | 17 +++- src/constants.ts | 2 + src/log.ts | 2 +- src/system.ts | 34 ++++++- src/test.ts | 5 + src/utils/cron.ts | 30 ++++++ src/utils/healthcheck.ts | 131 ++++++++++++++++++++++++ src/utils/resourceUtil.ts | 123 ++++++++++++++++++++++ src/utils/webhook.ts | 170 +++++++++++++++++++++++++++++++ src/workers/fs-cleanup-worker.ts | 89 +++++++++++----- yarn.lock | 45 +++++++- 13 files changed, 618 insertions(+), 38 deletions(-) create mode 100644 src/constants.ts create mode 100644 src/test.ts create mode 100644 src/utils/cron.ts create mode 100644 src/utils/healthcheck.ts create mode 100644 src/utils/resourceUtil.ts create mode 100644 src/utils/webhook.ts diff --git a/.eslintrc b/.eslintrc index 7d97b57..2b35d04 100644 --- a/.eslintrc +++ b/.eslintrc @@ -33,7 +33,7 @@ "eqeqeq": 2, "jest-formatting/padding-around-describe-blocks": 2, "jest-formatting/padding-around-test-blocks": 2, - "header/header": [2, "./resources/license.header.js"], + // "header/header": [2, "./resources/license.header.js"], "mocha/max-top-level-suites": "off", "mocha/no-exports": "off", "mocha/no-mocha-arrows": "off", diff --git a/package.json b/package.json index 3e38b8e..89859b1 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,9 @@ "arweave": "^1.14.4", "axios": "^1.6.2", "better-sqlite3": "^7.5.1", + "check-disk-space": "^3.4.0", "cors": "^2.8.5", + "cron": "^3.1.6", "crypto": "^1.0.1", "dotenv": "^16.3.1", "express": "^4.18.1", @@ -47,6 +49,7 @@ "devDependencies": { "@swc/core": "^1.3.100", "@trivago/prettier-plugin-sort-imports": "^4.3.0", + "@types/async-retry": "^1.4.8", "@types/better-sqlite3": "^7.5.0", "@types/chai": "^4.3.11", "@types/chai-as-promised": "^7.1.8", @@ -65,6 +68,7 @@ "@types/swagger-ui-express": "^4.1.3", "@typescript-eslint/eslint-plugin": "^5.26.0", "@typescript-eslint/parser": "^5.26.0", + "async-retry": "^1.3.3", "c8": "^8.0.1", "chai": "^4.3.10", "chai-as-promised": "^7.1.1", @@ -82,7 +86,7 @@ "msgpack-lite": "^0.1.26", "nodemon": "^2.0.18", "npm-check-updates": "^16.14.11", - "prettier": "^3.1.1", + "prettier": "^2.8.8", "rimraf": "^5.0.5", "sinon": "^17.0.1", "sinon-chai": "^3.7.0", diff --git a/src/config.ts b/src/config.ts index 117adfe..c5449fd 100644 --- a/src/config.ts +++ b/src/config.ts @@ -18,7 +18,9 @@ import dotenv from 'dotenv'; import { canonicalize } from 'json-canonicalize'; import crypto from 'node:crypto'; +import { hostname } from 'node:os'; +import { ONE_DAY } from './constants.js'; import { createFilter } from './filters.js'; import * as env from './lib/env.js'; import log from './log.js'; @@ -83,7 +85,7 @@ export const CONTRACT_ID = env.varOrDefault( 'CONTRACT_ID', 'bLAgYxAdX2Ry-nt6aH2ixgvJXbpsEYm28NgJgyqfs-U', ); -export const CHAIN_CACHE_TYPE = env.varOrDefault('CHAIN_CACHE_TYPE', 'lmdb'); +export const CHAIN_CACHE_TYPE = env.varOrDefault('CHAIN_CACHE_TYPE', 'fs'); export const REDIS_CACHE_URL = env.varOrDefault( 'REDIS_CACHE_URL', 'redis://localhost:6379', @@ -94,3 +96,16 @@ export const REDIS_CACHE_TTL_SECONDS = +env.varOrDefault( ); export const ENABLE_FS_HEADER_CACHE_CLEANUP = env.varOrDefault('ENABLE_FS_HEADER_CACHE_CLEANUP', 'false') === 'true'; + +export const WEBHOOK_URL = env.varOrUndefined('WEBHOOK_URL'); +export const PING_ID = env.varOrUndefined('PING_ID'); +export const PDUTY_ROUTING_KEY = 
env.varOrUndefined('PDUTY_ROUTING_KEY'); +export const NODE_NAME = env.varOrDefault('NODE_NAME', hostname()); +export const FS_FREE_SPACE_MIN = +env.varOrDefault( + 'FS_FREE_SPACE_MIN', + '200000000000', // 200GB +); +export const HIGH_LOAD_CRITICAL = + env.varOrDefault('HIGH_LOAD_CRITICAL', 'true') === 'true'; + +export const MAX_TMP_FS_AGE = +env.varOrDefault('MAX_TMP_FS_AGE', `${ONE_DAY}`); diff --git a/src/constants.ts b/src/constants.ts new file mode 100644 index 0000000..e8cead6 --- /dev/null +++ b/src/constants.ts @@ -0,0 +1,2 @@ +export const ONE_HOUR = 60 * 60 * 1000; +export const ONE_DAY = 24 * ONE_HOUR; diff --git a/src/log.ts b/src/log.ts index 323b5c4..38463b5 100644 --- a/src/log.ts +++ b/src/log.ts @@ -19,7 +19,7 @@ import { createLogger, format, transports } from 'winston'; import * as env from './lib/env.js'; -const LOG_LEVEL = env.varOrDefault('LOG_LEVEL', 'info'); +const LOG_LEVEL = env.varOrDefault('LOG_LEVEL', 'debug'); const LOG_ALL_STACKTRACES = env.varOrDefault('LOG_ALL_STACKTRACES', 'false') === 'true'; const LOG_FORMAT = env.varOrDefault('LOG_FORMAT', 'simple'); diff --git a/src/system.ts b/src/system.ts index fb0a09b..fbe2728 100644 --- a/src/system.ts +++ b/src/system.ts @@ -17,6 +17,7 @@ */ import { default as Arweave } from 'arweave'; import EventEmitter from 'node:events'; +import { promises } from 'node:fs'; import { ArweaveCompositeClient } from './arweave/composite-client.js'; import * as config from './config.js'; @@ -28,8 +29,8 @@ import { TxChunksDataSource } from './data/tx-chunks-data-source.js'; import { StandaloneSqliteDatabase } from './database/standalone-sqlite.js'; import * as events from './events.js'; import { MatchTags } from './filters.js'; -import { UniformFailureSimulator } from './lib/chaos.js'; import { makeBlockStore, makeTxStore } from './init/header-stores.js'; +import { UniformFailureSimulator } from './lib/chaos.js'; import { currentUnixTimestamp } from './lib/time.js'; import log from './log.js'; import * as metrics from './metrics.js'; @@ -55,10 +56,10 @@ import { BlockImporter } from './workers/block-importer.js'; import { BundleRepairWorker } from './workers/bundle-repair-worker.js'; import { DataItemIndexer } from './workers/data-item-indexer.js'; import { FsCleanupWorker } from './workers/fs-cleanup-worker.js'; +import { DataPrefetcher } from './workers/prefetch-data.js'; import { TransactionFetcher } from './workers/transaction-fetcher.js'; import { TransactionImporter } from './workers/transaction-importer.js'; import { TransactionRepairWorker } from './workers/transaction-repair-worker.js'; -import { DataPrefetcher } from './workers/prefetch-data.js'; process.on('uncaughtException', (error) => { metrics.uncaughtExceptionCounter.inc(); @@ -127,6 +128,35 @@ export const headerFsCacheCleanupWorker = config.ENABLE_FS_HEADER_CACHE_CLEANUP }) : undefined; +// export const contiguousDataCacheDataCleanupWorker = new FsCleanupWorker({ +// log, +// basePath: 'data/contiguous/data', +// shouldDelete: async (path) => { +// // stat it +// const stat = await promises.stat(path); +// if (stat.birthtimeMs) { +// } +// }, +// }); + +export const contiguousDataCacheTmpCleanupWorker = new FsCleanupWorker({ + log, + basePath: 'data/contiguous/tmp', + shouldDelete: async (path) => { + const stat = await promises.stat(path); + return Date.now() - stat.mtimeMs > config.MAX_TMP_FS_AGE; + }, +}); + +export const generalTmpCleanupWorker = new FsCleanupWorker({ + log, + basePath: 'data/tmp', + shouldDelete: async (path) => { + const stat = await 
promises.stat(path); + return Date.now() - stat.mtimeMs > config.MAX_TMP_FS_AGE; + }, +}); + const ans104TxMatcher = new MatchTags([ { name: 'Bundle-Format', value: 'binary' }, { name: 'Bundle-Version', valueStartsWith: '2.' }, diff --git a/src/test.ts b/src/test.ts new file mode 100644 index 0000000..569a0da --- /dev/null +++ b/src/test.ts @@ -0,0 +1,5 @@ +import { contiguousDataCacheTmpCleanupWorker } from './system.js'; + +const batch = await contiguousDataCacheTmpCleanupWorker.getBatch('data', null); + +console.log(batch); diff --git a/src/utils/cron.ts b/src/utils/cron.ts new file mode 100644 index 0000000..a11f46f --- /dev/null +++ b/src/utils/cron.ts @@ -0,0 +1,30 @@ +import { CronJob } from 'cron'; + +import logger from '../log'; + +export function createCron( + name: string, + time: string, + fn: () => Promise, +): void { + let jobLocked = false; + try { + new CronJob( + time, + async function () { + if (!jobLocked) { + jobLocked = true; + await fn().catch((e) => + logger.error(`[CRON] Error occurred while doing ${name} - ${e}`), + ); + jobLocked = false; + } + }, + null, + true, + ); + } catch (e: any) { + logger.error(`[PROCESS] Error occurred while creating cron ${name} - ${e}`); + process.exit(1); + } +} diff --git a/src/utils/healthcheck.ts b/src/utils/healthcheck.ts new file mode 100644 index 0000000..e954d09 --- /dev/null +++ b/src/utils/healthcheck.ts @@ -0,0 +1,131 @@ +import checkDiskSpace from 'check-disk-space'; + +import { FS_FREE_SPACE_MIN } from '../config'; +import logger from '../log'; +import { checkResourceUtilisation } from './resourceUtil'; +import sendWebhookMessage from './webhook'; + +export async function runHealthCheck(): Promise { + logger.info('Running health check'); + + // const lastNotified = +(await redisClient.get(REDIS_LAST_NOTIFIED_KEY)); + // if (!isNaN(lastNotified) && Date.now() - lastNotified < 1.8e6) return; + + await checkFsFreeSpace(); + + await checkResourceUtilisation(); + // const lastSeeded = +(await redisClient.get(REDIS_LAST_SEEDED_KEY)); + // if (isNaN(lastSeeded)) return; + // const now = Date.now(); + + // if ( + // lastSeeded && + // now - lastSeeded > 1.44e7 && + // (await bundleQueue.getDelayedCount()) !== 0 + // ) { + // sendWebhookMessage({ + // name: 'No Seeding', + // value: 'Nothing has seeded for last 4 hours', + // ping: true, + // critical: true, + // }); + // } + + // const balance = new BigNumber( + // await arweave.wallets.getBalance(currencies.arweave.account.address), + // ); + // const arReserveBytes = AR_RESERVE_BYTES; + // const required = new BigNumber( + // await redisClient.get(REDIS_PRICE_KEY), + // ).multipliedBy(arReserveBytes); + // logger.verbose(`[runHealthCheck:balance] Current Balance: ${balance}`); + // const percentOfRequired = balance.dividedBy(required); + // if (balance.isLessThanOrEqualTo(required)) + // sendWebhookMessage({ + // name: 'Running Low on AR', + // value: `Account: ${currencies.arweave.account.address}\n Current: ${balance} - cost for ${arReserveBytes} Bytes - ${required}`, + // ping: true, + // critical: percentOfRequired.isLessThanOrEqualTo(0.7), + // }); + + // const completedJobs = await bundleQueue.getJobs(['completed'], 0, 1000, true); + // const averageAttempts = + // completedJobs.reduce((total, next) => total + next.attemptsMade, 0) / + // completedJobs.length; + + // if (averageAttempts > 10) + // sendWebhookMessage({ + // name: 'Average Bundlr Queue Attempts High', + // value: `Current Average Attempts : ${averageAttempts}`, + // ping: true, + // critical: false, + // }); 
+ + // const executorQueueWaitingCount = await bundleExecutorQueue.getWaitingCount(); + // if (executorQueueWaitingCount >= 50) { + // sendWebhookMessage({ + // name: 'Bundle Executor queue waiting count high', + // value: `Current executor jobs waiting: ${executorQueueWaitingCount}`, + // ping: true, + // critical: executorQueueWaitingCount >= 100, + // }); + // } + + // checkOtherQueue(); + + // await redisClient.set(REDIS_LAST_NOTIFIED_KEY, Date.now().toString()); + return; +} + +// export async function devnetHealthCheck(): Promise { +// logger.info('[runHealthCheck] Running health check'); + +// const lastNotified = +(await redisClient.get(REDIS_LAST_NOTIFIED_KEY)); +// if (!isNaN(lastNotified) && Date.now() - lastNotified < 1.8e6) return; + +// await checkFsFreeSpace(); +// await checkS3Queue(); + +// await redisClient.set(REDIS_LAST_NOTIFIED_KEY, Date.now().toString()); +// return; +// } + +// export async function checkS3Queue(): Promise { +// const s3QueueWaitingCount = await s3Queue.getWaitingCount(); +// logger.verbose( +// `[checkS3Queue] Queue has ${s3QueueWaitingCount} waiting jobs.`, +// ); +// if (s3QueueWaitingCount > 1000) +// sendWebhookMessage({ +// name: 'S3 queue waiting count high', +// value: `Current S3 jobs waiting ${s3QueueWaitingCount}`, +// ping: true, +// critical: s3QueueWaitingCount >= 2000, +// }); +// } + +// export async function checkOtherQueue(): Promise { +// const otherQueueWaitingCount = await otherQueue.getWaitingCount(); +// if (otherQueueWaitingCount > 10_000) +// sendWebhookMessage({ +// name: 'Other queue waiting count high', +// value: `Current other queue jobs waiting ${otherQueueWaitingCount}`, +// ping: true, +// critical: otherQueueWaitingCount >= 50_000, +// }); +// } + +export async function checkFsFreeSpace(): Promise { + const free = await checkDiskSpace('/') + .then((r) => r.free) + .catch((e) => logger.error(`Error checking free space:\n ${e}`)); + const threshold = FS_FREE_SPACE_MIN; + if (typeof free === 'number' && free < threshold) { + sendWebhookMessage({ + name: 'Running low on free space', + value: `Free space (${free}) < threshold (${threshold})`, + ping: true, + critical: true, + }); + } +} diff --git a/src/utils/resourceUtil.ts b/src/utils/resourceUtil.ts new file mode 100644 index 0000000..2f790ec --- /dev/null +++ b/src/utils/resourceUtil.ts @@ -0,0 +1,123 @@ +import { spawn } from 'child_process'; +import { cpus, freemem, loadavg, totalmem } from 'os'; + +import { HIGH_LOAD_CRITICAL } from '../config'; +import sendWebhookMessage from './webhook'; + +export async function checkResourceUtilisation(): Promise { + const loadAvg = loadavg(); + const oneMinLoad = loadAvg[0]; + const cpuCount = cpus().length; + const loadAveragePer = Math.round((oneMinLoad / cpuCount) * 100); + + // text, if it's ping, if it's critical (page) + let comb = + loadAveragePer >= 200 + ? ['> 200', true, true] + : loadAveragePer >= 75 + ? ['> 75', true, false] + : loadAveragePer >= 50 + ? ['> 50', false, false] + : undefined; + if (comb) + sendWebhookMessage({ + name: 'Load Average High', + value: `Current 1m Load Avg ${comb[0]} - ${loadAveragePer}`, + info: [ + { name: '5m load avg', value: loadAvg[1].toString() }, + { name: '15m load avg', value: loadAvg[2].toString() }, + ], + ping: comb[1] as boolean, + critical: HIGH_LOAD_CRITICAL ? 
(comb[2] as boolean) : false, + }); + + let availableMemory; + if (process.platform === 'linux') { + const prc = spawn('free', []); + prc.stdout.setEncoding('utf8'); + availableMemory = + +(await new Promise((r) => { + prc.stdout.on('data', (d) => { + const lines = d.toString().split(/\n/g); + const stats = lines[1].split(/\s+/); + const avail = stats[6]; + r(avail as string); + }); + })) * 1024; + prc.kill(); + } else { + availableMemory = freemem(); + } + + const usagePercentage = Math.round((1 - availableMemory / totalmem()) * 100); + comb = + usagePercentage >= 90 + ? ['> 90', true, true] + : usagePercentage >= 75 + ? ['> 75', true, false] + : usagePercentage >= 50 + ? ['> 50', false, false] + : undefined; + if (comb) + sendWebhookMessage({ + name: 'Memory Usage High', + value: `Memory usage ${comb[0]}% - ${usagePercentage}%`, + ping: comb[1] as boolean, + critical: comb[2] as boolean, + }); + + // await checkRedisMemoryUtil(); +} + +// export async function checkRedisMemoryUtil(): Promise { +// try { +// const redis = createClient({ +// url: redisUrl, +// }); +// const info = await redisClient.info(); +// const splitInfo = info.split('\r\n'); +// const usedMem = +( +// splitInfo.find((v) => v.split(':')[0] === 'used_memory').split(':')[1] ?? +// 0 +// ); + +// const maxMemRaw = splitInfo.find((v) => v.split(':')[0] === 'maxmemory'); +// if (!maxMemRaw) throw new Error('Unable to find maxmemory'); +// let maxMem = REDIS_MAX_MEMORY +// ? +REDIS_MAX_MEMORY +// : +maxMemRaw.split(':')[1]; +// // maxMem = 0 means redis will use all of the system memory, so we check that +// if (maxMem === 0) { +// const systemMem = splitInfo.find( +// (v) => v.split(':')[0] === 'total_system_memory', +// ); +// if (!systemMem) throw new Error('Unable to find total_system_memory'); +// maxMem = +(systemMem.split(':')[1] ?? 0); +// } +// const usedPercent = (usedMem / maxMem) * 100; + +// const comb = +// usedPercent >= 90 +// ? ['> 90', true, true] +// : usedPercent >= 75 +// ? ['> 75', true, false] +// : usedPercent >= 50 +// ? ['> 50', false, false] +// : undefined; + +// if (comb) +// sendWebhookMessage({ +// name: 'Redis memory Usage High', +// value: `Memory usage ${comb[0]}% - ${usedPercent}%`, +// ping: comb[1] as boolean, +// critical: comb[2] as boolean, +// }); +// } catch (e: any) { +// await sendWebhookMessage({ +// name: 'Unable to check Redis memory usage', +// value: e?.stack ?? e?.message ?? 
e, +// ping: true, +// critical: true, +// }); +// } +// } diff --git a/src/utils/webhook.ts b/src/utils/webhook.ts new file mode 100644 index 0000000..6fc553d --- /dev/null +++ b/src/utils/webhook.ts @@ -0,0 +1,170 @@ +// eslint-disable-next-line header/header +import AsyncRetry from 'async-retry'; +import Axios from 'axios'; + +import { NODE_NAME, PDUTY_ROUTING_KEY, PING_ID, WEBHOOK_URL } from '../config'; +import logger from '../log'; + +export default async function sendWebhookMessage({ + name, + value = 'no value', + content, + username = NODE_NAME, + ping = false, + webhookUrl = WEBHOOK_URL, + pagerDutyRoutingKey = PDUTY_ROUTING_KEY, + pingId = PING_ID, + component = undefined, + critical = false, + info = [], + useDiscord = true, + usePagerDuty = true, +}: { + name: string; + value?: string; + content?: string; + username?: string; + critical?: boolean; + ping?: boolean; + webhookUrl?: string; + pagerDutyRoutingKey?: string; + pingId?: string; + component?: string; + info?: { name: string; value: string }[]; + useDiscord?: boolean; + usePagerDuty?: boolean; + race?: boolean; + raceTimeout?: number; +}): Promise { + try { + // TODO: disallow `any` types codebase-wide so this isn't required. + if (typeof value !== 'string') + value = (value as any)?.toString() ?? JSON.stringify(value); + if (typeof name !== 'string') + name = (name as any)?.toString() ?? JSON.stringify(name); + + if (name.length >= 256) { + value = name.slice(250) + value; + name = name.slice(0, 250) + '...'; + } + if (component && username.length + component.length < 32) + username = username + ':' + component; + + // max is 6k characters + if (value.length >= 6000) { + value = + value.slice(0, 5500) + + ' `Cut-off due to exceeding maximum character limit`'; + } + + let vl = value; + value = value.slice(0, 1020); + let i = 1; + + while ((vl = vl.slice(1020)).length > 0) { + info.push({ name: `Continuation ${i++}`, value: vl.slice(0, 1020) }); + } + + info.push({ name: 'Timestamp', value: new Date().toISOString() }); + const data = { + content: (content ?? '') + `${ping ? (pingId ? `${pingId}` : '') : ''}`, + username: username, + embeds: [ + { + color: 15548997, + fields: [{ name, value }, ...info], + timestamp: new Date().toISOString(), + }, + ], + }; + const severity = critical ? 'critical' : ping ? 'error' : 'warning'; + logger.error(`[webhook] ${name} - ${value}`); + if (usePagerDuty && pagerDutyRoutingKey) { + // let [{ name, value }, ...info] = fields; + const data = { + routing_key: pagerDutyRoutingKey, + payload: { + summary: name + ' ' + (value === 'no value' ? '' : value), + severity, + source: username, + component: component, + custom_details: { + value, + ...info.reduce((acc: any, { name, value }) => { + acc[name] = value; + return acc; + }, {}), + }, + }, + event_action: 'trigger', + }; + + await AsyncRetry( + () => + Axios.post('https://events.pagerduty.com/v2/enqueue', data, { + headers: { 'content-type': 'application/json' }, + }), + { + retries: 10, + onRetry: (_: any, a: number) => { + // Incrementally reduce properties in a bid to avoid any malformed payloads. + switch (a) { + case 4: + data.payload.custom_details = value; + break; + case 6: + data.payload.summary = name; + break; + } + }, + }, + ).catch((e: any) => { + logger.error( + `[webhook:pagerDuty] Unable to post error to pagerduty - ${e}`, + ); + logger.debug( + `[webhook:pagerDuty] request body: ${JSON.stringify(data)}`, + ); + if (webhookUrl) + Axios.post(webhookUrl, { + content: `${ + critical ? pingId ?? 
'' : '' + } Unable to post error to pagerduty\n${e + .toString() + .slice(0, 1000)}`, + username, + }).catch((e) => logger.error(`[webhook] ${e}`)); + }); + } + + logger.warn( + `[webhook] Notification ${webhookUrl ? '' : 'not '}posted to webhook`, + ); + + if (useDiscord && webhookUrl) + await AsyncRetry(() => Axios.post(webhookUrl, data), { + retries: 10, + onRetry: (_: any, a: number) => { + // Incrementally reduce properties in a bid to avoid any malformed payloads. + switch (a) { + case 4: + data.embeds = data.embeds.slice(0, 2); + break; + } + }, + }).catch((e: any) => { + logger.error(`[webhook] Unable to post error to webhook - ${e}`); + logger.debug(`[webhook] request body: ${JSON.stringify(data)}`); + Axios.post(webhookUrl, { + content: `${ + ping ? pingId ?? '' : '' + } Unable to post error to webhook`, + username, + }).catch((e) => logger.error(`[webhook] ${e}`)); + }); + } catch (err) { + logger.error( + `[webhook] Unable to post webhook with field ${name} ${value} - ${err}`, + ); + } +} diff --git a/src/workers/fs-cleanup-worker.ts b/src/workers/fs-cleanup-worker.ts index 769cfbe..1ab7a9d 100644 --- a/src/workers/fs-cleanup-worker.ts +++ b/src/workers/fs-cleanup-worker.ts @@ -15,7 +15,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -import fs from 'node:fs'; +import fs, { promises } from 'node:fs'; import path from 'node:path'; import * as winston from 'winston'; @@ -111,37 +111,72 @@ export class FsCleanupWorker { this.lastPath = batch[batch.length - 1]; } - async getBatch(basePath: string, lastPath: string | null): Promise { + public async getBatch( + basePath: string, + lastPath: string | null, + ): Promise { const batch: string[] = []; let totalFilesProcessed = 0; - const walk = async (dir: string) => { - const files = await fs.promises.readdir(dir, { withFileTypes: true }); - files.sort((a, b) => a.name.localeCompare(b.name)); - - for (const file of files) { - if (totalFilesProcessed >= this.batchSize) break; - - const fullPath = path.join(dir, file.name); - if ( - lastPath !== null && - (lastPath.startsWith(fullPath) || fullPath >= lastPath) && - file.isDirectory() - ) { - await walk(fullPath); - } else { - if (lastPath === null || fullPath > lastPath) { - if (await this.shouldDelete(fullPath)) { - batch.push(fullPath); - totalFilesProcessed++; - } - } - } + for await (const filePath of walk(basePath)) { + if (totalFilesProcessed >= this.batchSize) break; + const fullPath = path.resolve(filePath); + // if ( + // lastPath !== null && + // (lastPath.startsWith(fullPath) || fullPath >= lastPath) && + // file.isDirectory() + // ) { + // await walk(fullPath); + // } else { + // if (lastPath === null || fullPath > lastPath) { + if ( + (lastPath === null || fullPath > lastPath) && + (await this.shouldDelete(fullPath)) + ) { + batch.push(fullPath); + totalFilesProcessed++; } - }; - - await walk(basePath); + // } + // } + } + // const walk = async (dir: string) => { + // const files = await fs.promises.readdir(dir, { withFileTypes: true }); + // files.sort((a, b) => a.name.localeCompare(b.name)); + + // for (const file of files) { + // if (totalFilesProcessed >= this.batchSize) break; + + // const fullPath = path.join(dir, file.name); + // if ( + // lastPath !== null && + // (lastPath.startsWith(fullPath) || fullPath >= lastPath) && + // file.isDirectory() + // ) { + // await walk(fullPath); + // } else { + // if (lastPath === null || fullPath > lastPath) { + // if (await this.shouldDelete(fullPath)) { + 
// batch.push(fullPath); + // totalFilesProcessed++; + // } + // } + // } + // } + // }; + + // await walk(basePath); return batch; } } + +async function* walk( + dir: string, + recursive = true, +): AsyncGenerator { + for await (const d of await promises.opendir(dir)) { + const entry = path.join(dir, d.name); + if (recursive && d.isDirectory()) yield* await walk(entry); + else if (d.isFile()) yield entry; + } +} diff --git a/yarn.lock b/yarn.lock index d479c14..c8c9064 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1415,6 +1415,13 @@ resolved "https://registry.yarnpkg.com/@types/argparse/-/argparse-1.0.38.tgz#a81fd8606d481f873a3800c6ebae4f1d768a56a9" integrity sha512-ebDJ9b0e702Yr7pWgB0jzm+CX4Srzz8RcXtLJDJB+BSccqMa36uyH/zUsSYao5+BD1ytv3k3rPYCq4mAE1hsXA== +"@types/async-retry@^1.4.8": + version "1.4.8" + resolved "https://registry.yarnpkg.com/@types/async-retry/-/async-retry-1.4.8.tgz#eb32df13aceb9ba1a8a80e7fe518ff4e3fe46bb3" + integrity sha512-Qup/B5PWLe86yI5I3av6ePGaeQrIHNKCwbsQotD6aHQ6YkHsMUxVZkZsmx/Ry3VZQ6uysHwTjQ7666+k6UjVJA== + dependencies: + "@types/retry" "*" + "@types/better-sqlite3@^7.5.0": version "7.5.0" resolved "https://registry.npmjs.org/@types/better-sqlite3/-/better-sqlite3-7.5.0.tgz" @@ -1549,6 +1556,11 @@ resolved "https://registry.npmjs.org/@types/long/-/long-4.0.2.tgz" integrity sha512-MqTGEo5bj5t157U6fA/BiDynNkn0YknVdh48CMPkTSpFTVmvao5UQmm7uEF6xBEo7qIMAlY/JSleYaE6VOdpaA== +"@types/luxon@~3.3.0": + version "3.3.7" + resolved "https://registry.yarnpkg.com/@types/luxon/-/luxon-3.3.7.tgz#043d413b6492a012de47503907bdf3ec4f827933" + integrity sha512-gKc9P2d4g5uYwmy4s/MO/yOVPmvHyvzka1YH6i5dM03UrFofHSmgc0D0ymbDRStFWHusk6cwwF6nhLm/ckBbbQ== + "@types/mime@^1": version "1.3.2" resolved "https://registry.npmjs.org/@types/mime/-/mime-1.3.2.tgz" @@ -1617,6 +1629,11 @@ resolved "https://registry.npmjs.org/@types/range-parser/-/range-parser-1.2.4.tgz" integrity sha512-EEhsLsD6UsDM1yFhAvy0Cjr6VwmpMWqFBCb9w07wVugF7w9nfajxLuVmngTIpgS6svCnm6Vaw+MZhoDCKnOfsw== +"@types/retry@*": + version "0.12.5" + resolved "https://registry.yarnpkg.com/@types/retry/-/retry-0.12.5.tgz#f090ff4bd8d2e5b940ff270ab39fd5ca1834a07e" + integrity sha512-3xSjTp3v03X/lSQLkczaN9UIEwJMoMCA1+Nb5HfbJEQWogdeQIyVtTvxPXDQjZ5zws8rFQfVfRdz03ARihPJgw== + "@types/serve-static@*": version "1.13.10" resolved "https://registry.npmjs.org/@types/serve-static/-/serve-static-1.13.10.tgz" @@ -2200,7 +2217,7 @@ assertion-error@^1.1.0: resolved "https://registry.npmjs.org/assertion-error/-/assertion-error-1.1.0.tgz" integrity sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw== -async-retry@^1.2.1: +async-retry@^1.2.1, async-retry@^1.3.3: version "1.3.3" resolved "https://registry.npmjs.org/async-retry/-/async-retry-1.3.3.tgz" integrity sha512-wfr/jstw9xNi/0teMHrRW7dsz3Lt5ARhYNZ2ewpadnhaIp5mbALhOAP+EAdsC7t4Z6wqsDVv9+W6gm1Dk9mEyw== @@ -2667,6 +2684,11 @@ chalk@^5.0.1, chalk@^5.2.0, chalk@^5.3.0: resolved "https://registry.yarnpkg.com/chalk/-/chalk-5.3.0.tgz#67c20a7ebef70e7f3970a01f90fa210cb6860385" integrity sha512-dLitG79d+GV1Nb/VYcCDFivJeK1hiukt9QjRNVOsUtTy1rR1YJsmpGGTZ3qJos+uw7WmWF4wUwBd9jxjocFC2w== +check-disk-space@^3.4.0: + version "3.4.0" + resolved "https://registry.yarnpkg.com/check-disk-space/-/check-disk-space-3.4.0.tgz#eb8e69eee7a378fd12e35281b8123a8b4c4a8ff7" + integrity sha512-drVkSqfwA+TvuEhFipiR1OC9boEGZL5RrWvVsOthdcvQNXyCCuKkEiTOTXZ7qxSf/GLwq4GvzfrQD/Wz325hgw== + check-error@^1.0.2: version "1.0.2" resolved "https://registry.npmjs.org/check-error/-/check-error-1.0.2.tgz" @@ 
-2991,6 +3013,14 @@ create-require@^1.1.0: resolved "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz" integrity sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ== +cron@^3.1.6: + version "3.1.6" + resolved "https://registry.yarnpkg.com/cron/-/cron-3.1.6.tgz#e7e1798a468e017c8d31459ecd7c2d088f97346c" + integrity sha512-cvFiQCeVzsA+QPM6fhjBtlKGij7tLLISnTSvFxVdnFGLdz+ZdXN37kNe0i2gefmdD17XuZA6n2uPVwzl4FxW/w== + dependencies: + "@types/luxon" "~3.3.0" + luxon "~3.4.0" + cross-env@^7.0.3: version "7.0.3" resolved "https://registry.yarnpkg.com/cross-env/-/cross-env-7.0.3.tgz#865264b29677dc015ba8418918965dd232fc54cf" @@ -5323,6 +5353,11 @@ lru-cache@^7.4.4, lru-cache@^7.5.1, lru-cache@^7.7.1: resolved "https://registry.yarnpkg.com/lru-cache/-/lru-cache-10.1.0.tgz#2098d41c2dc56500e6c88584aa656c84de7d0484" integrity sha512-/1clY/ui8CzjKFyjdvwPWJUYKiFVXG2I2cY0ssG7h4+hwk+XOIX7ZSG9Q7TW8TW3Kp3BUSqgFWBLgL4PJ+Blag== +luxon@~3.4.0: + version "3.4.4" + resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.4.4.tgz#cf20dc27dc532ba41a169c43fdcc0063601577af" + integrity sha512-zobTr7akeGHnv7eBOXcRgMeCP6+uyYsczwmeRCauvpvaAltgNyTbLH/+VaEAPUeWBT+1GuNmz4wC/6jtQzbbVA== + make-dir@^3.0.0: version "3.1.0" resolved "https://registry.npmjs.org/make-dir/-/make-dir-3.1.0.tgz" @@ -6493,10 +6528,10 @@ prettier-linter-helpers@^1.0.0: dependencies: fast-diff "^1.1.2" -prettier@^3.1.1: - version "3.1.1" - resolved "https://registry.yarnpkg.com/prettier/-/prettier-3.1.1.tgz#6ba9f23165d690b6cbdaa88cb0807278f7019848" - integrity sha512-22UbSzg8luF4UuZtzgiUOfcGM8s4tjBv6dJRT7j275NXsy2jb4aJa4NNveul5x4eqlF1wuhuR2RElK71RvmVaw== +prettier@^2.8.8: + version "2.8.8" + resolved "https://registry.yarnpkg.com/prettier/-/prettier-2.8.8.tgz#e8c5d7e98a4305ffe3de2e1fc4aca1a71c28b1da" + integrity sha512-tdN8qQGvNjw4CHbY+XXk0JgCXn9QiF21a55rBe5LJAU+kDyC4WQn4+awm2Xfk2lQMk5fKup9XgzTZtGkjBdP9Q== proc-log@^3.0.0: version "3.0.0"
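The diff adds createCron (src/utils/cron.ts), runHealthCheck (src/utils/healthcheck.ts), and sendWebhookMessage (src/utils/webhook.ts), but no call site that actually schedules the health check is included in the patch. Below is a minimal sketch of how the pieces could be wired together from the gateway entry point; the five-minute schedule, the alert text, and the placement are assumptions for illustration, not part of the patch.

// Hypothetical wiring (not shown in the patch): register the new health check
// on a cron schedule and emit a one-off notification through the webhook path.
import { createCron } from './utils/cron.js';
import { runHealthCheck } from './utils/healthcheck.js';
import sendWebhookMessage from './utils/webhook.js';

// createCron skips a tick while the previous run is still awaiting, so a slow
// disk/load check cannot pile up overlapping executions. The '*/5 * * * *'
// (every five minutes) schedule is an assumed value; the patch defines none.
createCron('healthcheck', '*/5 * * * *', () => runHealthCheck());

// sendWebhookMessage posts to Discord (WEBHOOK_URL) and PagerDuty
// (PDUTY_ROUTING_KEY) as configured in src/config.ts; if neither is set,
// the alert is only written to the local log.
await sendWebhookMessage({
  name: 'Gateway started',
  value: 'Health-check cron registered',
  ping: false,
  critical: false,
});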