Skip to content

Commit

Permalink
CSV file download API (#114)
Browse files Browse the repository at this point in the history
* CSV file download API

* PR feedback, removed unused pylint excepts

* remove parquet, looping for >1000 results

* PR feedback

* addded column-descriptions header
  • Loading branch information
dogversioning authored Feb 27, 2024
1 parent 85c926d commit eb2fff4
Show file tree
Hide file tree
Showing 31 changed files with 905 additions and 285 deletions.
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ dependencies= [
"arrow >=1.2.3",
"awswrangler >=3.5, <4",
"boto3",
"pandas >=2, <3"
"pandas >=2, <3",
"rich",
]
authors = [
{ name="Matt Garber", email="matthew.garber@childrens.harvard.edu" },
Expand Down
86 changes: 86 additions & 0 deletions scripts/migrations/migration.002.column_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
""" Adds a new metadata type, column_types """

import argparse
import io
import json

import boto3
import pandas
from rich import progress


def get_csv_column_datatypes(dtypes):
"""helper for generating column type for dashboard API"""
column_dict = {}
for column in dtypes.index:
if column.endswith("year"):
column_dict[column] = "year"
elif column.endswith("month"):
column_dict[column] = "month"
elif column.endswith("week"):
column_dict[column] = "week"
elif column.endswith("day") or str(dtypes[column]) == "datetime64":
column_dict[column] = "day"
elif "cnt" in column or str(dtypes[column]) in (
"Int8",
"Int16",
"Int32",
"Int64",
"UInt8",
"UInt16",
"UInt32",
"UInt64",
):
column_dict[column] = "integer"
elif str(dtypes[column]) in ("Float32", "Float64"):
column_dict[column] = "float"
elif str(dtypes[column]) == "boolean":
column_dict[column] = "float"
else:
column_dict[column] = "string"
return column_dict


def _put_s3_data(key: str, bucket_name: str, client, data: dict) -> None:
"""Convenience class for writing a dict to S3"""
b_data = io.BytesIO(json.dumps(data).encode())
client.upload_fileobj(Bucket=bucket_name, Key=key, Fileobj=b_data)


def create_column_type_metadata(bucket: str):
"""creates a new metadata dict for column types.
By design, this will replaces an existing column type dict if one already exists.
"""
client = boto3.client("s3")
res = client.list_objects_v2(Bucket=bucket, Prefix="aggregates/")
contents = res["Contents"]
output = {}
for resource in progress.track(contents):
dirs = resource["Key"].split("/")
study = dirs[1]
subscription = dirs[2].split("__")[1]
version = dirs[3]
bytes_buffer = io.BytesIO()
client.download_fileobj(
Bucket=bucket, Key=resource["Key"], Fileobj=bytes_buffer
)
df = pandas.read_parquet(bytes_buffer)
type_dict = get_csv_column_datatypes(df.dtypes)
filename = f"{resource['Key'].split('/')[-1].split('.')[0]}.csv"
output.setdefault(study, {})
output[study].setdefault(subscription, {})
output[study][subscription].setdefault(version, {})
output[study][subscription][version]["columns"] = type_dict
output[study][subscription][version]["filename"] = filename
# print(json.dumps(output, indent=2))
_put_s3_data("metadata/column_types.json", bucket, client, output)


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="""Creates column types for existing aggregates. """
)
parser.add_argument("-b", "--bucket", help="bucket name")
args = parser.parse_args()
create_column_type_metadata(args.bucket)
8 changes: 4 additions & 4 deletions src/handlers/dashboard/get_chart_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
import boto3
import pandas

from ..dashboard.filter_config import get_filter_string
from ..shared.decorators import generic_error_handler
from ..shared.enums import BucketPath
from ..shared.functions import get_latest_data_package_version, http_response
from src.handlers.dashboard.filter_config import get_filter_string
from src.handlers.shared.decorators import generic_error_handler
from src.handlers.shared.enums import BucketPath
from src.handlers.shared.functions import get_latest_data_package_version, http_response


def _get_table_cols(table_name: str, version: str | None = None) -> list:
Expand Down
122 changes: 122 additions & 0 deletions src/handlers/dashboard/get_csv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import os

import boto3
import botocore

from src.handlers.shared import decorators, enums, functions


def _format_and_validate_key(
s3_client,
s3_bucket_name: str,
study: str,
subscription: str,
version: str,
filename: str,
site: str | None = None,
):
"""Creates S3 key from url params"""
if site is not None:
key = f"last_valid/{study}/{study}__{subscription}/{site}/{version}/{filename}"
else:
key = f"csv_aggregates/{study}/{study}__{subscription}/{version}/{filename}"
try:
s3_client.head_object(Bucket=s3_bucket_name, Key=key)
return key
except botocore.exceptions.ClientError as e:
raise OSError(f"No object found at key {key}") from e


def _get_column_types(
s3_client,
s3_bucket_name: str,
study: str,
subscription: str,
version: str,
**kwargs,
) -> dict:
"""Gets column types from the metadata store for a given subscription"""
types_metadata = functions.read_metadata(
s3_client,
s3_bucket_name,
meta_type=enums.JsonFilename.COLUMN_TYPES.value,
)
try:
return types_metadata[study][subscription][version][
enums.ColumnTypesKeys.COLUMNS.value
]
except KeyError:
return {}


@decorators.generic_error_handler(msg="Error retrieving chart data")
def get_csv_handler(event, context):
"""manages event from dashboard api call and creates a temporary URL"""
del context
s3_bucket_name = os.environ.get("BUCKET_NAME")
s3_client = boto3.client("s3")
key = _format_and_validate_key(s3_client, s3_bucket_name, **event["pathParameters"])
types = _get_column_types(s3_client, s3_bucket_name, **event["pathParameters"])
presign_url = s3_client.generate_presigned_url(
"get_object",
Params={
"Bucket": s3_bucket_name,
"Key": key,
"ResponseContentType": "text/csv",
},
ExpiresIn=600,
)
extra_headers = {
"Location": presign_url,
"x-column-names": ",".join(key for key in types.keys()),
"x-column-types": ",".join(key for key in types.values()),
# TODO: add data to x-column-descriptions once a source for column descriptions
# has been established
"x-column-descriptions": "",
}
res = functions.http_response(302, "", extra_headers=extra_headers)
return res


@decorators.generic_error_handler(msg="Error retrieving csv data")
def get_csv_list_handler(event, context):
"""manages event from dashboard api call and creates a temporary URL"""
del context
s3_bucket_name = os.environ.get("BUCKET_NAME")
s3_client = boto3.client("s3")
if event["path"].startswith("/last_valid"):
key_prefix = "last_valid"
url_prefix = "last_valid"
elif event["path"].startswith("/aggregates"):
key_prefix = "csv_aggregates"
url_prefix = "aggregates"
else:
raise Exception("Unexpected url encountered")

urls = []
s3_objs = s3_client.list_objects_v2(Bucket=s3_bucket_name, Prefix=key_prefix)
if s3_objs["KeyCount"] == 0:
return functions.http_response(200, urls)
while True:
for obj in s3_objs["Contents"]:
if not obj["Key"].endswith(".csv"):
continue
key_parts = obj["Key"].split("/")
study = key_parts[1]
subscription = key_parts[2].split("__")[1]
version = key_parts[-2]
filename = key_parts[-1]
site = key_parts[3] if url_prefix == "last_valid" else None
url_parts = [url_prefix, study, subscription, version, filename]
if url_prefix == "last_valid":
url_parts.insert(3, site)
urls.append("/".join(url_parts))
if not s3_objs["IsTruncated"]:
break
s3_objs = s3_client.list_objects_v2(
Bucket=s3_bucket_name,
Prefix=key_prefix,
ContinuationToken=s3_objs["NextContinuationToken"],
)
res = functions.http_response(200, urls)
return res
6 changes: 3 additions & 3 deletions src/handlers/dashboard/get_data_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@

import os

from ..shared.decorators import generic_error_handler
from ..shared.enums import BucketPath, JsonFilename
from ..shared.functions import get_s3_json_as_dict, http_response
from src.handlers.shared.decorators import generic_error_handler
from src.handlers.shared.enums import BucketPath, JsonFilename
from src.handlers.shared.functions import get_s3_json_as_dict, http_response


@generic_error_handler(msg="Error retrieving data packages")
Expand Down
6 changes: 4 additions & 2 deletions src/handlers/dashboard/get_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

import boto3

from ..shared.decorators import generic_error_handler
from ..shared.functions import http_response, read_metadata
from src.handlers.shared.decorators import generic_error_handler
from src.handlers.shared.functions import http_response, read_metadata


@generic_error_handler(msg="Error retrieving metadata")
Expand All @@ -22,5 +22,7 @@ def metadata_handler(event, context):
metadata = metadata[params["study"]]
if "data_package" in params:
metadata = metadata[params["data_package"]]
if "version" in params:
metadata = metadata[params["version"]]
res = http_response(200, metadata)
return res
6 changes: 3 additions & 3 deletions src/handlers/dashboard/get_study_periods.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

import boto3

from ..shared.decorators import generic_error_handler
from ..shared.enums import JsonFilename
from ..shared.functions import http_response, read_metadata
from src.handlers.shared.decorators import generic_error_handler
from src.handlers.shared.enums import JsonFilename
from src.handlers.shared.functions import http_response, read_metadata


@generic_error_handler(msg="Error retrieving study period")
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/shared/awswrangler_functions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
""" functions specifically requiring AWSWranger, which requires a lambda layer"""
import awswrangler

from .enums import BucketPath
from src.handlers.shared.enums import BucketPath


def get_s3_data_package_list(
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/shared/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import functools
import logging

from .functions import http_response
from src.handlers.shared.functions import http_response


def generic_error_handler(msg="Internal server error"):
Expand Down
19 changes: 14 additions & 5 deletions src/handlers/shared/enums.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""Enums shared across lambda functions"""
from enum import Enum
import enum


class BucketPath(Enum):
class BucketPath(enum.Enum):
"""stores root level buckets for managing data processing state"""

ADMIN = "admin"
Expand All @@ -18,15 +18,24 @@ class BucketPath(Enum):
UPLOAD = "site_upload"


class JsonFilename(Enum):
class ColumnTypesKeys(enum.Enum):
"""stores names of expected keys in the study period metadata dictionary"""

COLUMN_TYPES_FORMAT_VERSION = "column_types_format_version"
COLUMNS = "columns"
LAST_DATA_UPDATE = "last_data_update"


class JsonFilename(enum.Enum):
"""stores names of expected kinds of persisted S3 JSON files"""

COLUMN_TYPES = "column_types"
TRANSACTIONS = "transactions"
DATA_PACKAGES = "data_packages"
STUDY_PERIODS = "study_periods"


class TransactionKeys(Enum):
class TransactionKeys(enum.Enum):
"""stores names of expected keys in the transaction dictionary"""

TRANSACTION_FORMAT_VERSION = "transaction_format_version"
Expand All @@ -37,7 +46,7 @@ class TransactionKeys(Enum):
DELETED = "deleted"


class StudyPeriodMetadataKeys(Enum):
class StudyPeriodMetadataKeys(enum.Enum):
"""stores names of expected keys in the study period metadata dictionary"""

STUDY_PERIOD_FORMAT_VERSION = "study_period_format_version"
Expand Down
Loading

0 comments on commit eb2fff4

Please sign in to comment.