Data versioning
dogversioning committed Aug 7, 2023
1 parent 103e148 commit 6e7eb2d
Showing 23 changed files with 433 additions and 244 deletions.
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
@@ -10,3 +10,8 @@ repos:
# pre-commit's default_language_version, see
# https://pre-commit.com/#top_level-default_language_version
language_version: python3.9
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
args: ["--profile", "black", "--filter-files"]
6 changes: 6 additions & 0 deletions pyproject.toml
@@ -44,7 +44,13 @@ test = [
dev = [
"bandit",
"black==22.12.0",
"isort",
"pre-commit",
"pylint",
"pycodestyle"
]

[tool.isort]
profile = "black"
src_paths = ["src", "tests"]
skip_glob = [".aws_sam"]
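As an aside (not part of this commit), here is a minimal sketch of the import layout this isort configuration produces, matching the reordered imports elsewhere in the diff; the specific modules are examples pulled from the touched files, and the first-party line assumes the repository layout implied by src_paths.

# Illustrative only: one blank line between sections, no blank lines inside a
# section. Standard library imports first...
import argparse
import os
import sys
from pathlib import Path

# ...then third-party packages...
import boto3
from requests.auth import _basic_auth_str

# ...then first-party code, per src_paths = ["src", "tests"] above.
from src.handlers.shared.enums import BucketPath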
1 change: 0 additions & 1 deletion scripts/credential_management.py
@@ -6,7 +6,6 @@
import sys

import boto3

from requests.auth import _basic_auth_str


1 change: 0 additions & 1 deletion scripts/cumulus_upload_data.py
@@ -4,7 +4,6 @@
import argparse
import os
import sys

from pathlib import Path

import boto3
90 changes: 90 additions & 0 deletions scripts/migrate_versioning.py
@@ -0,0 +1,90 @@
""" Utility for adding versioning to an existing aggregator data store
This is a one time thing for us, so the CLI/Boto creds are not robust.
"""
import argparse
import boto3
import io
import json

from rich import progress

UPLOAD_ROOT_BUCKETS = ["archive", "error", "last_valid", "latest", "site_upload"]


def _get_s3_data(key: str, bucket_name: str, client) -> dict:
"""Convenience class for retrieving a dict from S3"""
try:
bytes_buffer = io.BytesIO()
client.download_fileobj(Bucket=bucket_name, Key=key, Fileobj=bytes_buffer)
return json.loads(bytes_buffer.getvalue().decode())
except Exception: # pylint: disable=broad-except
return {}


def _put_s3_data(key: str, bucket_name: str, client, data: dict) -> None:
"""Convenience class for writing a dict to S3"""
b_data = io.BytesIO(json.dumps(data).encode())
client.upload_fileobj(Bucket=bucket_name, Key=key, Fileobj=b_data)


def _get_depth(d):
if isinstance(d, dict):
return 1 + (max(map(_get_depth, d.values())) if d else 0)
return 0


def migrate_bucket_versioning(bucket: str):
client = boto3.client("s3")
res = client.list_objects_v2(Bucket=bucket)
contents = res["Contents"]
moved_files = 0
for s3_file in contents:
if s3_file["Key"].split("/")[0] in UPLOAD_ROOT_BUCKETS:
key = s3_file["Key"]
key_array = key.split("/")
if len(key_array) == 5:
key_array.insert(4, "000")
new_key = "/".join(key_array)
client.copy({"Bucket": bucket, "Key": key}, bucket, new_key)
client.delete_object(Bucket=bucket, Key=key)
moved_files += 1
print(f"Moved {moved_files} uploads")

study_periods = _get_s3_data("metadata/study_periods.json", bucket, client)

if _get_depth(study_periods) == 3:
new_sp = {}
for site in study_periods:
new_sp[site] = {}
for study in study_periods[site]:
new_sp[site][study] = {}
new_sp[site][study]["000"] = study_periods[site][study]
_put_s3_data("metadata/study_periods.json", bucket, client, new_sp)
print("study_periods.json updated")
else:
print("study_periods.json does not need update")

transactions = _get_s3_data("metadata/transactions.json", bucket, client)
if _get_depth(transactions) == 4:
new_t = {}
for site in transactions:
new_t[site] = {}
for study in transactions[site]:
new_t[site][study] = {}
for dp in transactions[site][study]:
new_t[site][study][dp] = {}
new_t[site][study][dp]["000"] = transactions[site][study][dp]
_put_s3_data("metadata/transactions.json", bucket, client, new_t)
print("transactions.json updated")
else:
print("transactions.json does not need update")


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="""Util for migrating aggregator data"""
)
parser.add_argument("-b", "--bucket", help="bucket name")
args = parser.parse_args()
migrate_bucket_versioning(args.bucket)
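To make the migration concrete, here is a small standalone sketch (not part of the commit) of the two transformations above; the bucket prefix, study, and site names are invented for illustration.

# 1. Upload keys: a five-segment key root/study/data_package/site/filename
#    gains a "000" version segment before the filename.
old_key = "latest/covid/covid__count/site_a/covid__count.parquet"
key_array = old_key.split("/")
if len(key_array) == 5:
    key_array.insert(4, "000")
print("/".join(key_array))
# latest/covid/covid__count/site_a/000/covid__count.parquet

# 2. Metadata: _get_depth() tells old layouts from new ones, so a second run
#    is a no-op. An un-versioned study_periods.json (site -> study -> fields)
#    has depth 3; the migrated form (site -> study -> "000" -> fields) has
#    depth 4. transactions.json likewise goes from depth 4 to depth 5.
old_study_periods = {"site_a": {"covid": {"earliest_date": "2020-01-01"}}}
new_study_periods = {"site_a": {"covid": {"000": {"earliest_date": "2020-01-01"}}}}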
3 changes: 1 addition & 2 deletions src/handlers/dashboard/get_chart_data.py
@@ -2,8 +2,7 @@
This is intended to provide an implementation of the logic described in docs/api.md
"""
import os

from typing import List, Dict
from typing import Dict, List

import awswrangler
import boto3
20 changes: 11 additions & 9 deletions src/handlers/shared/functions.py
@@ -1,10 +1,9 @@
""" Functions used across different lambdas"""
import io
import logging
import json

from typing import Dict, Optional
import logging
from datetime import datetime, timezone
from typing import Dict, Optional

import boto3

@@ -75,6 +74,7 @@ def update_metadata(
site: str,
study: str,
data_package: str,
version: str,
target: str,
dt: Optional[datetime] = None,
meta_type: str = JsonFilename.TRANSACTIONS.value,
@@ -84,18 +84,20 @@
if meta_type == JsonFilename.TRANSACTIONS.value:
site_metadata = metadata.setdefault(site, {})
study_metadata = site_metadata.setdefault(study, {})
data_package_metadata = study_metadata.setdefault(
data_package, TRANSACTION_METADATA_TEMPLATE
data_package_metadata = study_metadata.setdefault(data_package, {})
version_metadata = data_package_metadata.setdefault(
version, TRANSACTION_METADATA_TEMPLATE
)
dt = dt or datetime.now(timezone.utc)
data_package_metadata[target] = dt.isoformat()
version_metadata[target] = dt.isoformat()
elif meta_type == JsonFilename.STUDY_PERIODS.value:
site_metadata = metadata.setdefault(site, {})
study_period_metadata = site_metadata.setdefault(
study, STUDY_PERIOD_METADATA_TEMPLATE
study_period_metadata = site_metadata.setdefault(study, {})
version_metadata = study_period_metadata.setdefault(
version, STUDY_PERIOD_METADATA_TEMPLATE
)
dt = dt or datetime.now(timezone.utc)
study_period_metadata[target] = dt.isoformat()
version_metadata[target] = dt.isoformat()
return metadata


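For reference (not visible in the diff itself), here is a sketch of the nested shape update_metadata maintains once the version level is in place; the site, study, and values are placeholders, and only field names that appear elsewhere in this commit are used.

# transactions.json: site -> study -> data_package -> version -> timestamp fields.
# study_periods.json gains the same version level one layer higher
# (site -> study -> version -> earliest_date / latest_date / last_data_update).
transactions = {
    "site_a": {
        "covid": {
            "covid__count": {
                "000": {
                    "last_upload": "2023-08-07T00:00:00+00:00",
                    "last_error": None,
                },
            },
        },
    },
}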
2 changes: 1 addition & 1 deletion src/handlers/site_upload/fetch_upload_url.py
@@ -46,6 +46,6 @@ def upload_url_handler(event, context):
res = create_presigned_post(
os.environ.get("BUCKET_NAME"),
f"{BucketPath.UPLOAD.value}/{body['study']}/{body['data_package']}/"
f"{metadata_db[user]['path']}/{body['filename']}",
f"{body['version']}/{metadata_db[user]['path']}/{body['filename']}",
)
return res
16 changes: 6 additions & 10 deletions src/handlers/site_upload/powerset_merge.py
@@ -3,19 +3,17 @@
import logging
import os
import traceback

from datetime import datetime, timezone

import awswrangler
import boto3
import pandas

from numpy import nan
from pandas.core.indexes.range import RangeIndex

from src.handlers.shared.awswrangler_functions import get_s3_data_package_list
from src.handlers.shared.decorators import generic_error_handler
from src.handlers.shared.enums import BucketPath
from src.handlers.shared.awswrangler_functions import get_s3_data_package_list
from src.handlers.shared.functions import (
get_s3_site_filename_suffix,
http_response,
@@ -47,7 +45,7 @@ def __init__(self, event):
self.site = s3_key_array[3]
self.study = s3_key_array[1]
self.data_package = s3_key_array[2]

self.version = s3_key_array[4]
self.metadata = read_metadata(self.s3_client, self.s3_bucket_name)

# S3 Filesystem operations
@@ -78,7 +76,7 @@ def write_parquet(self, df: pandas.DataFrame, is_new_data_package: bool) -> None
"""writes dataframe as parquet to s3 and sends an SNS notification if new"""
parquet_aggregate_path = (
f"s3://{self.s3_bucket_name}/{BucketPath.AGGREGATE.value}/"
f"{self.study}/{self.study}__{self.data_package}/"
f"{self.study}/{self.study}__{self.data_package}/{self.version}/"
f"{self.study}__{self.data_package}__aggregate.parquet"
)
awswrangler.s3.to_parquet(df, parquet_aggregate_path, index=False)
@@ -92,7 +90,7 @@ def write_csv(self, df: pandas.DataFrame) -> None:
"""writes dataframe as csv to s3"""
csv_aggregate_path = (
f"s3://{self.s3_bucket_name}/{BucketPath.CSVAGGREGATE.value}/"
f"{self.study}/{self.study}__{self.data_package}/"
f"{self.study}/{self.study}__{self.data_package}/{self.version}/"
f"{self.study}__{self.data_package}__aggregate.csv"
)
df = df.apply(lambda x: x.strip() if isinstance(x, str) else x).replace(
@@ -109,7 +107,7 @@ def update_local_metadata(self, key, site=None):
if site is None:
site = self.site
self.metadata = update_metadata(
self.metadata, site, self.study, self.data_package, key
self.metadata, site, self.study, self.data_package, self.version, key
)

def write_local_metadata(self):
@@ -162,9 +160,7 @@ def expand_and_concat_sets(
site_df["site"] = get_static_string_series(None, site_df.index)
df_copy["site"] = get_static_string_series(site_name, df_copy.index)

# TODO: we should introduce some kind of data versioning check to see if datasets
# are generated from the same vintage. This naive approach will cause a decent
# amount of data churn we'll have to manage in the interim.
# Did we change the schema without updating the version?
if df.empty is False and set(site_df.columns) != set(df.columns):
raise MergeError(
"Uploaded data has a different schema than last aggregate",
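A note on the comment change in expand_and_concat_sets above: the merge still rejects an upload whose column set differs from the current aggregate, so schema changes are expected to arrive under a new data package version. Below is a minimal standalone sketch of that guard, with invented column names (the real code raises MergeError).

import pandas

existing = pandas.DataFrame({"cnt": [10], "gender": ["female"], "site": ["site_a"]})
incoming = pandas.DataFrame({"cnt": [7], "gender": ["male"], "race": ["asian"]})

# Compare column sets, not order, mirroring the check above.
if existing.empty is False and set(incoming.columns) != set(existing.columns):
    print("schema mismatch: publish this upload under a new version instead")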
13 changes: 11 additions & 2 deletions src/handlers/site_upload/process_upload.py
@@ -28,8 +28,9 @@ def process_upload(s3_client, sns_client, s3_bucket_name: str, s3_key: str) -> N
study = path_params[1]
data_package = path_params[2]
site = path_params[3]
version = path_params[4]
if s3_key.endswith(".parquet"):
if "_meta_" in s3_key:
if "__meta_" in s3_key:
new_key = f"{BucketPath.STUDY_META.value}/{s3_key.split('/', 1)[-1]}"
topic_sns_arn = os.environ.get("TOPIC_PROCESS_STUDY_META_ARN")
sns_subject = "Process study medata upload event"
@@ -43,6 +44,7 @@ def process_upload(s3_client, sns_client, s3_bucket_name: str, s3_key: str) -> N
site,
study,
data_package,
version,
"last_upload",
last_uploaded_date,
)
@@ -56,11 +58,18 @@ def process_upload(s3_client, sns_client, s3_bucket_name: str, s3_key: str) -> N
site,
study,
data_package,
version,
"last_upload",
last_uploaded_date,
)
metadata = update_metadata(
metadata, site, study, data_package, "last_error", last_uploaded_date
metadata,
site,
study,
data_package,
version,
"last_error",
last_uploaded_date,
)
write_metadata(s3_client, s3_bucket_name, metadata)
raise UnexpectedFileTypeError
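For reference (not part of the diff), this is the key layout those indices assume, with placeholder values; version is parsed as the fifth path segment here and again in study_period.py below.

# Hypothetical key under the new layout: root/study/data_package/site/version/file.
s3_key = "latest/covid/covid__count/site_a/000/covid__count.parquet"
path_params = s3_key.split("/")
study = path_params[1]         # "covid"
data_package = path_params[2]  # "covid__count"
site = path_params[3]          # "site_a"
version = path_params[4]       # "000"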
21 changes: 12 additions & 9 deletions src/handlers/site_upload/study_period.py
@@ -1,15 +1,14 @@
""" Lambda for updating date ranges associated with studies """

import os

from datetime import datetime, timezone

import awswrangler
import boto3

from src.handlers.shared.awswrangler_functions import get_s3_study_meta_list
from src.handlers.shared.decorators import generic_error_handler
from src.handlers.shared.enums import BucketPath, JsonFilename
from src.handlers.shared.awswrangler_functions import get_s3_study_meta_list
from src.handlers.shared.functions import (
http_response,
read_metadata,
@@ -18,7 +17,7 @@
)


def update_study_period(s3_client, s3_bucket, site, study, data_package):
def update_study_period(s3_client, s3_bucket, site, study, data_package, version):
"""gets earliest/latest date from study metadata files"""
path = get_s3_study_meta_list(
BucketPath.STUDY_META.value, s3_bucket, study, data_package, site
@@ -29,11 +28,13 @@ def update_study_period(s3_client, s3_bucket, site, study, data_package):
study_meta = read_metadata(
s3_client, s3_bucket, meta_type=JsonFilename.STUDY_PERIODS.value
)

study_meta = update_metadata(
study_meta,
site,
study,
data_package,
version,
"earliest_date",
df["min_date"][0],
meta_type=JsonFilename.STUDY_PERIODS.value,
@@ -43,6 +44,7 @@ def update_study_period(s3_client, s3_bucket, site, study, data_package):
site,
study,
data_package,
version,
"latest_date",
df["max_date"][0],
meta_type=JsonFilename.STUDY_PERIODS.value,
@@ -52,6 +54,7 @@ def update_study_period(s3_client, s3_bucket, site, study, data_package):
site,
study,
data_package,
version,
"last_data_update",
datetime.now(timezone.utc),
meta_type=JsonFilename.STUDY_PERIODS.value,
@@ -68,11 +71,11 @@ def study_period_handler(event, context):
s3_bucket = os.environ.get("BUCKET_NAME")
s3_client = boto3.client("s3")
s3_key = event["Records"][0]["Sns"]["Message"]
s3_key_array = s3_key.split("/")
site = s3_key_array[3]
study = s3_key_array[1]
data_package = s3_key_array[2]

update_study_period(s3_client, s3_bucket, site, study, data_package)
path_params = s3_key.split("/")
study = path_params[1]
data_package = path_params[2]
site = path_params[3]
version = path_params[4]
update_study_period(s3_client, s3_bucket, site, study, data_package, version)
res = http_response(200, "Study period update successful")
return res