Data versioning support #99

Merged: 9 commits, Aug 14, 2023
Changes from 1 commit
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
@@ -10,3 +10,8 @@ repos:
# pre-commit's default_language_version, see
# https://pre-commit.com/#top_level-default_language_version
language_version: python3.9
- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
- id: isort
args: ["--profile", "black", "--filter-files"]
7 changes: 7 additions & 0 deletions pyproject.toml
@@ -1,5 +1,6 @@
[project]
name = "aggregator"
requires-python = ">= 3.9"
Contributor Author:

This is a little unrelated, but since isort pulled up a couple of instances of typing reorgs, I pinned this version and removed all Dict/List typing in favor of 3.9+ dict/list.
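As an aside, a minimal before/after sketch of that kind of typing change (illustrative only; the function and names below are not from this diff):

    # Before: typing-module generics, needed on Python 3.8 and earlier
    from typing import Dict, List

    def summarize(rows: List[dict]) -> Dict[str, int]:
        return {"count": len(rows)}

    # After: built-in generics (PEP 585), which is why requires-python is pinned to >= 3.9
    def summarize(rows: list[dict]) -> dict[str, int]:
        return {"count": len(rows)}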

version = "0.1.3"
# This project is designed to run on the AWS serverless application framework (SAM).
# The project dependencies are handled via AWS layers. These are only required for
@@ -44,7 +45,13 @@ test = [
dev = [
"bandit",
"black==22.12.0",
"isort",
"pre-commit",
"pylint",
"pycodestyle"
]

[tool.isort]
profile = "black"
src_paths = ["src", "tests"]
skip_glob = [".aws_sam"]
1 change: 0 additions & 1 deletion scripts/credential_management.py
@@ -6,7 +6,6 @@
import sys

import boto3

from requests.auth import _basic_auth_str


1 change: 0 additions & 1 deletion scripts/cumulus_upload_data.py
@@ -4,7 +4,6 @@
import argparse
import os
import sys

from pathlib import Path

import boto3
89 changes: 89 additions & 0 deletions scripts/migrate_versioning.py
Contributor Author (dogversioning), Aug 8, 2023:

The intent with this file:

  • It should change all existing S3 files to add .../000/..., matching studies that don't currently have a version.
  • It should modify the state tracking dicts to include the same as a nested key.
  • It shouldn't try to modify files again if it's already been applied.

After this is applied, I will either remove it in a follow-on commit or change it so that it is not runnable unless you *really* try, just in case we want an easy template for a future version of a similar operation. I could also be convinced to move this to a different folder - perhaps ./migrations?

I've already run this script on the dev bucket. If you want, I can reset it by copying over prod and you can give it a run yourself. The initial copy is slow - if this wasn't a one-and-done I'd put a progress bar on it.
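For illustration, the key rewrite described above amounts to roughly the following (the study, data package, and site names are made up):

    # Hypothetical unversioned key: root/study/data_package/site/filename (5 segments)
    old_key = "site_upload/covid/covid__count/general_hospital/covid__count.parquet"
    parts = old_key.split("/")
    if len(parts) == 5:            # keys that already have a version segment are skipped
        parts.insert(4, "000")     # inject the default "000" version before the filename
    new_key = "/".join(parts)
    # new_key == "site_upload/covid/covid__count/general_hospital/000/covid__count.parquet"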

Contributor:

Eh, I don't need to run it. Sounds good - I can see some value in keeping it around, but you could just as easily add a line to some file here like "Go grab scripts/migrate_versioning.py from revision xyz to see an example of ..." and not keep it current. Git lets us be ruthless 😄

@@ -0,0 +1,89 @@
""" Utility for adding versioning to an existing aggregator data store

This is a one time thing for us, so the CLI/Boto creds are not robust.
"""
import argparse
import io
import json

import boto3

UPLOAD_ROOT_BUCKETS = ["archive", "error", "last_valid", "latest", "site_upload"]


def _get_s3_data(key: str, bucket_name: str, client) -> dict:
"""Convenience class for retrieving a dict from S3"""
try:
bytes_buffer = io.BytesIO()
client.download_fileobj(Bucket=bucket_name, Key=key, Fileobj=bytes_buffer)
return json.loads(bytes_buffer.getvalue().decode())
except Exception: # pylint: disable=broad-except
return {}


def _put_s3_data(key: str, bucket_name: str, client, data: dict) -> None:
"""Convenience class for writing a dict to S3"""
b_data = io.BytesIO(json.dumps(data).encode())
client.upload_fileobj(Bucket=bucket_name, Key=key, Fileobj=b_data)


def _get_depth(d):
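"""Returns the nesting depth of a dict; used to detect whether the version level was already added"""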
if isinstance(d, dict):
return 1 + (max(map(_get_depth, d.values())) if d else 0)
return 0


def migrate_bucket_versioning(bucket: str):
client = boto3.client("s3")
res = client.list_objects_v2(Bucket=bucket)
contents = res["Contents"]
moved_files = 0
for s3_file in contents:
if s3_file["Key"].split("/")[0] in UPLOAD_ROOT_BUCKETS:
key = s3_file["Key"]
key_array = key.split("/")
if len(key_array) == 5:
key_array.insert(4, "000")
new_key = "/".join(key_array)
client.copy({"Bucket": bucket, "Key": key}, bucket, new_key)
client.delete_object(Bucket=bucket, Key=key)
moved_files += 1
print(f"Moved {moved_files} uploads")

study_periods = _get_s3_data("metadata/study_periods.json", bucket, client)
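# An unversioned study_periods dict is site -> study -> fields (depth 3); the migration
# nests a "000" version level under each study (transactions similarly go from 4 to 5)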

if _get_depth(study_periods) == 3:
new_sp = {}
for site in study_periods:
new_sp[site] = {}
for study in study_periods[site]:
new_sp[site][study] = {}
new_sp[site][study]["000"] = study_periods[site][study]
_put_s3_data("metadata/study_periods.json", bucket, client, new_sp)
print("study_periods.json updated")
else:
print("study_periods.json does not need update")

transactions = _get_s3_data("metadata/transactions.json", bucket, client)
if _get_depth(transactions) == 4:
new_t = {}
for site in transactions:
new_t[site] = {}
for study in transactions[site]:
new_t[site][study] = {}
for dp in transactions[site][study]:
new_t[site][study][dp] = {}
new_t[site][study][dp]["000"] = transactions[site][study][dp]
_put_s3_data("metadata/transactions.json", bucket, client, new_t)
print("transactions.json updated")
else:
print("transactions.json does not need update")


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="""Util for migrating aggregator data"""
)
parser.add_argument("-b", "--bucket", help="bucket name")
args = parser.parse_args()
migrate_bucket_versioning(args.bucket)
8 changes: 3 additions & 5 deletions src/handlers/dashboard/get_chart_data.py
@@ -3,8 +3,6 @@
"""
import os

from typing import List, Dict

import awswrangler
import boto3
import pandas
@@ -15,7 +13,7 @@
from src.handlers.shared.functions import http_response


def _get_table_cols(table_name: str) -> List:
def _get_table_cols(table_name: str) -> list:
"""Returns the columns associated with a table.

Since running an athena query takes a decent amount of time due to queueing
@@ -34,7 +32,7 @@ def _get_table_cols(table_name: str) -> List:
return next(s3_iter).decode().split(",")


def _build_query(query_params: Dict, filters: List, path_params: Dict) -> str:
def _build_query(query_params: dict, filters: list, path_params: dict) -> str:
"""Creates a query from the dashboard API spec"""
table = path_params["subscription_name"]
columns = _get_table_cols(table)
@@ -67,7 +65,7 @@ def _build_query(query_params: Dict, filters: List, path_params: Dict) -> str:
return query_str


def _format_payload(df: pandas.DataFrame, query_params: Dict, filters: List) -> Dict:
def _format_payload(df: pandas.DataFrame, query_params: dict, filters: list) -> dict:
"""Coerces query results into the return format defined by the dashboard"""
payload = {}
payload["column"] = query_params["column"]
28 changes: 15 additions & 13 deletions src/handlers/shared/functions.py
@@ -1,10 +1,9 @@
""" Functions used across different lambdas"""
import io
import logging
import json

from typing import Dict, Optional
import logging
from datetime import datetime, timezone
from typing import Optional

import boto3

@@ -26,7 +25,7 @@
}


def http_response(status: int, body: str, allow_cors: bool = False) -> Dict:
def http_response(status: int, body: str, allow_cors: bool = False) -> dict:
"""Generates the payload AWS lambda expects as a return value"""
headers = {"Content-Type": "application/json"}
if allow_cors:
@@ -57,7 +56,7 @@ def check_meta_type(meta_type: str) -> None:

def read_metadata(
s3_client, s3_bucket_name: str, meta_type: str = JsonFilename.TRANSACTIONS.value
) -> Dict:
) -> dict:
"""Reads transaction information from an s3 bucket as a dictionary"""
check_meta_type(meta_type)
s3_path = f"{BucketPath.META.value}/{meta_type}.json"
@@ -71,10 +70,11 @@


def update_metadata(
metadata: Dict,
metadata: dict,
site: str,
study: str,
data_package: str,
version: str,
target: str,
dt: Optional[datetime] = None,
meta_type: str = JsonFilename.TRANSACTIONS.value,
@@ -84,25 +84,27 @@
if meta_type == JsonFilename.TRANSACTIONS.value:
site_metadata = metadata.setdefault(site, {})
study_metadata = site_metadata.setdefault(study, {})
data_package_metadata = study_metadata.setdefault(
data_package, TRANSACTION_METADATA_TEMPLATE
data_package_metadata = study_metadata.setdefault(data_package, {})
version_metadata = data_package_metadata.setdefault(
version, TRANSACTION_METADATA_TEMPLATE
)
dt = dt or datetime.now(timezone.utc)
data_package_metadata[target] = dt.isoformat()
version_metadata[target] = dt.isoformat()
Contributor:

Why is the version for a target getting set to a date? That bumped me.

Contributor Author:

so that comes back to this data structure (as an example from the unit tests):

                    EXISTING_VERSION: {
                        "version": "1.0",
                        "last_upload": "2023-02-24T15:08:06+00:00",
                        "last_data_update": "2023-02-24T15:08:07.771080+00:00",
                        "last_aggregation": "2023-02-24T15:08:07.771080+00:00",
                        "last_error": None,
                        "deleted": None,
                    }

Other than the transaction format version, these are always either datetimes or none.

But I can do two things:

  • Update the docstring to make this explicit.
  • One of:
    • Raise an error if you pass in whatever the final name of the versioning key is, or
    • Change the name of the function to make it explicit that this is for setting fields in a transaction that are datetimes, and allowlist the names of datetime fields. (A thing that Jamie has asked for, for example, which I may not do here but should leave the option open for, is having some kind of summary metadata about an upload in here, like the number of rows.)
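For context, a rough usage sketch of the updated helper and the structure it produces (the site/study/package values here are made up; the call shape follows the new signature above):

    from datetime import datetime, timezone

    from src.handlers.shared.functions import update_metadata

    # Hypothetical values for illustration only
    metadata = update_metadata(
        metadata={},
        site="general_hospital",
        study="covid",
        data_package="count",
        version="000",
        target="last_upload",
        dt=datetime(2023, 2, 24, 15, 8, 6, tzinfo=timezone.utc),
    )
    # Nesting is site -> study -> data_package -> version -> field:
    # metadata["general_hospital"]["covid"]["count"]["000"]["last_upload"]
    #   == "2023-02-24T15:08:06+00:00"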

Contributor:

I wouldn't push you to do anything; I think this was me just not being familiar with the data structures / flow here. And maybe the re-use of the increasingly common word version - is version_metadata really a better name than data_package_metadata here? Maybe another word like data_version_metadata.

But regardless, it's fine as is.

elif meta_type == JsonFilename.STUDY_PERIODS.value:
site_metadata = metadata.setdefault(site, {})
study_period_metadata = site_metadata.setdefault(
study, STUDY_PERIOD_METADATA_TEMPLATE
study_period_metadata = site_metadata.setdefault(study, {})
version_metadata = study_period_metadata.setdefault(
version, STUDY_PERIOD_METADATA_TEMPLATE
)
dt = dt or datetime.now(timezone.utc)
study_period_metadata[target] = dt.isoformat()
version_metadata[target] = dt.isoformat()
return metadata


def write_metadata(
s3_client,
s3_bucket_name: str,
metadata: Dict,
metadata: dict,
meta_type: str = JsonFilename.TRANSACTIONS.value,
) -> None:
"""Writes transaction info from ∏a dictionary to an s3 bucket metadata location"""
2 changes: 1 addition & 1 deletion src/handlers/site_upload/fetch_upload_url.py
@@ -46,6 +46,6 @@ def upload_url_handler(event, context):
res = create_presigned_post(
os.environ.get("BUCKET_NAME"),
f"{BucketPath.UPLOAD.value}/{body['study']}/{body['data_package']}/"
f"{metadata_db[user]['path']}/{body['filename']}",
f"{body['version']}/{metadata_db[user]['path']}/{body['filename']}",
)
return res
16 changes: 6 additions & 10 deletions src/handlers/site_upload/powerset_merge.py
@@ -3,19 +3,17 @@
import logging
import os
import traceback

from datetime import datetime, timezone

import awswrangler
import boto3
import pandas

from numpy import nan
from pandas.core.indexes.range import RangeIndex

from src.handlers.shared.awswrangler_functions import get_s3_data_package_list
from src.handlers.shared.decorators import generic_error_handler
from src.handlers.shared.enums import BucketPath
from src.handlers.shared.awswrangler_functions import get_s3_data_package_list
from src.handlers.shared.functions import (
get_s3_site_filename_suffix,
http_response,
@@ -47,7 +45,7 @@ def __init__(self, event):
self.site = s3_key_array[3]
self.study = s3_key_array[1]
self.data_package = s3_key_array[2]

self.version = s3_key_array[4]
self.metadata = read_metadata(self.s3_client, self.s3_bucket_name)

# S3 Filesystem operations
@@ -78,7 +76,7 @@ def write_parquet(self, df: pandas.DataFrame, is_new_data_package: bool) -> None
"""writes dataframe as parquet to s3 and sends an SNS notification if new"""
parquet_aggregate_path = (
f"s3://{self.s3_bucket_name}/{BucketPath.AGGREGATE.value}/"
f"{self.study}/{self.study}__{self.data_package}/"
f"{self.study}/{self.study}__{self.data_package}/{self.version}/"
f"{self.study}__{self.data_package}__aggregate.parquet"
)
awswrangler.s3.to_parquet(df, parquet_aggregate_path, index=False)
@@ -92,7 +90,7 @@ def write_csv(self, df: pandas.DataFrame) -> None:
"""writes dataframe as csv to s3"""
csv_aggregate_path = (
f"s3://{self.s3_bucket_name}/{BucketPath.CSVAGGREGATE.value}/"
f"{self.study}/{self.study}__{self.data_package}/"
f"{self.study}/{self.study}__{self.data_package}/{self.version}/"
f"{self.study}__{self.data_package}__aggregate.csv"
)
df = df.apply(lambda x: x.strip() if isinstance(x, str) else x).replace(
@@ -109,7 +107,7 @@ def update_local_metadata(self, key, site=None):
if site is None:
site = self.site
self.metadata = update_metadata(
self.metadata, site, self.study, self.data_package, key
self.metadata, site, self.study, self.data_package, self.version, key
)

def write_local_metadata(self):
@@ -162,9 +160,7 @@ def expand_and_concat_sets(
site_df["site"] = get_static_string_series(None, site_df.index)
df_copy["site"] = get_static_string_series(site_name, df_copy.index)

# TODO: we should introduce some kind of data versioning check to see if datasets
# are generated from the same vintage. This naive approach will cause a decent
# amount of data churn we'll have to manage in the interim.
# Did we change the schema without updating the version?
if df.empty is False and set(site_df.columns) != set(df.columns):
raise MergeError(
"Uploaded data has a different schema than last aggregate",
13 changes: 11 additions & 2 deletions src/handlers/site_upload/process_upload.py
@@ -28,8 +28,9 @@ def process_upload(s3_client, sns_client, s3_bucket_name: str, s3_key: str) -> N
study = path_params[1]
data_package = path_params[2]
site = path_params[3]
version = path_params[4]
if s3_key.endswith(".parquet"):
if "_meta_" in s3_key:
if "__meta_" in s3_key:
new_key = f"{BucketPath.STUDY_META.value}/{s3_key.split('/', 1)[-1]}"
topic_sns_arn = os.environ.get("TOPIC_PROCESS_STUDY_META_ARN")
sns_subject = "Process study metadata upload event"
@@ -43,6 +44,7 @@
site,
study,
data_package,
version,
"last_upload",
last_uploaded_date,
)
@@ -56,11 +58,18 @@ def process_upload(s3_client, sns_client, s3_bucket_name: str, s3_key: str) -> N
site,
study,
data_package,
version,
"last_upload",
last_uploaded_date,
)
metadata = update_metadata(
metadata, site, study, data_package, "last_error", last_uploaded_date
metadata,
site,
study,
data_package,
version,
"last_error",
last_uploaded_date,
)
write_metadata(s3_client, s3_bucket_name, metadata)
raise UnexpectedFileTypeError