Skip to content
This repository has been archived by the owner on Sep 16, 2024. It is now read-only.

feat(s3): backup cache to s3 on an interval #108

Merged
merged 4 commits into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"clean": "rimraf [ node_modules dist cache ]",
"test:docker": "docker compose --env-file=.env.test up --exit-code-from test-runner --build",
"test:integration": "mocha --spec=tests/integration/**.test.ts",
"test:integration:local": "docker compose up arlocal -d ; yarn test:integration $* ; docker compose down -v",
"test:integration:local": "docker compose up arlocal -d ; yarn test:integration $*; docker compose down -v",
"docker:run": "docker compose up arns-service --build",
"docker:integration": "docker compose up --exit-code-from test-runner --build",
"format:fix": "yarn prettier --write .",
Expand All @@ -31,6 +31,7 @@
"dependencies": {
"@ardrive/ardrive-promise-cache": "^1.1.4",
"@aws-sdk/client-s3": "^3.490.0",
"@aws-sdk/lib-storage": "^3.554.0",
"@koa/cors": "^4.0.0",
"@koa/router": "^12.0.0",
"arweave": "^1.14.4",
Expand Down
1 change: 1 addition & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { EvaluationOptions } from 'warp-contracts';

// Feature flags: enabled only when the env var is the exact string 'true'.
export const PREFETCH_CONTRACTS = process.env.PREFETCH_CONTRACTS === 'true';
export const BOOTSTRAP_CACHE = process.env.BOOTSTRAP_CACHE === 'true';
// When true, system.ts schedules periodic uploads of the warp cache to S3.
export const SAVE_CACHE_TO_S3 = process.env.SAVE_CACHE_TO_S3 === 'true';
export const BLOCKLISTED_CONTRACT_IDS = new Set(
process.env.BLOCKLISTED_CONTRACT_IDS
? process.env.BLOCKLISTED_CONTRACT_IDS.split(',')
Expand Down
140 changes: 134 additions & 6 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,26 @@ import {
ListObjectsV2Command,
GetObjectCommand,
} from '@aws-sdk/client-s3';
import { Upload } from '@aws-sdk/lib-storage';
import fs from 'node:fs';
import { Readable } from 'stream';
import path from 'node:path';
import { BOOTSTRAP_CACHE, PREFETCH_CONTRACTS } from './constants';
import {
BOOTSTRAP_CACHE,
PREFETCH_CONTRACTS,
SAVE_CACHE_TO_S3,
} from './constants';
import pLimit from 'p-limit';

// S3 bucket holding the warp cache backup.
const bucket = process.env.WARP_CACHE_BUCKET || 'arns-warp-cache';
// Local cache directory; it also serves as the S3 key prefix, since uploaded
// object keys are the file paths rooted at this directory (fetchCacheFromS3
// lists objects under this same key).
const cacheDirectory = process.env.WARP_CACHE_KEY || 'cache';
// AWS region for the shared S3 client below.
const region = process.env.AWS_REGION || 'us-west-2';
// Interval between automatic S3 cache backups. Falls back to 3 hours when
// S3_CACHE_INTERVAL_MS is unset, empty, or not a positive number. The
// previous `+(env || default)` coercion produced NaN for malformed values,
// which setInterval treats as 0 — the backup would then fire continuously.
const parsedS3CacheIntervalMs = Number(process.env.S3_CACHE_INTERVAL_MS);
const s3CacheIntervalMs =
  Number.isFinite(parsedS3CacheIntervalMs) && parsedS3CacheIntervalMs > 0
    ? parsedS3CacheIntervalMs
    : 3 * 60 * 60 * 1000; // default to 3 hours
// Shared S3 client, created once at module load and reused by the cache
// backup/bootstrap routines in this file.
const s3 = new S3Client({
  region,
});

export const bootstrapCache = async () => {
if (BOOTSTRAP_CACHE) {
Expand All @@ -37,6 +53,16 @@ export const bootstrapCache = async () => {
if (PREFETCH_CONTRACTS) {
await prefetchContracts();
}

if (SAVE_CACHE_TO_S3) {
// save cache to S3 every s3CacheIntervalMs
logger.info('Cache will be saved to S3 on an interval', {
intervalMs: s3CacheIntervalMs,
});
setInterval(async () => {
await saveCacheToS3();
}, s3CacheIntervalMs);
}
};

let successfullyPrefetchedContracts = false;
Expand Down Expand Up @@ -108,12 +134,9 @@ export const prefetchContracts = async () => {

export const fetchCacheFromS3 = async () => {
const startTimeMs = Date.now();
const s3 = new S3Client({
region: process.env.AWS_REGION,
});
const params = {
Bucket: process.env.WARP_CACHE_BUCKET || 'arns-warp-cache',
Key: process.env.WARP_CACHE_KEY || 'cache',
Bucket: bucket,
Key: cacheDirectory,
};

logger.info('Bootstrapping warp cache from S3', {
Expand Down Expand Up @@ -173,3 +196,108 @@ export const fetchCacheFromS3 = async () => {
});
}
};

export const saveCacheToS3 = async () => {
const startTimeMs = Date.now();

logger.info('Saving warp cache to S3', {
bucket,
});

try {
// read files from local file system
const parallelLimit = pLimit(10);
const uploadFolder = async ({
folderPath,
bucket,
keyPrefix,
}: {
folderPath: string;
bucket: string;
keyPrefix: string;
}) => {
const files = fs.readdirSync(folderPath);
logger.debug('Uploading folder to S3', {
folderPath,
bucket,
keyPrefix,
files: files.length,
});
await Promise.all(
files.map(async (file) => {
// wrap in a pLimit to avoid resource exhaustion
return parallelLimit(() => {
const filePath = path.join(folderPath, file);
if (fs.statSync(filePath).isFile()) {
logger.debug('Uploading file to S3', {
filePath,
bucket,
keyPrefix,
});
const fileStream = fs.createReadStream(filePath);
const upload = new Upload({
client: s3,
params: {
Bucket: bucket,
Key: filePath,
Body: fileStream,
},
});

const fileStartTime = Date.now();

// catch errors for a single file
return upload
.done()
.then(() => {
logger.debug('Successfully uploaded file to S3', {
filePath,
bucket,
keyPrefix,
durationMs: Date.now() - fileStartTime,
});
})
.catch((error: unknown) => {
const message =
error instanceof Error ? error : new Error('Unknown error');
logger.error('Failed to upload file to S3', {
error: message,
file,
});
});
} else {
logger.debug('Recursively uploading folder to S3', {
filePath,
bucket,
keyPrefix,
});
// recursively upload folders
return uploadFolder({
folderPath: filePath,
bucket,
keyPrefix: keyPrefix + file + '/',
});
}
});
}),
);
};

// upload files to S3 recursively and in a pLimit to avoid resource exhaustion
await uploadFolder({
folderPath: cacheDirectory,
bucket,
keyPrefix: '',
});
fedellen marked this conversation as resolved.
Show resolved Hide resolved

logger.info('Successfully saved warp cache to S3', {
durationMs: Date.now() - startTimeMs,
bucket,
});
} catch (error: unknown) {
const message = error instanceof Error ? error : new Error('Unknown error');
logger.error('Failed to save cache to S3', {
error: message,
});
}
};
Loading
Loading