refactor: use the new capepy library to simplify code #5

Merged · 1 commit · Nov 8, 2024

main.py (62 changes: 5 additions & 57 deletions)

@@ -1,49 +1,13 @@
 """ETL script Template."""

-import sys
 from pathlib import Path

-import boto3 as boto3
-from awsglue.context import GlueContext
-from awsglue.utils import getResolvedOptions
-from pyspark.context import SparkContext
+from capepy.aws.glue import EtlJob

-# Initialize logging and context
-spark_ctx = SparkContext()
-glue_ctx = GlueContext(spark_ctx)
-logger = glue_ctx.get_logger()
-
-# Evaluate parameters
-parameters = getResolvedOptions(
-    sys.argv,
-    [
-        "RAW_BUCKET_NAME",
-        "ALERT_OBJ_KEY",
-        "CLEAN_BUCKET_NAME",
-    ],
-)
-raw_bucket = parameters["RAW_BUCKET_NAME"]
-raw_key = parameters["ALERT_OBJ_KEY"]
-clean_bucket = parameters["CLEAN_BUCKET_NAME"]
-
-# Retrieve the raw file passed into the ETL script
-# Fail nicely if there is an error and log it
-s3_client = boto3.client("s3")
-response = s3_client.get_object(Bucket=raw_bucket, Key=raw_key)
-status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
-
-if status != 200:
-    err = (
-        f"ERROR - Could not get object {raw_key} from bucket "
-        f"{raw_bucket}. ETL Cannot continue."
-    )
-    logger.error(err)
-    raise Exception(err)
-
-logger.info(f"Obtained object {raw_key} from bucket {raw_bucket}.")
+etl_job = EtlJob()

 # `raw` has the contents of the raw file passed into the script
-raw = response.get("Body")
+raw = etl_job.get_raw_file()

 # TODO: Here you want to clean the contents of the `raw` variable
 # and produce the "cleaned" content to the `cleaned` variable
@@ -52,24 +16,8 @@
 # TODO: Specify the name of the new clean file
 # We typically just want to replace the file extension with a new one
 # Below is an example of this, update with the correct extension
-clean_key = str(Path(raw_key).with_suffix(".csv"))
+clean_key = str(Path(etl_job.parameters["OBJECT_KEY"]).with_suffix(".csv"))

 # Put the new cleaned object into the clean bucket
 if cleaned is not None:
-    response = s3_client.put_object(
-        Bucket=clean_bucket, Key=clean_key, Body=cleaned
-    )
-    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
-
-    if status != 200:
-        err = (
-            f"ERROR - Could not write transformed data object {clean_key} "
-            f"to bucket {clean_bucket}. ETL Cannot continue."
-        )
-        logger.error(err)
-        raise Exception(err)
-
-    logger.info(
-        f"Transformed {raw_bucket}/{raw_key} and wrote result "
-        f"to {clean_bucket}/{clean_key}"
-    )
+    etl_job.write_clean_file(cleaned, clean_key=clean_key)
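
Taken together, the two hunks collapse roughly fifty lines of Glue/S3 boilerplate into four calls on `EtlJob`. For illustration, a concrete job built on the refactored template might look like the sketch below. The transform step is invented (the template leaves it as a TODO), and `get_raw_file()` is assumed here to return the object's bytes; the diff does not show its actual return type.

```python
"""Illustrative job built on the refactored template; the transform is hypothetical."""

from pathlib import Path

from capepy.aws.glue import EtlJob

etl_job = EtlJob()

# `raw` has the contents of the raw file passed into the script.
# Assumption: get_raw_file() returns bytes (not confirmed by this diff).
raw = etl_job.get_raw_file()

# Hypothetical cleaning step standing in for the template's TODO:
# treat the raw object as tab-separated text and rewrite it as CSV
# (naive replacement; ignores quoting).
cleaned = raw.decode("utf-8").replace("\t", ",")

# Name the clean file by swapping the extension, as the template suggests
clean_key = str(Path(etl_job.parameters["OBJECT_KEY"]).with_suffix(".csv"))

# Hand the result to the clean bucket via the library
if cleaned is not None:
    etl_job.write_clean_file(cleaned, clean_key=clean_key)
```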

requirements.txt (1 change: 1 addition & 0 deletions)

@@ -1,3 +1,4 @@
 aws-glue-libs @ git+https://github.com/awslabs/aws-glue-libs@9d8293962e6ffc607e5dc328e246f40b24010fa8
 boto3==1.34.103
+capepy>=1.0.0,<2.0.0
 pyspark==3.5.1
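
The `capepy>=1.0.0,<2.0.0` constraint pins the template to the 1.x series, so jobs pick up fixes without being exposed to breaking 2.x changes; `boto3` and `pyspark` stay pinned exactly as before.

The deleted lines in main.py also document what `EtlJob` must now be handling internally. Below is a rough reconstruction of an equivalent wrapper, assembled from the removed boilerplate. It is not capepy's actual source, the class name is invented, and every parameter name other than `OBJECT_KEY` (which the diff confirms) is a guess carried over from the old template.

```python
"""Rough equivalent of what EtlJob appears to wrap; not capepy's real code."""

import sys

import boto3
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext


class EtlJobSketch:
    """Hypothetical stand-in reconstructed from the deleted template code."""

    def __init__(self):
        # Glue/Spark context and logger, as in the deleted template
        self.glue_ctx = GlueContext(SparkContext())
        self.logger = self.glue_ctx.get_logger()
        # Parameter names besides OBJECT_KEY are guesses from the old template
        self.parameters = getResolvedOptions(
            sys.argv, ["RAW_BUCKET_NAME", "OBJECT_KEY", "CLEAN_BUCKET_NAME"]
        )
        self.s3 = boto3.client("s3")

    def get_raw_file(self):
        # Mirrors the deleted get_object + status-check block
        bucket = self.parameters["RAW_BUCKET_NAME"]
        key = self.parameters["OBJECT_KEY"]
        response = self.s3.get_object(Bucket=bucket, Key=key)
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status != 200:
            err = f"ERROR - Could not get object {key} from bucket {bucket}."
            self.logger.error(err)
            raise Exception(err)
        self.logger.info(f"Obtained object {key} from bucket {bucket}.")
        return response["Body"].read()

    def write_clean_file(self, body, clean_key):
        # Mirrors the deleted put_object + status-check block
        bucket = self.parameters["CLEAN_BUCKET_NAME"]
        response = self.s3.put_object(Bucket=bucket, Key=clean_key, Body=body)
        status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
        if status != 200:
            err = f"ERROR - Could not write object {clean_key} to bucket {bucket}."
            self.logger.error(err)
            raise Exception(err)
        self.logger.info(f"Wrote transformed object to {bucket}/{clean_key}")
```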