Data versioning support #99

Merged: 9 commits, Aug 14, 2023
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
@@ -10,3 +10,8 @@ repos:
# pre-commit's default_language_version, see
# https://pre-commit.com/#top_level-default_language_version
language_version: python3.9
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
args: ["--profile", "black", "--filter-files"]
7 changes: 7 additions & 0 deletions pyproject.toml
@@ -1,5 +1,6 @@
[project]
name = "aggregator"
requires-python = ">= 3.9"
Contributor Author:
This is a little unrelated, but since isort surfaced a couple of instances of typing reorganization, I pinned this version and removed all Dict/List typing in favor of the 3.9+ built-in dict/list (a small illustration follows this file's diff).

version = "0.1.3"
# This project is designed to run on the AWS serverless application framework (SAM).
# The project dependencies are handled via AWS layers. These are only required for
@@ -44,7 +45,13 @@ test = [
dev = [
"bandit",
"black==22.12.0",
"isort==5.12.0",
"pre-commit",
"pylint",
"pycodestyle"
]

[tool.isort]
profile = "black"
src_paths = ["src", "tests"]
skip_glob = [".aws_sam"]
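As a hedged illustration of the typing cleanup described in the comment above (the function name here is made up, not code from this PR): pinning requires-python to 3.9 lets the PEP 585 built-in generics replace the typing aliases.

# Pre-3.9 style, now removed throughout the repo:
from typing import Dict, List

def summarize_old(query_params: Dict, filters: List) -> Dict:
    ...

# 3.9+ style used after this PR; no typing import needed for these annotations:
def summarize_new(query_params: dict, filters: list) -> dict:
    ...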
1 change: 0 additions & 1 deletion scripts/credential_management.py
@@ -6,7 +6,6 @@
import sys

import boto3

from requests.auth import _basic_auth_str


1 change: 0 additions & 1 deletion scripts/cumulus_upload_data.py
@@ -4,7 +4,6 @@
import argparse
import os
import sys

from pathlib import Path

import boto3
102 changes: 102 additions & 0 deletions scripts/migrate_versioning.py
@@ -0,0 +1,102 @@
#!/usr/bin/env python3
""" Utility for adding versioning to an existing aggregator data store

This is a one time thing for us, so the CLI/Boto creds are not robust.
"""
import argparse
import io
import json

import boto3

UPLOAD_ROOT_BUCKETS = [
"archive",
"error",
"last_valid",
"latest",
"site_upload",
"study_metadata",
]


def _get_s3_data(key: str, bucket_name: str, client) -> dict:
"""Convenience class for retrieving a dict from S3"""
try:
bytes_buffer = io.BytesIO()
client.download_fileobj(Bucket=bucket_name, Key=key, Fileobj=bytes_buffer)
return json.loads(bytes_buffer.getvalue().decode())
except Exception: # pylint: disable=broad-except
return {}


def _put_s3_data(key: str, bucket_name: str, client, data: dict) -> None:
"""Convenience class for writing a dict to S3"""
b_data = io.BytesIO(json.dumps(data).encode())
client.upload_fileobj(Bucket=bucket_name, Key=key, Fileobj=b_data)


def _get_depth(d):
if isinstance(d, dict):
return 1 + (max(map(_get_depth, d.values())) if d else 0)
return 0


def migrate_bucket_versioning(bucket: str):
client = boto3.client("s3")
res = client.list_objects_v2(Bucket=bucket)
contents = res["Contents"]
moved_files = 0
for s3_file in contents:
if s3_file["Key"].split("/")[0] in UPLOAD_ROOT_BUCKETS:
key = s3_file["Key"]
key_array = key.split("/")
if len(key_array) == 5:
key_array.insert(4, "000")
new_key = "/".join(key_array)
client.copy({"Bucket": bucket, "Key": key}, bucket, new_key)
client.delete_object(Bucket=bucket, Key=key)
moved_files += 1
print(f"Moved {moved_files} uploads")
study_periods = _get_s3_data("metadata/study_periods.json", bucket, client)

if _get_depth(study_periods) == 3:
new_sp = {}
for site in study_periods:
new_sp[site] = {}
for study in study_periods[site]:
new_sp[site][study] = {}
new_sp[site][study]["000"] = study_periods[site][study]
new_sp[site][study]["000"].pop("version")
new_sp[site][study]["000"]["study_period_format_version"] = 2
# print(json.dumps(new_sp, indent=2))
_put_s3_data("metadata/study_periods.json", bucket, client, new_sp)
print("study_periods.json updated")
else:
print("study_periods.json does not need update")

transactions = _get_s3_data("metadata/transactions.json", bucket, client)
if _get_depth(transactions) == 4:
new_t = {}
for site in transactions:
new_t[site] = {}
for study in transactions[site]:
new_t[site][study] = {}
for dp in transactions[site][study]:
new_t[site][study][dp] = {}
new_t[site][study][dp]["000"] = transactions[site][study][dp]
new_t[site][study][dp]["000"].pop("version")
new_t[site][study][dp]["000"]["transacton_format_version"] = 2
# print(json.dumps(new_t, indent=2))
_put_s3_data("metadata/transactions.json", bucket, client, new_t)
print("transactions.json updated")
else:
print("transactions.json does not need update")


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="""Util for migrating aggregator data"""
)
parser.add_argument("-b", "--bucket", help="bucket name")
args = parser.parse_args()
migrate_bucket_versioning(args.bucket)
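Two parts of migrate_versioning.py are easier to picture with a small, hedged sketch (hypothetical keys, sites, and studies): the upload-key rewrite that splices a "000" version segment into five-segment keys, and the study_periods.json rewrite that _get_depth detects by the dictionary going from three levels deep to four.

# Upload-key rewrite: a five-segment key gains a "000" version segment
# before the filename (hypothetical key).
key = "latest/covid/encounter/general_hospital/encounter.parquet"
key_array = key.split("/")
if len(key_array) == 5:
    key_array.insert(4, "000")
new_key = "/".join(key_array)
# -> "latest/covid/encounter/general_hospital/000/encounter.parquet"

# study_periods.json rewrite: each per-study dict is nested under a "000"
# version key, loses the old "version" field, and gains a format stamp.
before = {
    "site_a": {
        "study_x": {
            "version": "1.0",
            "earliest_date": "2023-01-01T00:00:00+00:00",
            "latest_date": "2023-06-01T00:00:00+00:00",
            "last_data_update": "2023-06-02T00:00:00+00:00",
        }
    }
}
after = {}
for site, studies in before.items():
    after[site] = {}
    for study, fields in studies.items():
        versioned = dict(fields)                      # copy so 'before' stays intact
        versioned.pop("version")                      # old schema marker goes away
        versioned["study_period_format_version"] = 2  # new format stamp
        after[site][study] = {"000": versioned}
# _get_depth(before) == 3, _get_depth(after) == 4, so a re-run is a no-op.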
8 changes: 3 additions & 5 deletions src/handlers/dashboard/get_chart_data.py
@@ -3,8 +3,6 @@
"""
import os

from typing import List, Dict

import awswrangler
import boto3
import pandas
@@ -15,7 +13,7 @@
from src.handlers.shared.functions import http_response


def _get_table_cols(table_name: str) -> List:
def _get_table_cols(table_name: str) -> list:
"""Returns the columns associated with a table.

Since running an athena query takes a decent amount of time due to queueing
@@ -34,7 +32,7 @@ def _get_table_cols(table_name: str) -> List:
return next(s3_iter).decode().split(",")


def _build_query(query_params: Dict, filters: List, path_params: Dict) -> str:
def _build_query(query_params: dict, filters: list, path_params: dict) -> str:
"""Creates a query from the dashboard API spec"""
table = path_params["subscription_name"]
columns = _get_table_cols(table)
@@ -67,7 +65,7 @@ def _build_query(query_params: Dict, filters: List, path_params: Dict) -> str:
return query_str


def _format_payload(df: pandas.DataFrame, query_params: Dict, filters: List) -> Dict:
def _format_payload(df: pandas.DataFrame, query_params: dict, filters: list) -> dict:
"""Coerces query results into the return format defined by the dashboard"""
payload = {}
payload["column"] = query_params["column"]
15 changes: 10 additions & 5 deletions src/handlers/shared/awswrangler_functions.py
@@ -1,6 +1,8 @@
""" functions specifically requiring AWSWranger, which requires a lambda layer"""
import awswrangler

from src.handlers.shared.enums import BucketPath


def get_s3_data_package_list(
bucket_root: str,
@@ -9,23 +11,26 @@ def get_s3_data_package_list(
data_package: str,
extension: str = "parquet",
):
"""Retrieves a list of data packages for a given S3 path post-upload proceesing"""
"""Retrieves a list of data packages for a given S3 path post-upload processing"""
return awswrangler.s3.list_objects(
path=f"s3://{s3_bucket_name}/{bucket_root}/{study}/{data_package}",
path=f"s3://{s3_bucket_name}/{bucket_root}/{study}/{study}__{data_package}/",
suffix=extension,
)


def get_s3_study_meta_list(
bucket_root: str,
s3_bucket_name: str,
study: str,
data_package: str,
site: str,
version: str,
extension: str = "parquet",
):
"""Retrieves a list of data packages for a given S3 path post-upload proceesing"""
"""Retrieves metadata associated with a given upload"""
return awswrangler.s3.list_objects(
path=f"s3://{s3_bucket_name}/{bucket_root}/{study}/{data_package}/{site}",
path=(
f"s3://{bucket_root}/{BucketPath.STUDY_META.value}/{study}/"
f"{study}__{data_package}/{site}/{version}"
),
suffix=extension,
)
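For reference, a hedged sketch of the prefix the updated get_s3_study_meta_list builds (the names are hypothetical, and BucketPath.STUDY_META.value is assumed to be "study_metadata" based on the root folder list in migrate_versioning.py):

# Hypothetical argument values, shown only to illustrate the new key layout.
bucket_root = "cumulus-aggregator-bucket"   # assumption: the bucket name is passed here
study = "covid"
data_package = "encounter"
site = "general_hospital"
version = "000"

path = (
    f"s3://{bucket_root}/study_metadata/{study}/"
    f"{study}__{data_package}/{site}/{version}"
)
# -> s3://cumulus-aggregator-bucket/study_metadata/covid/covid__encounter/general_hospital/000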
39 changes: 23 additions & 16 deletions src/handlers/shared/functions.py
@@ -1,32 +1,31 @@
""" Functions used across different lambdas"""
import io
import logging
import json

from typing import Dict, Optional
import logging
from datetime import datetime, timezone
from typing import Optional

import boto3

from src.handlers.shared.enums import BucketPath, JsonFilename

TRANSACTION_METADATA_TEMPLATE = {
"version": "1.0",
"transacton_format_version": "2",
"last_upload": None,
"last_data_update": None,
"last_aggregation": None,
"last_error": None,
"deleted": None,
}
STUDY_PERIOD_METADATA_TEMPLATE = {
"version": "1.0",
"study_period_format_version": "2",
"earliest_date": None,
"latest_date": None,
"last_data_update": None,
}


def http_response(status: int, body: str, allow_cors: bool = False) -> Dict:
def http_response(status: int, body: str, allow_cors: bool = False) -> dict:
"""Generates the payload AWS lambda expects as a return value"""
headers = {"Content-Type": "application/json"}
if allow_cors:
@@ -57,7 +56,7 @@ def check_meta_type(meta_type: str) -> None:

def read_metadata(
s3_client, s3_bucket_name: str, meta_type: str = JsonFilename.TRANSACTIONS.value
) -> Dict:
) -> dict:
"""Reads transaction information from an s3 bucket as a dictionary"""
check_meta_type(meta_type)
s3_path = f"{BucketPath.META.value}/{meta_type}.json"
@@ -71,38 +70,46 @@ def read_metadata(


def update_metadata(
metadata: Dict,
metadata: dict,
site: str,
study: str,
data_package: str,
version: str,
target: str,
dt: Optional[datetime] = None,
meta_type: str = JsonFilename.TRANSACTIONS.value,
):
"""Safely updates items in metadata dictionary"""
"""Safely updates items in metadata dictionary


It's assumed that, other than the version field itself, every item in one
of these metadata dicts is a datetime corresponding to an S3 event timestamp
"""
check_meta_type(meta_type)
if meta_type == JsonFilename.TRANSACTIONS.value:
site_metadata = metadata.setdefault(site, {})
study_metadata = site_metadata.setdefault(study, {})
data_package_metadata = study_metadata.setdefault(
data_package, TRANSACTION_METADATA_TEMPLATE
data_package_metadata = study_metadata.setdefault(data_package, {})
data_version_metadata = data_package_metadata.setdefault(
version, TRANSACTION_METADATA_TEMPLATE
)
dt = dt or datetime.now(timezone.utc)
data_package_metadata[target] = dt.isoformat()
data_version_metadata[target] = dt.isoformat()
elif meta_type == JsonFilename.STUDY_PERIODS.value:
site_metadata = metadata.setdefault(site, {})
study_period_metadata = site_metadata.setdefault(
study, STUDY_PERIOD_METADATA_TEMPLATE
study_period_metadata = site_metadata.setdefault(study, {})
data_version_metadata = study_period_metadata.setdefault(
version, STUDY_PERIOD_METADATA_TEMPLATE
)
dt = dt or datetime.now(timezone.utc)
study_period_metadata[target] = dt.isoformat()
data_version_metadata[target] = dt.isoformat()
return metadata


def write_metadata(
s3_client,
s3_bucket_name: str,
metadata: Dict,
metadata: dict,
meta_type: str = JsonFilename.TRANSACTIONS.value,
) -> None:
"""Writes transaction info from ∏a dictionary to an s3 bucket metadata location"""
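To make the shape of the versioned transactions metadata concrete, here is a hedged sketch (hypothetical site, study, and data package names) of the nesting that update_metadata now produces, one level deeper than before:

from datetime import datetime, timezone

# Hypothetical names; the real keys come from the upload path.
metadata: dict = {}
site, study, data_package, version = "site_a", "covid", "encounter", "000"

site_metadata = metadata.setdefault(site, {})
study_metadata = site_metadata.setdefault(study, {})
data_package_metadata = study_metadata.setdefault(data_package, {})
data_version_metadata = data_package_metadata.setdefault(
    version,
    {  # mirrors TRANSACTION_METADATA_TEMPLATE above
        "transacton_format_version": "2",
        "last_upload": None,
        "last_data_update": None,
        "last_aggregation": None,
        "last_error": None,
        "deleted": None,
    },
)
data_version_metadata["last_upload"] = datetime.now(timezone.utc).isoformat()

# metadata is now nested site -> study -> data_package -> version -> timestamps,
# one level deeper than the pre-versioning layout.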
14 changes: 13 additions & 1 deletion src/handlers/site_upload/fetch_upload_url.py
@@ -43,9 +43,21 @@ def upload_url_handler(event, context):
)
user = event["requestContext"]["authorizer"]["principalId"]
body = json.loads(event["body"])
for key in ["study", "data_package", "filename"]:
if body[key] is None:
return http_response(
400,
"Malformed data payload. See "
"https://docs.smarthealthit.org/cumulus/library/sharing-data.html "
"for more information about uploading data.",
)
if "data_package_version" in body:
version = body["data_package_version"]
else:
version = "0"
res = create_presigned_post(
os.environ.get("BUCKET_NAME"),
f"{BucketPath.UPLOAD.value}/{body['study']}/{body['data_package']}/"
f"{metadata_db[user]['path']}/{body['filename']}",
f"{metadata_db[user]['path']}/{int(version):03d}/{body['filename']}",
)
return res
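As a final illustration, a hedged sketch of the presigned-post key this handler now builds (hypothetical request values; BucketPath.UPLOAD.value is assumed to be "site_upload" per the root folder list in migrate_versioning.py), with the data package version zero-padded to three digits:

# Hypothetical stand-ins for the request body and the metadata_db lookup.
study = "covid"
data_package = "encounter"
user_path = "general_hospital"   # metadata_db[user]["path"] in the handler
filename = "encounter.parquet"
version = "2"                    # falls back to "0" when data_package_version is absent

key = (
    f"site_upload/{study}/{data_package}/"
    f"{user_path}/{int(version):03d}/{filename}"
)
# -> "site_upload/covid/encounter/general_hospital/002/encounter.parquet"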