This repository has been archived by the owner on Sep 16, 2024. It is now read-only.

feat(s3): backup cache to s3 on an interval #108

Merged: 4 commits, Apr 16, 2024
Changes from 2 commits
package.json (3 changes: 2 additions & 1 deletion)
@@ -19,7 +19,7 @@
"clean": "rimraf [ node_modules dist cache ]",
"test:docker": "docker compose --env-file=.env.test up --exit-code-from test-runner --build",
"test:integration": "mocha --spec=tests/integration/**.test.ts",
"test:integration:local": "docker compose up arlocal -d ; yarn test:integration $* ; docker compose down -v",
"test:integration:local": "docker compose up arlocal -d ; yarn test:integration $*; docker compose down -v",
"docker:run": "docker compose up arns-service --build",
"docker:integration": "docker compose up --exit-code-from test-runner --build",
"format:fix": "yarn prettier --write .",
@@ -31,6 +31,7 @@
"dependencies": {
"@ardrive/ardrive-promise-cache": "^1.1.4",
"@aws-sdk/client-s3": "^3.490.0",
"@aws-sdk/lib-storage": "^3.554.0",
"@koa/cors": "^4.0.0",
"@koa/router": "^12.0.0",
"arweave": "^1.14.4",
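
Note on the new dependency: @aws-sdk/lib-storage provides the Upload helper used below in src/system.ts. Unlike a plain PutObjectCommand, Upload accepts a stream body and transparently switches to multipart uploads for larger payloads. A minimal sketch of the pattern (the bucket, key, and file path are placeholders, not values from this PR):

import { S3Client } from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import fs from 'node:fs';

const s3 = new S3Client({ region: 'us-west-2' });

// Stream one local file to S3; Upload chunks the body into a
// multipart upload automatically when it is large enough.
async function uploadOneFile(bucket: string, key: string, filePath: string) {
  const upload = new Upload({
    client: s3,
    params: {
      Bucket: bucket,
      Key: key,
      Body: fs.createReadStream(filePath),
    },
  });
  // Optional: observe progress events emitted by lib-storage.
  upload.on('httpUploadProgress', (progress) => {
    console.log('uploaded bytes:', progress.loaded);
  });
  await upload.done();
}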
src/constants.ts (1 change: 1 addition & 0 deletions)
@@ -18,6 +18,7 @@ import { EvaluationOptions } from 'warp-contracts';

export const PREFETCH_CONTRACTS = process.env.PREFETCH_CONTRACTS === 'true';
export const BOOTSTRAP_CACHE = process.env.BOOTSTRAP_CACHE === 'true';
+ export const SAVE_CACHE_TO_S3 = process.env.SAVE_CACHE_TO_S3 === 'true';
export const BLOCKLISTED_CONTRACT_IDS = new Set(
process.env.BLOCKLISTED_CONTRACT_IDS
? process.env.BLOCKLISTED_CONTRACT_IDS.split(',')
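
Note: the new SAVE_CACHE_TO_S3 constant follows the existing pattern in this file, where a feature is enabled only when the environment variable is exactly the string 'true'. A hypothetical helper that captures the same pattern (envFlag is illustrative and not part of this codebase):

// Illustrative only: parse a boolean environment flag the same
// way the constants in this file do.
function envFlag(name: string, defaultValue = false): boolean {
  const value = process.env[name];
  return value === undefined ? defaultValue : value === 'true';
}

// e.g. envFlag('SAVE_CACHE_TO_S3') === (process.env.SAVE_CACHE_TO_S3 === 'true')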
src/system.ts (113 changes: 107 additions & 6 deletions)
@@ -24,10 +24,26 @@ import {
ListObjectsV2Command,
GetObjectCommand,
} from '@aws-sdk/client-s3';
+ import { Upload } from '@aws-sdk/lib-storage';
import fs from 'node:fs';
import { Readable } from 'stream';
import path from 'node:path';
- import { BOOTSTRAP_CACHE, PREFETCH_CONTRACTS } from './constants';
+ import {
+ BOOTSTRAP_CACHE,
+ PREFETCH_CONTRACTS,
+ SAVE_CACHE_TO_S3,
+ } from './constants';
+ import pLimit from 'p-limit';

+ const bucket = process.env.WARP_CACHE_BUCKET || 'arns-warp-cache';
+ const cacheDirectory = process.env.WARP_CACHE_KEY || 'cache';
+ const region = process.env.AWS_REGION || 'us-west-2';
+ const s3CacheIntervalMs = +(
+ process.env.S3_CACHE_INTERVAL_MS || 3 * 60 * 60 * 1000
+ );
+ const s3 = new S3Client({
+ region,
+ });
+
export const bootstrapCache = async () => {
if (BOOTSTRAP_CACHE) {
@@ -37,6 +37,11 @@ export const bootstrapCache = async () => {
if (PREFETCH_CONTRACTS) {
await prefetchContracts();
}
+
+ if (SAVE_CACHE_TO_S3) {
+ // save the cache to S3 on an interval (default: every 3 hours)
+ setInterval(saveCacheToS3, s3CacheIntervalMs);
+ }
};

let successfullyPrefetchedContracts = false;
Expand Down Expand Up @@ -108,12 +129,9 @@ export const prefetchContracts = async () => {

export const fetchCacheFromS3 = async () => {
const startTimeMs = Date.now();
- const s3 = new S3Client({
- region: process.env.AWS_REGION,
- });
const params = {
- Bucket: process.env.WARP_CACHE_BUCKET || 'arns-warp-cache',
- Key: process.env.WARP_CACHE_KEY || 'cache',
+ Bucket: bucket,
+ Key: cacheDirectory,
};

logger.info('Bootstrapping warp cache from S3', {
@@ -173,3 +191,86 @@ export const fetchCacheFromS3 = async () => {
});
}
};
+
+ export const saveCacheToS3 = async () => {
+ const startTimeMs = Date.now();
+
+ logger.info('Saving warp cache to S3', {
+ bucket,
+ });
+
+ try {
+ // read files from the local file system
+ const parallelLimit = pLimit(10);
+ const uploadFolder = async ({
+ folderPath,
+ bucket,
+ keyPrefix,
+ }: {
+ folderPath: string;
+ bucket: string;
+ keyPrefix: string;
+ }) => {
+ const files = fs.readdirSync(folderPath);
+ await Promise.all(
+ files.map(async (file) => {
+ // wrap in a pLimit to avoid resource exhaustion
+ return parallelLimit(() => {
+ const filePath = path.join(folderPath, file);
+ if (fs.statSync(filePath).isFile()) {
+ logger.debug('Uploading file to S3', {
+ filePath,
+ bucket,
+ keyPrefix,
+ });
+ const fileStream = fs.createReadStream(filePath);
+ // include the key prefix so nested files keep their folder structure
+ const key = keyPrefix + file;
+ const upload = new Upload({
+ client: s3,
+ params: {
+ Bucket: bucket,
+ Key: key,
+ Body: fileStream,
+ },
+ });
+
+ // catch errors for a single file so one failure does not abort the batch
+ return upload.done().catch((error: unknown) => {
+ const message =
+ error instanceof Error ? error : new Error('Unknown error');
+ logger.error('Failed to upload file to S3', {
+ error: message,
+ file,
+ });
+ });
+ } else {
+ // recurse into subdirectories
+ return uploadFolder({
+ folderPath: filePath,
+ bucket,
+ keyPrefix: keyPrefix + file + '/',
+ });
+ }
+ });
+ }),
+ );
+ };
+
+ // upload files to S3 recursively, using pLimit to avoid resource exhaustion
+ await uploadFolder({
+ folderPath: cacheDirectory,
+ bucket,
+ keyPrefix: '',
+ });
+
+ logger.info('Successfully saved warp cache to S3', {
+ durationMs: Date.now() - startTimeMs,
+ bucket,
+ });
+ } catch (error: unknown) {
+ const message = error instanceof Error ? error : new Error('Unknown error');
+ logger.error('Failed to save cache to S3', {
+ error: message,
+ });
+ }
+ };
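
Note on the interval: setInterval does not wait for the previous async pass to finish, so a saveCacheToS3 run that takes longer than S3_CACHE_INTERVAL_MS can overlap the next one. A hedged sketch of one way to guard against that (the inFlight flag is illustrative, not part of this PR):

// Illustrative only: skip a scheduled pass while the previous
// one is still running.
let inFlight = false;

const saveCacheToS3Guarded = async () => {
  if (inFlight) {
    return;
  }
  inFlight = true;
  try {
    await saveCacheToS3();
  } finally {
    inFlight = false;
  }
};

// setInterval(saveCacheToS3Guarded, s3CacheIntervalMs);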
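
For the download side, the body of fetchCacheFromS3 is collapsed in this view. Based on the imports above (ListObjectsV2Command, GetObjectCommand) and the Bucket/Key params, the general shape is roughly the sketch below; the details are assumptions, not the exact merged code:

import {
  S3Client,
  ListObjectsV2Command,
  GetObjectCommand,
} from '@aws-sdk/client-s3';
import fs from 'node:fs';
import path from 'node:path';
import { Readable } from 'stream';

// Assumed shape only: list the objects under the cache prefix and
// stream each one into the local cache directory.
// (Pagination via ContinuationToken is omitted for brevity.)
async function downloadCache(s3: S3Client, bucket: string, prefix: string) {
  const listed = await s3.send(
    new ListObjectsV2Command({ Bucket: bucket, Prefix: prefix }),
  );
  for (const object of listed.Contents ?? []) {
    if (object.Key === undefined) continue;
    const response = await s3.send(
      new GetObjectCommand({ Bucket: bucket, Key: object.Key }),
    );
    const destination = path.join(prefix, path.basename(object.Key));
    fs.mkdirSync(path.dirname(destination), { recursive: true });
    await new Promise<void>((resolve, reject) => {
      (response.Body as Readable)
        .pipe(fs.createWriteStream(destination))
        .on('finish', resolve)
        .on('error', reject);
    });
  }
}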