-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
List datasets + files from core + staging sources #700
Closed
Closed
Changes from 1 commit
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
a3d863f
[drop] disable s3 listing on server start
jameshadfield 94c100e
API to list all available resources
jameshadfield 07c0796
Implement filtering for getAvailable API
jameshadfield d48d0e6
[drop] disable original table
jameshadfield c547a0c
Add (prototype) cards UI for resource collection
jameshadfield 02fe52f
Add loading spinner
jameshadfield eed3a2d
Update /pathogens page
jameshadfield f13a7cd
Add remote-inputs page listing core files
jameshadfield 99c2c83
Fetch latest inventory from S3
jameshadfield 8cb84fe
Add staging source inventory
jameshadfield 8940561
Recollect resources every ~24 hours
jameshadfield cac3780
[fixup] case sensitive filename
jameshadfield a00b8e1
Update dev server IAM policy…
victorlin File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,31 +2,83 @@ import * as fs from 'node:fs/promises'; | |
import neatCsv from 'neat-csv'; | ||
import zlib from 'zlib'; | ||
import { promisify } from 'util'; | ||
import AWS from 'aws-sdk'; | ||
import * as utils from '../utils/index.js'; | ||
|
||
const gunzip = promisify(zlib.gunzip) | ||
|
||
/** | ||
* Currently this reads files from disk for development purposes, but a future commit | ||
* will change this to reading from S3 | ||
* Fetches and reads the latest inventory from the provided bucket/prefix via: | ||
* - finds the most recent manifest.json via comparison of timestamps in keys | ||
* - uses this manifest.json to get the schema + key of the actual inventory | ||
* - gets the actual inventory & returns the data as an object[] with keys from the schema | ||
* | ||
* Note that we only read a maximum of 999 keys from the provided bucket+prefix. A typical inventory | ||
* update adds ~4 keys, so this should allow for ~8 months of inventories. The bucket where inventories | ||
* are stored should use lifecycles to expire objects. | ||
* | ||
* If `process.env.LOCAL_INVENTORY` is set then we instead read the following files: | ||
* - `./devData/${name}.manifest.json` | ||
* - `./devData/${name}.inventory.csv.gz` | ||
* | ||
* @returns {object[]} list of entries in the inventory, using the schema to define keys | ||
*/ | ||
const fetchInventory = async () => { | ||
// list the provided (TODO) s3 prefix (can assume it's not paginated since we will use lifecycle policies) | ||
// Find the latest directory (via simple sorting) and download the manifest.json from there | ||
const manifestHandle = fs.open('./devData/core.manifest.json', 'r'); | ||
const manifest = JSON.parse( | ||
await manifestHandle.then((handle) => handle.readFile()) | ||
) | ||
manifestHandle.then((handle) => handle.close()) // P.S. close() is a promise | ||
const schema = manifest.fileSchema.split(",").map((f) => f.trim()); | ||
// the manifest contains the s3 object key for the inventory to fetch (a .csv.gz file) | ||
const inventoryHandle = fs.open('./devData/core.inventory.csv.gz', 'r'); | ||
const inventory = await inventoryHandle | ||
.then((handle) => handle.readFile()) | ||
.then((buffer) => gunzip(buffer)) | ||
.then((data) => neatCsv(data, schema)) | ||
inventoryHandle.then((handle) => handle.close()) // P.S. close() is a promise | ||
/**
 * Fetches and reads the latest inventory from the provided bucket/prefix via:
 * - finds the most recent manifest.json via comparison of timestamps in keys
 * - uses this manifest.json to get the schema + key of the actual inventory
 * - gets the actual inventory & returns the data as an object[] with keys from the schema
 *
 * Note that we only read a maximum of 999 keys from the provided bucket+prefix.
 * The bucket where inventories are stored should use lifecycles to expire objects.
 *
 * If `process.env.LOCAL_INVENTORY` is set then we instead read the following files:
 * - `./devData/${name}.manifest.json`
 * - `./devData/${name}.inventory.csv.gz`
 *
 * @param {object} args
 * @param {string} args.bucket S3 bucket containing the inventories
 * @param {string} args.prefix key prefix under which inventories are written
 * @param {string} args.name   label used for logging and for local dev filenames
 * @returns {Promise<object[]>} list of entries in the inventory, using the schema to define keys
 * @throws {Error} if no manifest.json key exists under the prefix, or on any S3/fs/parse failure
 */
const fetchInventory = async ({bucket, prefix, name}) => {

  if (process.env.LOCAL_INVENTORY) {
    const manifestPath = `./devData/${name}.manifest.json`;
    const inventoryPath = `./devData/${name}.inventory.csv.gz`;
    utils.verbose(`inventory for ${name} -- reading S3 inventories from ${manifestPath} and ${inventoryPath}`);
    /* try/finally guarantees the file handles are closed (and the close awaited)
    even if reading or parsing throws — previously close() was a floating promise */
    const manifestHandle = await fs.open(manifestPath, 'r');
    let manifest;
    try {
      manifest = JSON.parse(await manifestHandle.readFile());
    } finally {
      await manifestHandle.close();
    }
    const schema = _parseManifest(manifest).schema;
    const inventoryHandle = await fs.open(inventoryPath, 'r');
    let inventory;
    try {
      inventory = await inventoryHandle.readFile()
        .then((buffer) => gunzip(buffer))
        .then((data) => neatCsv(data, schema));
    } finally {
      await inventoryHandle.close();
    }
    utils.verbose(`inventory for ${name} - read ${inventory.length} rows from the local file`)
    return inventory;
  }

  const S3 = new AWS.S3();
  const manifestKey = await new Promise((resolve, reject) => {
    S3.listObjectsV2({Bucket: bucket, Prefix: prefix, MaxKeys: 999}, (err, data) => {
      // `return` is essential: without it we'd fall through and throw a TypeError on `data`
      if (err) return reject(err);
      const orderedKeys = data.Contents
        .map((object) => object.Key)
        .filter((key) => key.endsWith('/manifest.json'))
        .sort() // keys are identical except for a YYYY-MM-DDTHH-MMZ timestamp within the key itself
        .reverse(); // now sorted most recent object first
      if (orderedKeys.length===0) return reject(new Error("No valid inventory manifest.json found"));
      resolve(orderedKeys[0]);
    });
  });
  utils.verbose(`inventory for ${name} - manifest key: ${manifestKey}`)

  const {schema, inventoryKey} = await new Promise((resolve, reject) => {
    S3.getObject({Bucket: bucket, Key: manifestKey}, (err, data) => {
      if (err) return reject(err);
      /* NOTE - the Body property of the response is a Buffer in S3 SDK v2 (which we use)
      but this has changed to a readable stream in SDK v3 */
      resolve(_parseManifest(JSON.parse(data.Body.toString('utf-8'))));
    })
  });
  utils.verbose(`inventory for ${name} - parsed manifest JSON`)

  const inventory = await (new Promise((resolve, reject) => {
    S3.getObject({Bucket: bucket, Key: inventoryKey}, (err, data) => {
      if (err) return reject(err);
      resolve(data.Body);
    })
  })).then((buffer) => gunzip(buffer))
    .then((data) => neatCsv(data, schema));

  utils.verbose(`inventory for ${name} - fetched ${inventory.length} rows`)
  return inventory;
}
|
||
|
@@ -36,9 +88,15 @@ const fetchInventory = async () => { | |
* delete markers and to change the last modified timestamp into a datestamp | ||
* @returns {object[]} | ||
*/ | ||
const parseInventory = async () => { | ||
const parseInventory = async ({bucket, prefix, name}) => { | ||
const deletedKeys = new Set(); | ||
let objects = await fetchInventory(); | ||
let objects; | ||
try { | ||
objects = await fetchInventory({bucket, prefix, name}); | ||
} catch (e) { | ||
utils.warn(`Error while fetching s3 inventory for ${name}: ${e.message}`) | ||
return []; | ||
} | ||
objects = objects.filter((item) => { | ||
if (item.IsDeleteMarker === "true") { | ||
deletedKeys.add(item.Key); | ||
|
@@ -123,4 +181,17 @@ const coreBucketKeyMunger = (object) => { | |
export { | ||
parseInventory, | ||
coreBucketKeyMunger, | ||
} | ||
} | ||
|
||
/**
 * Parses a S3 inventory manifest JSON file
 * @param {object} manifest parsed manifest.json contents
 * @returns {{schema: string[], inventoryKey: string}}
 *          schema: trimmed column names from the manifest's fileSchema;
 *          inventoryKey: S3 key of the (first) inventory data file
 */
function _parseManifest(manifest) {
  const schema = manifest.fileSchema
    .split(",")
    .map((field) => field.trim());
  const inventoryKey = manifest.files[0].key;
  return { schema, inventoryKey };
}
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be better to explicitly match and filter on the whole key, e.g. (pseudo-code; needs regexp metachar escaping)
or the equivalent behaviour via another approach.
Otherwise it's easy for this to start mismatching unexpectedly in the future.