Migration for new data package format (#122)
* Migration for new data package format

* more layers

* moved pandas function to separate file
dogversioning authored Oct 7, 2024
1 parent ff4bee6 commit 5dbb54d
Showing 7 changed files with 236 additions and 51 deletions.
171 changes: 171 additions & 0 deletions scripts/migrations/migration.003.data_packages.py
@@ -0,0 +1,171 @@
import argparse
import enum
import io
import json
import os

import awswrangler
import boto3
import pandas
from rich import progress


class JsonFilename(enum.Enum):
"""stores names of expected kinds of persisted S3 JSON files"""

COLUMN_TYPES = "column_types"
TRANSACTIONS = "transactions"
DATA_PACKAGES = "data_packages"
STUDY_PERIODS = "study_periods"


class BucketPath(enum.Enum):
"""stores root level buckets for managing data processing state"""

ADMIN = "admin"
AGGREGATE = "aggregates"
ARCHIVE = "archive"
CACHE = "cache"
CSVAGGREGATE = "csv_aggregates"
ERROR = "error"
LAST_VALID = "last_valid"
LATEST = "latest"
META = "metadata"
STUDY_META = "study_metadata"
UPLOAD = "site_upload"


def get_csv_column_datatypes(dtypes):
"""helper for generating column type for dashboard API"""
column_dict = {}
for column in dtypes.index:
if column.endswith("year"):
column_dict[column] = "year"
elif column.endswith("month"):
column_dict[column] = "month"
elif column.endswith("week"):
column_dict[column] = "week"
elif column.endswith("day") or str(dtypes[column]) == "datetime64":
column_dict[column] = "day"
elif "cnt" in column or str(dtypes[column]) in (
"Int8",
"Int16",
"Int32",
"Int64",
"UInt8",
"UInt16",
"UInt32",
"UInt64",
):
column_dict[column] = "integer"
elif str(dtypes[column]) in ("Float32", "Float64"):
column_dict[column] = "float"
elif str(dtypes[column]) == "boolean":
column_dict[column] = "float"
else:
column_dict[column] = "string"
return column_dict


def _put_s3_data(key: str, bucket_name: str, client, data: dict) -> None:
"""Convenience class for writing a dict to S3"""
b_data = io.BytesIO(json.dumps(data).encode())
client.upload_fileobj(Bucket=bucket_name, Key=key, Fileobj=b_data)


def update_column_type_metadata(bucket: str):
"""creates a new metadata dict for column types.
By design, this will replace an existing column type dict if one already exists.
"""
client = boto3.client("s3")
res = client.list_objects_v2(Bucket=bucket, Prefix="aggregates/")
contents = res["Contents"]
output = {}
for resource in progress.track(contents):
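# aggregate keys look like: aggregates/<study>/<study>__<subscription>/<version>/<file>.parquet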
dirs = resource["Key"].split("/")
study = dirs[1]
subscription = dirs[2].split("__")[1]
version = dirs[3]
bytes_buffer = io.BytesIO()
client.download_fileobj(Bucket=bucket, Key=resource["Key"], Fileobj=bytes_buffer)
df = pandas.read_parquet(bytes_buffer)
type_dict = get_csv_column_datatypes(df.dtypes)
output.setdefault(study, {})
output[study].setdefault(subscription, {})
output[study][subscription].setdefault(version, {})
output[study][subscription][version]["column_types_format_version"] = 2
output[study][subscription][version]["columns"] = type_dict
output[study][subscription][version]["last_data_update"] = (
resource["LastModified"].now().isoformat()
)
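# replace the ".parquet" suffix (8 characters) with ".csv"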
output[study][subscription][version]["s3_path"] = resource["Key"][:-8] + ".csv"
output[study][subscription][version]["total"] = int(df["cnt"][0])
_put_s3_data("metadata/column_types.json", bucket, client, output)


def get_s3_json_as_dict(bucket, key: str):
"""reads a json object as dict (typically metadata in this case)"""
s3_client = boto3.client("s3")
bytes_buffer = io.BytesIO()
s3_client.download_fileobj(
Bucket=bucket,
Key=key,
Fileobj=bytes_buffer,
)
return json.loads(bytes_buffer.getvalue().decode())


def cache_api_data(s3_bucket_name: str, db: str) -> None:
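"""Rebuilds the data_packages cache file from Athena table names and the column_types metadata"""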
s3_client = boto3.client("s3")
df = awswrangler.athena.read_sql_query(
(
f"SELECT table_name FROM information_schema.tables " # noqa: S608
f"WHERE table_schema = '{db}'" # nosec
),
database=db,
s3_output=f"s3://{s3_bucket_name}/awswrangler",
workgroup=os.environ.get("WORKGROUP_NAME"),
)
data_packages = df[df["table_name"].str.contains("__")].iloc[:, 0]
column_types = get_s3_json_as_dict(
s3_bucket_name,
f"{BucketPath.META.value}/{JsonFilename.COLUMN_TYPES.value}.json",
)
dp_details = []
for dp in list(data_packages):
dp_detail = {
"study": dp.split("__", 1)[0],
"name": dp.split("__", 1)[1],
}
try:
versions = column_types[dp_detail["study"]][dp_detail["name"]]
for version in versions:
dp_details.append(
{
**dp_detail,
**versions[version],
"version": version,
"id": dp + "__" + version,
}
)
except KeyError:
continue
s3_client.put_object(
Bucket=s3_bucket_name,
Key=f"{BucketPath.CACHE.value}/{JsonFilename.DATA_PACKAGES.value}.json",
Body=json.dumps(dp_details),
)


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="""Creates data package metadata for existing aggregates. """
)
parser.add_argument("-b", "--bucket", help="bucket name")
parser.add_argument("-d", "--db", help="database name")
args = parser.parse_args()
update_column_type_metadata(args.bucket)
cache_api_data(args.bucket, args.db)
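
For reference, a minimal sketch of the metadata/column_types.json document that update_column_type_metadata writes; the study, data package, and version names and all values below are hypothetical, but the nesting and keys follow the code above.

# hypothetical entry for study "core", data package "count_patient", version "099"
column_types = {
    "core": {
        "count_patient": {
            "099": {
                "column_types_format_version": 2,
                "columns": {"cnt": "integer", "gender": "string"},
                "last_data_update": "2024-10-07T00:00:00+00:00",
                "s3_path": "aggregates/core/core__count_patient/099/core__count_patient__aggregate.csv",
                "total": 1234,
            }
        }
    }
}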
35 changes: 1 addition & 34 deletions src/handlers/shared/functions.py
@@ -6,7 +6,6 @@
from datetime import UTC, datetime

import boto3
import pandas

from src.handlers.shared import enums

@@ -27,7 +26,7 @@
}

COLUMN_TYPES_METADATA_TEMPLATE = {
enums.ColumnTypesKeys.COLUMN_TYPES_FORMAT_VERSION.value: "1",
enums.ColumnTypesKeys.COLUMN_TYPES_FORMAT_VERSION.value: "2",
enums.ColumnTypesKeys.COLUMNS.value: None,
enums.ColumnTypesKeys.LAST_DATA_UPDATE.value: None,
}
@@ -221,35 +220,3 @@ def get_latest_data_package_version(bucket, prefix):
if highest_ver is None:
logging.error("No data package versions found for %s", prefix)
return highest_ver


def get_column_datatypes(dtypes: pandas.DataFrame):
"""helper for generating column type for dashboard API"""
column_dict = {}
for column in dtypes.index:
if column.endswith("year"):
column_dict[column] = "year"
elif column.endswith("month"):
column_dict[column] = "month"
elif column.endswith("week"):
column_dict[column] = "week"
elif column.endswith("day") or str(dtypes[column]) == "datetime64":
column_dict[column] = "day"
elif column.startswith("cnt") or str(dtypes[column]) in (
"Int8",
"Int16",
"Int32",
"Int64",
"UInt8",
"UInt16",
"UInt32",
"UInt64",
):
column_dict[column] = "integer"
elif str(dtypes[column]) in ("Float32", "Float64"):
column_dict[column] = "float"
elif str(dtypes[column]) == "boolean":
column_dict[column] = "boolean"
else:
column_dict[column] = "string"
return column_dict
35 changes: 35 additions & 0 deletions src/handlers/shared/pandas_functions.py
@@ -0,0 +1,35 @@
"""Pandas functions used across different functions"""

import pandas


def get_column_datatypes(dtypes: pandas.DataFrame):
"""helper for generating column type for dashboard API"""
column_dict = {}
for column in dtypes.index:
if column.endswith("year"):
column_dict[column] = "year"
elif column.endswith("month"):
column_dict[column] = "month"
elif column.endswith("week"):
column_dict[column] = "week"
elif column.endswith("day") or str(dtypes[column]) == "datetime64":
column_dict[column] = "day"
elif column.startswith("cnt") or str(dtypes[column]) in (
"Int8",
"Int16",
"Int32",
"Int64",
"UInt8",
"UInt16",
"UInt32",
"UInt64",
):
column_dict[column] = "integer"
elif str(dtypes[column]) in ("Float32", "Float64"):
column_dict[column] = "float"
elif str(dtypes[column]) == "boolean":
column_dict[column] = "boolean"
else:
column_dict[column] = "string"
return column_dict
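
A minimal usage sketch of the relocated helper, using a hypothetical dataframe; convert_dtypes() produces the nullable Int64/string dtypes that the branches above check for.

import pandas

from src.handlers.shared import pandas_functions

df = pandas.DataFrame({"cnt": [10], "race_display": ["asian"]}).convert_dtypes()
col_types = pandas_functions.get_column_datatypes(df.dtypes)
# col_types == {"cnt": "integer", "race_display": "string"}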
10 changes: 8 additions & 2 deletions src/handlers/site_upload/cache_api.py
@@ -31,14 +31,20 @@ def cache_api_data(s3_client, s3_bucket_name: str, db: str, target: str) -> None
dp_details = []
for dp in list(data_packages):
dp_detail = {
"id": dp,
"study": dp.split("__", 1)[0],
"name": dp.split("__", 1)[1],
}
try:
versions = column_types[dp_detail["study"]][dp_detail["name"]]
for version in versions:
dp_details.append({**dp_detail, **versions[version], "version": version})
dp_details.append(
{
**dp_detail,
**versions[version],
"version": version,
"id": dp + "__" + version,
}
)
except KeyError:
continue
s3_client.put_object(
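
With this change, each element of the cached data_packages.json list carries a versioned id (study__name__version) plus the per-version column type metadata, rather than one unversioned entry per table. A hypothetical element, assuming an illustrative study "core", data package "count_patient", and version "099":

{
    "study": "core",
    "name": "count_patient",
    "column_types_format_version": 2,
    "columns": {"cnt": "integer", "gender": "string"},
    "last_data_update": "2024-10-07T00:00:00+00:00",
    "s3_path": "aggregates/core/core__count_patient/099/core__count_patient__aggregate.csv",
    "total": 1234,
    "version": "099",
    "id": "core__count_patient__099",
}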
24 changes: 15 additions & 9 deletions src/handlers/site_upload/powerset_merge.py
@@ -12,7 +12,13 @@
import pandas
from pandas.core.indexes.range import RangeIndex

from src.handlers.shared import awswrangler_functions, decorators, enums, functions
from src.handlers.shared import (
awswrangler_functions,
decorators,
enums,
functions,
pandas_functions,
)

log_level = os.environ.get("LAMBDA_LOG_LEVEL", "INFO")
logger = logging.getLogger()
@@ -45,6 +51,11 @@ def __init__(self, event):
self.s3_bucket_name,
meta_type=enums.JsonFilename.COLUMN_TYPES.value,
)
self.csv_aggerate_path = (
f"s3://{self.s3_bucket_name}/{enums.BucketPath.CSVAGGREGATE.value}/"
f"{self.study}/{self.study}__{self.data_package}/{self.version}/"
f"{self.study}__{self.data_package}__aggregate.csv"
)

# S3 Filesystem operations
def get_data_package_list(self, path) -> list:
@@ -86,14 +97,9 @@ def write_parquet(self, df: pandas.DataFrame, is_new_data_package: bool) -> None

def write_csv(self, df: pandas.DataFrame) -> None:
"""writes dataframe as csv to s3"""
csv_aggregate_path = (
f"s3://{self.s3_bucket_name}/{enums.BucketPath.CSVAGGREGATE.value}/"
f"{self.study}/{self.study}__{self.data_package}/{self.version}/"
f"{self.study}__{self.data_package}__aggregate.csv"
)
df = df.apply(lambda x: x.strip() if isinstance(x, str) else x).replace('""', numpy.nan)
df = df.replace(to_replace=r",", value="", regex=True)
awswrangler.s3.to_csv(df, csv_aggregate_path, index=False, quoting=csv.QUOTE_NONE)
awswrangler.s3.to_csv(df, self.csv_aggerate_path, index=False, quoting=csv.QUOTE_NONE)

# metadata
def update_local_metadata(
@@ -339,13 +345,13 @@ def merge_powersets(manager: S3Manager) -> None:
manager.write_local_metadata()

# Updating the typing dict for the column type API
column_dict = functions.get_column_datatypes(df.dtypes)
column_dict = pandas_functions.get_column_datatypes(df.dtypes)
manager.update_local_metadata(
enums.ColumnTypesKeys.COLUMNS.value,
value=column_dict,
metadata=manager.types_metadata,
meta_type=enums.JsonFilename.COLUMN_TYPES.value,
extra_items={"total": int(df["cnt"][0])},
extra_items={"total": int(df["cnt"][0]), "s3_path": manager.csv_aggerate_path},
)
manager.update_local_metadata(
enums.ColumnTypesKeys.LAST_DATA_UPDATE.value,
8 changes: 4 additions & 4 deletions template.yaml
@@ -81,7 +81,7 @@ Resources:
LoggingConfig:
ApplicationLogLevel: !Ref LogLevel
LogFormat: !Ref LogFormat
LogGroup: !Sub "/aws/lambda/CumulusAggFetchAuthorizerFunction-${DeployStage}"
LogGroup: !Sub "/aws/lambda/CumulusAggFetchAuthorizer-${DeployStage}"
MemorySize: 128
Timeout: 100
Description: Validates credentials before providing signed urls
@@ -115,7 +115,7 @@ Resources:
LoggingConfig:
ApplicationLogLevel: !Ref LogLevel
LogFormat: !Ref LogFormat
LogGroup: !Sub "/aws/lambda/CumulusAggFetchUploadUrlFunction-${DeployStage}"
LogGroup: !Sub "/aws/lambda/CumulusAggFetchUploadUrl-${DeployStage}"
MemorySize: 128
Timeout: 100
Description: Generates a presigned URL for uploading files to S3
@@ -156,7 +156,7 @@ Resources:
LoggingConfig:
ApplicationLogLevel: !Ref LogLevel
LogFormat: !Ref LogFormat
LogGroup: !Sub "/aws/lambda/CumulusAggProcessUploadFunction-${DeployStage}"
LogGroup: !Sub "/aws/lambda/CumulusAggProcessUpload-${DeployStage}"
MemorySize: 128
Timeout: 800
Description: Handles initial relocation of upload data
@@ -196,7 +196,7 @@ Resources:
LoggingConfig:
ApplicationLogLevel: !Ref LogLevel
LogFormat: !Ref LogFormat
LogGroup: !Sub "/aws/lambda/CumulusAggPowersetMergeFunction-${DeployStage}"
LogGroup: !Sub "/aws/lambda/CumulusAggPowersetMerge-${DeployStage}"
MemorySize: 8192
Timeout: 800
Description: Merges and aggregates powerset count data
4 changes: 2 additions & 2 deletions tests/shared/test_functions.py
@@ -4,7 +4,7 @@
import pandas
import pytest

from src.handlers.shared import functions
from src.handlers.shared import functions, pandas_functions


@pytest.mark.parametrize(
@@ -48,7 +48,7 @@ def test_column_datatypes():
"string": ["string"],
}
)
col_types = functions.get_column_datatypes(df.dtypes)
col_types = pandas_functions.get_column_datatypes(df.dtypes)
assert col_types == {
"study_year": "year",
"study_month": "month",