Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

List datasets + files from core + staging sources #700

Closed
wants to merge 13 commits into from
11 changes: 10 additions & 1 deletion src/sources/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ class CoreSource extends Source {
async baseUrl() { return "https://nextstrain-data.s3.amazonaws.com/"; }
get repo() { return "nextstrain/narratives"; }
get branch() { return "master"; }
get inventoryLocation() { return {
bucket: 'nextstrain-inventories',
prefix: 'nextstrain-data/config-v1/',
}}

// eslint-disable-next-line no-unused-vars
async urlFor(path, method = 'GET', headers = {}) {
Expand All @@ -38,7 +42,12 @@ class CoreSource extends Source {

async collectResources() {
if (!this._allResources) this._allResources = new Map();
const s3objects = await parseInventory();
const s3objects = await parseInventory({...this.inventoryLocation, name: this.name});
if (!s3objects.length) {
// potentially due to a (transient) failure to parse the inventory -- so we want
// to keep any previously set resources
return;
}
const datasets = new Map(), files = new Map();
s3objects.forEach((object) => {
const [name, resourceType] = CoreCollectedResources.objectName(object);
Expand Down
113 changes: 92 additions & 21 deletions src/utils/inventories.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,83 @@ import * as fs from 'node:fs/promises';
import neatCsv from 'neat-csv';
import zlib from 'zlib';
import { promisify } from 'util';
import AWS from 'aws-sdk';
import * as utils from '../utils/index.js';

const gunzip = promisify(zlib.gunzip)

/**
* Currently this reads files from disk for development purposes, but a future commit
* will change this to reading from S3
* Fetches and reads the latest inventory from the provided bucket/prefix via:
* - finds the most recent manifest.json via comparison of timestamps in keys
* - uses this manifest.json to get the schema + key of the actual inventory
* - gets the actual inventory & returns the data as an object[] with keys from the schema
*
* Note that we only read a maximum of 999 keys from the provided bucket+prefix. A typical inventory
* update adds ~4 keys, so this should allow for ~8 months of inventories. The bucket where inventories
* are stored should use lifecycles to expire objects.
*
* If `process.env.LOCAL_INVENTORY` is set then we instead read the following files:
* - `./devData/${name}.manifest.json`
* - `./devData/${name}.inventory.csv.gz`
*
* @returns {object[]} list of entries in the inventory, using the schema to define keys
*/
const fetchInventory = async () => {
// list the provided (TODO) s3 prefix (can assume it's not paginated since we will use lifecycle policies)
// Find the latest directory (via simple sorting) and download the manifest.json from there
const manifestHandle = fs.open('./devData/core.manifest.json', 'r');
const manifest = JSON.parse(
await manifestHandle.then((handle) => handle.readFile())
)
manifestHandle.then((handle) => handle.close()) // P.S. close() is a promise
const schema = manifest.fileSchema.split(",").map((f) => f.trim());
// the manifest contains the s3 object key for the inventory to fetch (a .csv.gz file)
const inventoryHandle = fs.open('./devData/core.inventory.csv.gz', 'r');
const inventory = await inventoryHandle
.then((handle) => handle.readFile())
.then((buffer) => gunzip(buffer))
.then((data) => neatCsv(data, schema))
inventoryHandle.then((handle) => handle.close()) // P.S. close() is a promise
const fetchInventory = async ({bucket, prefix, name}) => {

if (process.env.LOCAL_INVENTORY) {
const manifestPath = `./devData/${name}.manifest.json`;
const inventoryPath = `./devData/${name}.inventory.csv.gz`;
utils.verbose(`inventory for ${name} -- reading S3 inventories from ${manifestPath} and ${inventoryPath}`);
const manifestHandle = fs.open(manifestPath, 'r');
const manifest = JSON.parse(
await manifestHandle.then((handle) => handle.readFile())
)
manifestHandle.then((handle) => handle.close()) // P.S. close() is a promise
const schema = _parseManifest(manifest).schema;
const inventoryHandle = fs.open(inventoryPath, 'r');
const inventory = await inventoryHandle
.then((handle) => handle.readFile())
.then((buffer) => gunzip(buffer))
.then((data) => neatCsv(data, schema))
inventoryHandle.then((handle) => handle.close()) // P.S. close() is a promise
utils.verbose(`inventory for ${name} - read ${inventory.length} rows from the local file`)
return inventory;
}

const S3 = new AWS.S3();
const manifestKey = await new Promise((resolve, reject) => {
S3.listObjectsV2({Bucket: bucket, Prefix: prefix, MaxKeys: 999}, (err, data) => {
if (err) reject(err);
const orderedKeys = data.Contents
.map((object) => object.Key)
.filter((key) => key.endsWith('/manifest.json'))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to explicitly match and filter on the whole key, e.g. (pseudo-code, needs regexp metachar escaping)

.filter(key => key.match(new RegExp(`${prefix}/\d{4}-\d{2}-\d{2}T\d{2}-\d{2}Z/manifest[.]json$`)))

or the equivalent behaviour via another approach.

Otherwise it's easy for this to start mismatching unexpectedly in the future.

.sort() // keys are identical except for a YYYY-MM-DDTHH-MMZ timestamp within the key itself
.reverse(); // now sorted most recent object first
if (orderedKeys.length===0) reject("No valid inventory manifest.json found")
resolve(orderedKeys[0])
});
});
utils.verbose(`inventory for ${name} - manifest key: ${manifestKey}`)

const {schema, inventoryKey} = await new Promise((resolve, reject) => {
S3.getObject({Bucket: bucket, Key: manifestKey}, (err, data) => {
if (err) reject(err);
/* NOTE - the Body property of the response is a Buffer in S3 SDK v2 (which we use)
but this has changed to a readable stream in SDK v3 */
Comment on lines +66 to +67
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We previously used v3 in other parts of the codebase, but that's (IIRC) since been removed with the wrap up of the groups migration. I think we should be using v3 in new code (which is why I had done so previously for the groups migration tooling).

resolve(_parseManifest(JSON.parse(data.Body.toString('utf-8'))));
})
});
utils.verbose(`inventory for ${name} - parsed manifest JSON`)

const inventory = await (new Promise((resolve, reject) => {
S3.getObject({Bucket: bucket, Key: inventoryKey}, (err, data) => {
if (err) reject(err);
resolve(data.Body);
})
})).then((buffer) => gunzip(buffer))
.then((data) => neatCsv(data, schema));

utils.verbose(`inventory for ${name} - fetched ${inventory.length} rows`)
return inventory;
}

Expand All @@ -36,9 +88,15 @@ const fetchInventory = async () => {
* delete markers and to change the last modified timestamp into a datestamp
* @returns {object[]}
*/
const parseInventory = async () => {
const parseInventory = async ({bucket, prefix, name}) => {
const deletedKeys = new Set();
let objects = await fetchInventory();
let objects;
try {
objects = await fetchInventory({bucket, prefix, name});
} catch (e) {
utils.warn(`Error while fetching s3 inventory for ${name}: ${e.message}`)
return [];
}
objects = objects.filter((item) => {
if (item.IsDeleteMarker === "true") {
deletedKeys.add(item.Key);
Expand Down Expand Up @@ -123,4 +181,17 @@ const coreBucketKeyMunger = (object) => {
export {
parseInventory,
coreBucketKeyMunger,
}
}

/**
 * Parses a S3 inventory manifest (the object decoded from a `manifest.json` file).
 *
 * Only the first entry of `manifest.files` is used for `inventoryKey`.
 * NOTE(review): any further entries in `manifest.files` are ignored here --
 * confirm that the inventory is always delivered as a single file.
 *
 * @param {object} manifest must provide `fileSchema` (comma-separated string of
 *    column names) and `files` (non-empty array whose first entry has a `key`)
 * @returns {object} object.schema = string[] (column names, trimmed, in manifest order)
 *                   object.inventoryKey = string
 * @throws {Error} if `fileSchema` is not a string or `files` has no first entry
 */
function _parseManifest(manifest) {
  // Fail with a descriptive message rather than an opaque
  // "Cannot read properties of undefined" TypeError; callers log `e.message`.
  if (typeof manifest?.fileSchema !== 'string') {
    throw new Error("S3 inventory manifest has no 'fileSchema' string");
  }
  const [firstFile] = Array.isArray(manifest.files) ? manifest.files : [];
  if (firstFile == null) {
    throw new Error("S3 inventory manifest lists no inventory files");
  }
  return {
    schema: manifest.fileSchema.split(",").map((field) => field.trim()),
    inventoryKey: firstFile.key,
  };
}