Trigger the insert Lambda upon creation of a file in the S3 bucket
rohith1122 committed Feb 6, 2024
1 parent 430548d commit 6c84330
Showing 6 changed files with 69 additions and 92 deletions.
35 changes: 0 additions & 35 deletions build/jenkins/dev/Dev_Deployment.Jenkinsfile
@@ -105,41 +105,6 @@ pipeline {
}
}
}
stage('Perform Insert Lambda function 1 of 5') {
steps {
script {
sh "make postcode-insert-etl PROFILE=${env.PROFILE}"
}
}
}
stage('Perform Insert Lambda function 2 of 5') {
steps {
script {
sh "make postcode-insert-etl PROFILE=${env.PROFILE}"
}
}
}
stage('Perform Insert Lambda function 3 of 5') {
steps {
script {
sh "make postcode-insert-etl PROFILE=${env.PROFILE}"
}
}
}
stage('Perform Insert Lambda function 4 of 5') {
steps {
script {
sh "make postcode-insert-etl PROFILE=${env.PROFILE}"
}
}
}
stage('Perform Insert Lambda function 5 of 5') {
steps {
script {
sh "make postcode-insert-etl PROFILE=${env.PROFILE}"
}
}
}
stage('Smoke Tests') {
steps {
script {
@@ -24,8 +24,9 @@
DOS_READ_ONLY_USER = os.environ.get("DOS_READ_ONLY_USER")
LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL")

logging.basicConfig(level=LOGGING_LEVEL)
logger=logging.getLogger(__name__)

logger = logging.getLogger()
logger.setLevel(LOGGING_LEVEL)

def get_secret():

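The hunk above replaces logging.basicConfig(...) plus a module-level logger with the root logger configured directly. A minimal sketch of that pattern, not part of the commit diff, assuming the same LOGGING_LEVEL environment variable (the "INFO" fallback is an illustrative assumption):

import logging
import os

# In the AWS Lambda Python runtime the root logger usually already has a handler
# attached, so basicConfig() is effectively a no-op there; taking the root logger
# and setting its level directly is the pattern the commit switches to.
LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL", "INFO")  # fallback is an assumption

logger = logging.getLogger()
logger.setLevel(LOGGING_LEVEL)
logger.info("logging configured at level %s", LOGGING_LEVEL)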
@@ -3,8 +3,8 @@
import os
import logging
import pandas as pd
import glob
import time
import urllib.parse
s3 = boto3.resource("s3")
dynamodb = boto3.resource("dynamodb")
SOURCE_BUCKET = os.environ.get("SOURCE_BUCKET")
@@ -15,57 +15,47 @@
DYNAMODB_DESTINATION_TABLE = os.environ.get("DYNAMODB_DESTINATION_TABLE")
LOGGING_LEVEL = os.environ.get("LOGGING_LEVEL")

logging.basicConfig(level=LOGGING_LEVEL)
logger = logging.getLogger(__name__)
logger = logging.getLogger()
logger.setLevel(LOGGING_LEVEL)
data_frame = None

# method to loop over items in s3 Bucket
def read_csv_files():
bucket = s3.Bucket(SOURCE_BUCKET)
def read_csv_file(bucket_name,key):
bucket = s3.Bucket(bucket_name)
response = dict()

try:
# delete the previously processed files so we have a clean workspace
bucket.objects.filter(Prefix=PROCESSED_FOLDER).delete()
# run the process for the first 8 files and this is due to the lambda timeout
postcode_location_csv_files = list(bucket.objects.filter(Prefix=INPUT_FOLDER, Delimiter="/"))[:8]
print(postcode_location_csv_files)


for postcode_location_csv_file in postcode_location_csv_files:
name = postcode_location_csv_file.key
if name != INPUT_FOLDER:
print("processing: " + name)
status = read_csv_file_data(postcode_location_csv_file.get())
print("read CSV data status: " + status)
response[name] = status
process_completed_csv_files(bucket, status, postcode_location_csv_file)
print("process completed: " + name)

logger.info("processing: " + str(key))
s3_object = s3.get_object(Bucket=bucket, Key=key)
status = read_csv_file_data(s3_object)
logger.info("read CSV data status: " + status)
response[key] = status
process_completed_csv_files(bucket, status, s3_object)
logger.info("process completed: " + key)
return response
except Exception as e:
logger.error("unable to retrieve csv files due to {}".format(e))
finally:
return response


def process_completed_csv_files(bucket, status, postcode_location_csv_file):
if status.startswith("FAILED"):
post_process_file_path = ERROR_FOLDER + postcode_location_csv_file.key.replace(INPUT_FOLDER, "")
else:
post_process_file_path = SUCCESS_FOLDER + postcode_location_csv_file.key.replace(INPUT_FOLDER, "")

print("moving processed file to: " + post_process_file_path)
logger.info("moving processed file to: " + post_process_file_path)

# copy the processed file to the relevant processed folder
copy_source = {"Bucket": SOURCE_BUCKET, "Key": postcode_location_csv_file.key}
s3.meta.client.copy(copy_source, SOURCE_BUCKET, post_process_file_path)

print("copied: " + postcode_location_csv_file.key + " to: " + SOURCE_BUCKET + " with path " + post_process_file_path)
logger.info("copied: " + postcode_location_csv_file.key + " to: " + SOURCE_BUCKET + " with path " + post_process_file_path)

# delete the processed file so we have a clean workspace
bucket.objects.filter(Prefix=postcode_location_csv_file.key).delete()

print("filtered: " + postcode_location_csv_file.key + " from: " + SOURCE_BUCKET)
logger.info("filtered: " + postcode_location_csv_file.key + " from: " + SOURCE_BUCKET)


# method to read data from a single postcode_locations csv file
Expand All @@ -90,13 +80,12 @@ def read_csv_file_data(postcode_location_csv_file):
postcode_location_records.append(record)

i = i + 1

insert_bulk_data(postcode_location_records)
status = "PASSED"
return status
except Exception as e:
logger.error("read csv failed due to {}".format(e))
status = "FAILED {}".format(e)
finally:
return status

# binary search method to find a postcode in the dataframe
@@ -135,7 +124,7 @@ def insert_bulk_data(postcode_location_records):
"orgcode": orgcode
}
)
print("inserted {} records into table {}".format(len(postcode_location_records), DYNAMODB_DESTINATION_TABLE))
logger.info("inserted {} records into table {}".format(len(postcode_location_records), DYNAMODB_DESTINATION_TABLE))

# This is the entry point for the Lambda function
def lambda_handler(event, context):
@@ -145,10 +134,14 @@ def lambda_handler(event, context):
logger.info("Reading csv file from data/combined.zip")
tic = time.perf_counter()
data_frame = pd.read_csv("./data/combined.zip", compression='zip', header=0, usecols=cols,dtype={'postcode': str, 'orgcode': str})
print("data_frame {}".format(data_frame.size))
logger.info("data_frame {}".format(data_frame.size))
toc = time.perf_counter()
logger.info(f"loaded into dataframe in {toc - tic:0.4f} seconds")
logger.info("Reading csv files from: " + SOURCE_BUCKET)
logger.info("Inserting postcode data to: " + DYNAMODB_DESTINATION_TABLE)
response = read_csv_files()
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
logger.info("bucket: " + bucket)
logger.info("key: " + key)
response = read_csv_file(bucket,key)
return {"statusCode": 200, "body": response}
14 changes: 7 additions & 7 deletions infrastructure/stacks/postcode_etl/locals.tf
@@ -66,13 +66,13 @@ locals {
dynamoDb_full_access_policy_arn = "arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess"
rds_data_read_only_access_policy_arn = "arn:aws:iam::aws:policy/AmazonRDSReadOnlyAccess"

postcode_insert_cloudwatch_event_name = "${var.service_prefix}-postcode-insert-rule"
postcode_insert_cloudwatch_event_description = "Run service every day at 2am, 2:20am, 2:40am, 3:00am, and 3:20am"
postcode_insert_cloudwatch_event_cron_expression = "cron(0 2,2:20,2:40,3,3:20 * * ? *)"
postcode_insert_cloudwatch_event_target = "lambda"
postcode_insert_cloudwatch_event_statement = "AllowExecutionFromCloudWatch"
postcode_insert_cloudwatch_event_action = "lambda:InvokeFunction"
postcode_insert_cloudwatch_event_princinple = "events.amazonaws.com"
# postcode_insert_cloudwatch_event_name = "${var.service_prefix}-postcode-insert-rule"
# postcode_insert_cloudwatch_event_description = "Run service every day at 2am, 2:20am, 2:40am, 3:00am, and 3:20am"
# postcode_insert_cloudwatch_event_cron_expression = "cron(0 2,2:20,2:40,3,3:20 * * ? *)"
# postcode_insert_cloudwatch_event_target = "lambda"
# postcode_insert_cloudwatch_event_statement = "AllowExecutionFromCloudWatch"
# postcode_insert_cloudwatch_event_action = "lambda:InvokeFunction"
# postcode_insert_cloudwatch_event_princinple = "events.amazonaws.com"

postcode_extract_cloudwatch_event_name = "${var.service_prefix}-postcode-extract-rule"
postcode_extract_cloudwatch_event_description = "Daily timer to extract postcode and postcode data out of DoS an into the s3 bucket at 1am"
34 changes: 17 additions & 17 deletions infrastructure/stacks/postcode_etl/postcode_insert.tf
@@ -100,25 +100,25 @@ resource "aws_iam_role_policy_attachment" "dynamoDbFullAccessInsert" {
policy_arn = local.dynamoDb_full_access_policy_arn
}

resource "aws_cloudwatch_event_rule" "postcode_insert_cloudwatch_event" {
name = local.postcode_insert_cloudwatch_event_name
description = local.postcode_insert_cloudwatch_event_description
schedule_expression = local.postcode_insert_cloudwatch_event_cron_expression
}
# resource "aws_cloudwatch_event_rule" "postcode_insert_cloudwatch_event" {
# name = local.postcode_insert_cloudwatch_event_name
# description = local.postcode_insert_cloudwatch_event_description
# schedule_expression = local.postcode_insert_cloudwatch_event_cron_expression
# }

resource "aws_cloudwatch_event_target" "daily_postcode_insert_job" {
rule = aws_cloudwatch_event_rule.postcode_insert_cloudwatch_event.name
target_id = local.postcode_insert_cloudwatch_event_target
arn = aws_lambda_function.postcode_insert_lambda.arn
}
# resource "aws_cloudwatch_event_target" "daily_postcode_insert_job" {
# rule = aws_cloudwatch_event_rule.postcode_insert_cloudwatch_event.name
# target_id = local.postcode_insert_cloudwatch_event_target
# arn = aws_lambda_function.postcode_insert_lambda.arn
# }

resource "aws_lambda_permission" "allow_cloudwatch_to_call_insert_postcode" {
statement_id = local.postcode_insert_cloudwatch_event_statement
action = local.postcode_insert_cloudwatch_event_action
function_name = aws_lambda_function.postcode_insert_lambda.function_name
principal = local.postcode_insert_cloudwatch_event_princinple
source_arn = aws_cloudwatch_event_rule.postcode_insert_cloudwatch_event.arn
}
# resource "aws_lambda_permission" "allow_cloudwatch_to_call_insert_postcode" {
# statement_id = local.postcode_insert_cloudwatch_event_statement
# action = local.postcode_insert_cloudwatch_event_action
# function_name = aws_lambda_function.postcode_insert_lambda.function_name
# principal = local.postcode_insert_cloudwatch_event_princinple
# source_arn = aws_cloudwatch_event_rule.postcode_insert_cloudwatch_event.arn
# }

resource "aws_cloudwatch_log_group" "postcode_insert_log_group" {
name = "/aws/lambda/${aws_lambda_function.postcode_insert_lambda.function_name}"
18 changes: 18 additions & 0 deletions infrastructure/stacks/postcode_etl/s3.tf
@@ -18,6 +18,24 @@ resource "aws_s3_bucket" "postcode_etl_s3" {
tags = local.standard_tags
}

resource "aws_lambda_permission" "s3_trigger_permission" {
statement_id = "AllowExecutionFromS3"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.postcode_insert_lambda.function_name
principal = "s3.amazonaws.com"

source_arn = aws_s3_bucket.postcode_etl_s3.arn
}

resource "aws_s3_bucket_notification" "postcode_etl_s3_bucket_notification" {
bucket = aws_s3_bucket.postcode_etl_s3.id

lambda_function {
lambda_function_arn = aws_lambda_function.postcode_insert_lambda.arn
events = ["s3:ObjectCreated:*"]
filter_prefix = "postcode_locations/"
}
}

resource "aws_s3_bucket_public_access_block" "postcode_etl_s3_block_public_access" {
bucket = aws_s3_bucket.postcode_etl_s3.id
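Taken together, the new aws_lambda_permission and aws_s3_bucket_notification resources let S3 invoke the insert Lambda for every object created under the postcode_locations/ prefix. A minimal sketch, not part of the commit, for checking the resulting notification wiring with boto3; the bucket name is a placeholder:

import boto3

BUCKET_NAME = "example-postcode-etl-bucket"  # placeholder, not the real bucket name

s3_client = boto3.client("s3")
config = s3_client.get_bucket_notification_configuration(Bucket=BUCKET_NAME)

# Each entry should list the insert Lambda's ARN, the s3:ObjectCreated:* event and
# the postcode_locations/ prefix filter configured in s3.tf.
for lambda_config in config.get("LambdaFunctionConfigurations", []):
    print(lambda_config["LambdaFunctionArn"], lambda_config["Events"], lambda_config.get("Filter"))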
