PRMP-198_199 - Lambdas: DataCollectionLambda and StatisticalReportLambda (#376)

* [PRMDR-855] tidy up spike lambda for packaging

* [PRMDR-855] minor fix

* [PRMDR-855] add method to deserialise statistic data from dynamodb response

* [PRMDR-855] Add cloudwatch logs query for LG document stored

* [PRMDR-855] minor fix

* store current progress of reporting lambda

* minor fix

* bugfix: take account of s3 versioning when calculating total size

* [PRMDR-855] Change the way to calculate record file size (only count active record in dynamodb)

* [PRMDR-855] start replacing aggregate function with polars

* [PRMDR-855] use polars for data processing in stat report lambda

* [PRMDR-855] code refactoring, add metric `Average size of documents per patient`

* [PRMDR-855] handle empty string ods_code, which may cause client error at runtime

* fix unit test

* [PRMDR-855] Add ARF table to data collection service

* [PRMDR-919] Start adding unit tests to data collection service

* [PRMDR-919] adding more unit tests

* [PRMDR-919] add more unit tests

* [PRMDR-919] add more unit tests

* [PRMDR-919] add more unit tests

* [PRMDR-919] refactor test data structure, add more unit tests

* fix Makefile command

* fix wrong import

* [PRMDR-919] add more unit tests

* [PRMDR-919] Add more unit tests

* [PRMDR-919] Add unit test for data_collection_handler

* rename method: simple_query --> query_all_fields

* [PRMDR-919] Add more unit tests

* [PRMDR-919] Add unit test for scan_whole_table method

* [PRMDR-919] start adding unit test for cloudwatch logs query service

* add pytest plugin

* [PRMDR-919] add unit test for cloudwatch logs query service

* move test data to correct directory

* [PRMDR-919] improve logging

* minor fix

* [PRMDR-919] add unit tests for statistic report service

* [PRMDR-919] Adding unit tests for statistic report lambda

* tidy up mock data

* [PRMDR-919] Add unit test for statistical report service

* fix import path

* [PRMDR-920] Edit github action to deploy new lambdas

* [PRMDR-920] Fix github action

* [PRMDR-919] minor patch in lambda

* minor fix

* fix sonarcloud issues

* fix sonarcloud issues

* fix sonarcloud issues

* fix sonarcloud issues

* fix sonarcloud issues

* [PRMDR-919] improve coverage, refactor

* amend incorrect comment

* [PRMDR-919] Add error handling for the case if no data exist for the whole week

* fix sonarcloud checking coverage for python test files

* add error message

* improve test coverage

* improve test coverage

* remove unused decorator, minor change for clarity

* move cloudwatch_logs_query class to utils folder

* fix an incorrect method name (write_to_local_dynamodb_table --> write_to_dynamodb_table)

* replace hardcoded field names with enum doc refs, rename method get_average_number_of_file_per_patient -> get_average_number_of_files_per_patient

* rename class CloudwatchLogsQueryService --> CloudwatchService
joefong-nhs authored Jun 12, 2024
1 parent 0f0c4b2 commit 500a792
Showing 41 changed files with 2,980 additions and 25 deletions.
30 changes: 29 additions & 1 deletion .github/workflows/base-lambdas-reusable-deploy-all.yml
@@ -316,4 +316,32 @@ jobs:
       lambda_aws_name: UpdateUploadStateLambda
       lambda_layer_names: "core_lambda_layer"
     secrets:
-      AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
+      AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
+
+  deploy_data_collection_lambda:
+    name: Deploy data collection lambda
+    uses: ./.github/workflows/base-lambdas-reusable-deploy.yml
+    with:
+      environment: ${{ inputs.environment}}
+      python_version: ${{ inputs.python_version }}
+      build_branch: ${{ inputs.build_branch}}
+      sandbox: ${{ inputs.sandbox }}
+      lambda_handler_name: data_collection_handler
+      lambda_aws_name: DataCollectionLambda
+      lambda_layer_names: "core_lambda_layer,data_lambda_layer"
+    secrets:
+      AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
+
+  deploy_statistical_report_lambda:
+    name: Deploy statistical report lambda
+    uses: ./.github/workflows/base-lambdas-reusable-deploy.yml
+    with:
+      environment: ${{ inputs.environment}}
+      python_version: ${{ inputs.python_version }}
+      build_branch: ${{ inputs.build_branch}}
+      sandbox: ${{ inputs.sandbox }}
+      lambda_handler_name: statistical_report_handler
+      lambda_aws_name: StatisticalReportLambda
+      lambda_layer_names: "core_lambda_layer,data_lambda_layer"
+    secrets:
+      AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
5 changes: 4 additions & 1 deletion .gitignore
@@ -101,4 +101,7 @@ node_modules/
 lambdas/tests/unit/helpers/data/pdf/tmp
 /lambdas/package_/

-batch_update_progress.json
+batch_update_progress.json
+
+# jupyter notebook files
+*.ipynb
3 changes: 1 addition & 2 deletions Makefile
@@ -56,8 +56,7 @@ test-unit-coverage:
 	cd ./lambdas && ./venv/bin/python3 -m pytest --cov=. --cov-report xml:coverage.xml

 test-unit-coverage-html:
-	cd ./lambdas
-	coverage run --source=. --omit=tests/* -m pytest -v tests && coverage report && coverage html
+	cd ./lambdas && coverage run --source=. --omit="tests/*" -m pytest -v tests && coverage report && coverage html

 test-unit-collect:
 	cd ./lambdas && ./venv/bin/python3 -m pytest tests/ --collect-only
7 changes: 7 additions & 0 deletions lambdas/enums/supported_document_types.py
@@ -39,3 +39,10 @@ def get_dynamodb_table_name(self) -> str:
             SupportedDocumentTypes.LG: os.getenv("LLOYD_GEORGE_DYNAMODB_NAME"),
         }
         return document_type_to_table_name[self]
+
+    def get_s3_bucket_name(self) -> str:
+        lookup_dict = {
+            SupportedDocumentTypes.ARF: os.getenv("DOCUMENT_STORE_BUCKET_NAME"),
+            SupportedDocumentTypes.LG: os.getenv("LLOYD_GEORGE_BUCKET_NAME"),
+        }
+        return lookup_dict[self]
23 changes: 23 additions & 0 deletions lambdas/handlers/data_collection_handler.py
@@ -0,0 +1,23 @@
+from services.data_collection_service import DataCollectionService
+from utils.audit_logging_setup import LoggingService
+from utils.decorators.ensure_env_var import ensure_environment_variables_for_non_webapi
+from utils.decorators.override_error_check import override_error_check
+
+logger = LoggingService(__name__)
+
+
+@ensure_environment_variables_for_non_webapi(
+    names=[
+        "LLOYD_GEORGE_DYNAMODB_NAME",
+        "LLOYD_GEORGE_BUCKET_NAME",
+        "DOCUMENT_STORE_DYNAMODB_NAME",
+        "DOCUMENT_STORE_BUCKET_NAME",
+        "WORKSPACE",
+        "STATISTICS_TABLE",
+    ]
+)
+@override_error_check
+def lambda_handler(_event, _context):
+    logger.info("Starting data collection process")
+    service = DataCollectionService()
+    service.collect_all_data_and_write_to_dynamodb()
20 changes: 20 additions & 0 deletions lambdas/handlers/statistical_report_handler.py
@@ -0,0 +1,20 @@
+from services.statistical_report_service import StatisticalReportService
+from utils.audit_logging_setup import LoggingService
+from utils.decorators.ensure_env_var import ensure_environment_variables_for_non_webapi
+from utils.decorators.override_error_check import override_error_check
+
+logger = LoggingService(__name__)
+
+
+@ensure_environment_variables_for_non_webapi(
+    names=[
+        "WORKSPACE",
+        "STATISTICS_TABLE",
+        "STATISTICAL_REPORTS_BUCKET",
+    ]
+)
+@override_error_check
+def lambda_handler(_event, _context):
+    logger.info("Starting creating statistical report")
+    service = StatisticalReportService()
+    service.make_weekly_summary_and_output_to_bucket()
88 changes: 88 additions & 0 deletions lambdas/models/statistics.py
@@ -0,0 +1,88 @@
+import uuid
+from decimal import Decimal
+from typing import NamedTuple
+
+from models.config import to_capitalized_camel
+from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator
+
+
+class StatisticData(BaseModel):
+    model_config = ConfigDict(
+        alias_generator=to_capitalized_camel, populate_by_name=True
+    )
+    statistic_id: str = Field(
+        default_factory=lambda: str(uuid.uuid4()), alias="StatisticID"
+    )
+    date: str
+    ods_code: str
+
+    @field_serializer("statistic_id")
+    def serialise_id(self, statistic_id) -> str:
+        return f"{self.__class__.__name__}#{statistic_id}"
+
+    # noinspection PyNestedDecorators
+    @field_validator("statistic_id")
+    @classmethod
+    def deserialize_id(cls, raw_statistic_id: str) -> str:
+        if "#" in raw_statistic_id:
+            record_type, uuid_part = raw_statistic_id.split("#")
+            class_name = cls.__name__
+            assert (
+                record_type == class_name
+            ), f"StatisticID must be in the form of `{class_name}#uuid`"
+        else:
+            uuid_part = raw_statistic_id
+
+        return uuid_part
+
+    # noinspection PyNestedDecorators
+    @field_validator("ods_code")
+    @classmethod
+    def fill_empty_ods_code(cls, ods_code: str) -> str:
+        if not ods_code:
+            return "NO_ODS_CODE"
+        return ods_code
+
+
+class RecordStoreData(StatisticData):
+    total_number_of_records: int = 0
+    number_of_document_types: int = 0
+    total_size_of_records_in_megabytes: Decimal = Decimal(0)
+    average_size_of_documents_per_patient_in_megabytes: Decimal = Decimal(0)
+
+
+class OrganisationData(StatisticData):
+    number_of_patients: int = 0
+    average_records_per_patient: Decimal = Decimal(0)
+    daily_count_stored: int = 0
+    daily_count_viewed: int = 0
+    daily_count_downloaded: int = 0
+    daily_count_deleted: int = 0
+
+
+class ApplicationData(StatisticData):
+    active_user_ids_hashed: list[str] = []
+
+
+class LoadedStatisticData(NamedTuple):
+    record_store_data: list[RecordStoreData]
+    organisation_data: list[OrganisationData]
+    application_data: list[ApplicationData]
+
+
+def load_from_dynamodb_items(dynamodb_items: list[dict]) -> LoadedStatisticData:
+    output = LoadedStatisticData([], [], [])
+
+    for item in dynamodb_items:
+        data_type = item["StatisticID"].split("#")[0]
+        match data_type:
+            case "RecordStoreData":
+                output.record_store_data.append(RecordStoreData.model_validate(item))
+            case "OrganisationData":
+                output.organisation_data.append(OrganisationData.model_validate(item))
+            case "ApplicationData":
+                output.application_data.append(ApplicationData.model_validate(item))
+            case _:
+                raise ValueError(f"unknown type of statistic data: {data_type}")
+
+    return output
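
The `StatisticID` convention in `lambdas/models/statistics.py` (a `ClassName#uuid` string on the way out, a bare uuid on the way in) can be illustrated without pydantic. The following is only a rough sketch: the helper names `serialise_id`, `deserialise_id` and `group_by_type` are hypothetical stand-ins written for this note, and the sketch raises `ValueError` where the model above uses an `assert` inside a validator.

```python
KNOWN_TYPES = ("RecordStoreData", "OrganisationData", "ApplicationData")


def serialise_id(class_name: str, statistic_id: str) -> str:
    """Prefix the bare uuid with the record type before it is stored."""
    return f"{class_name}#{statistic_id}"


def deserialise_id(class_name: str, raw_statistic_id: str) -> str:
    """Strip a `ClassName#` prefix if present; reject a prefix of another type."""
    if "#" not in raw_statistic_id:
        return raw_statistic_id
    record_type, uuid_part = raw_statistic_id.split("#")
    if record_type != class_name:
        raise ValueError(f"StatisticID must be in the form of `{class_name}#uuid`")
    return uuid_part


def group_by_type(dynamodb_items: list[dict]) -> dict[str, list[dict]]:
    """Bucket raw items by the prefix of their StatisticID (hypothetical helper)."""
    grouped: dict[str, list[dict]] = {name: [] for name in KNOWN_TYPES}
    for item in dynamodb_items:
        data_type = item["StatisticID"].split("#")[0]
        if data_type not in grouped:
            raise ValueError(f"unknown type of statistic data: {data_type}")
        grouped[data_type].append(item)
    return grouped
```

Keeping the type in the ID prefix lets a single table hold all three record kinds while still allowing a loader to pick the right model per item.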
1 change: 1 addition & 0 deletions lambdas/requirements/requirements_test.txt
@@ -6,6 +6,7 @@ isort==5.13.0
 pip-audit==2.6.1
 pytest-cov==4.1.0
 pytest-mock==3.11.1
+pytest-unordered==0.6.0
 pytest==7.4.3
 requests_mock==1.11.0
 ruff==0.0.284
2 changes: 1 addition & 1 deletion lambdas/services/authoriser_service.py
@@ -76,7 +76,7 @@ def find_login_session(self, ndr_session_id):
         )
         session_table_name = os.environ["AUTH_SESSION_TABLE_NAME"]
         db_service = DynamoDBService()
-        query_response = db_service.simple_query(
+        query_response = db_service.query_all_fields(
             table_name=session_table_name,
             key_condition_expression=Key("NDRSessionId").eq(ndr_session_id),
         )
59 changes: 59 additions & 0 deletions lambdas/services/base/cloudwatch_service.py
@@ -0,0 +1,59 @@
+import os
+import time
+
+import boto3
+from utils.audit_logging_setup import LoggingService
+from utils.cloudwatch_logs_query import CloudwatchLogsQueryParams
+from utils.exceptions import LogsQueryException
+
+logger = LoggingService(__name__)
+
+
+class CloudwatchService:
+    def __init__(self):
+        self.logs_client = boto3.client("logs")
+        self.workspace = os.environ["WORKSPACE"]
+        self.initialised = True
+
+    def query_logs(
+        self, query_params: CloudwatchLogsQueryParams, start_time: int, end_time: int
+    ) -> list[dict]:
+        response = self.logs_client.start_query(
+            logGroupName=f"/aws/lambda/{self.workspace}_{query_params.lambda_name}",
+            startTime=start_time,
+            endTime=end_time,
+            queryString=query_params.query_string,
+        )
+        query_id = response["queryId"]
+
+        raw_query_result = self.poll_query_result(query_id)
+        query_result = self.regroup_raw_query_result(raw_query_result)
+        return query_result
+
+    def poll_query_result(self, query_id: str, max_retries=20) -> list[list]:
+        for _ in range(max_retries):
+            response = self.logs_client.get_query_results(queryId=query_id)
+            if response["status"] == "Complete":
+                return response["results"]
+            elif response["status"] in ["Failed", "Cancelled", "Timeout"]:
+                self.log_and_raise_error(
+                    f"Logs query failed with status: {response['status']}"
+                )
+            time.sleep(1)
+
+        self.log_and_raise_error(
+            f"Failed to get query result within max retries of {max_retries} times"
+        )
+
+    @staticmethod
+    def regroup_raw_query_result(raw_query_result: list[list[dict]]) -> list[dict]:
+        query_result = [
+            {column["field"]: column["value"] for column in row}
+            for row in raw_query_result
+        ]
+        return query_result
+
+    @staticmethod
+    def log_and_raise_error(error_message: str) -> None:
+        logger.error(error_message)
+        raise LogsQueryException(error_message)
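
To make the poll-then-regroup flow of `CloudwatchService` easier to follow, here is a small offline sketch. It assumes a hypothetical `FakeLogsClient` in place of the boto3 logs client, invented field names (`ods_code`, `daily_count_viewed`), a `wait_seconds` parameter added so the example does not sleep, and built-in exceptions in place of `LogsQueryException`. It is not the service itself.

```python
import time


class FakeLogsClient:
    """Hypothetical stand-in: reports 'Running' twice, then 'Complete'."""

    def __init__(self, results):
        self._results = results
        self.calls = 0

    def get_query_results(self, queryId):
        self.calls += 1
        if self.calls < 3:
            return {"status": "Running", "results": []}
        return {"status": "Complete", "results": self._results}


def poll_query_result(logs_client, query_id, max_retries=20, wait_seconds=0.0):
    """Poll until the query completes, fails, or the retry budget runs out."""
    for _ in range(max_retries):
        response = logs_client.get_query_results(queryId=query_id)
        if response["status"] == "Complete":
            return response["results"]
        if response["status"] in ("Failed", "Cancelled", "Timeout"):
            raise RuntimeError(f"Logs query failed with status: {response['status']}")
        time.sleep(wait_seconds)
    raise TimeoutError(f"No result within max retries of {max_retries} times")


def regroup_raw_query_result(raw_query_result):
    """Turn each row of {'field': ..., 'value': ...} pairs into one flat dict."""
    return [
        {column["field"]: column["value"] for column in row}
        for row in raw_query_result
    ]


# Each row is a list of field/value pairs; values arrive as strings.
sample_raw_result = [
    [{"field": "ods_code", "value": "Y12345"}, {"field": "daily_count_viewed", "value": "20"}],
    [{"field": "ods_code", "value": "H81109"}, {"field": "daily_count_viewed", "value": "5"}],
]

client = FakeLogsClient(sample_raw_result)
rows = regroup_raw_query_result(poll_query_result(client, "example-query-id"))
```

After regrouping, each row can be read by column name (for example `rows[0]["ods_code"]`) rather than by position in a list of pairs.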
35 changes: 33 additions & 2 deletions lambdas/services/base/dynamo_service.py
@@ -1,3 +1,5 @@
+from typing import Optional
+
 import boto3
 from boto3.dynamodb.conditions import Attr, ConditionBase, Key
 from botocore.exceptions import ClientError
@@ -76,7 +78,7 @@ def query_with_requested_fields(
             logger.error(str(e), {"Result": f"Unable to query table: {table_name}"})
             raise e

-    def simple_query(self, table_name: str, key_condition_expression):
+    def query_all_fields(self, table_name: str, key_condition_expression):
         """
         Allow querying dynamodb table without explicitly defining the fields to retrieve.
         :param table_name: Dynamodb table name
@@ -85,7 +87,7 @@ def simple_query(self, table_name: str, key_condition_expression):
         example usage:
             from boto3.dynamodb.conditions import Key
-            query_response = db_service.simple_query(
+            query_response = db_service.query_all_fields(
                 table_name=session_table_name,
                 key_condition_expression=Key("NDRSessionId").eq(ndr_session_id)
             )
@@ -161,6 +163,35 @@ def scan_table(
             logger.error(str(e), {"Result": f"Unable to scan table: {table_name}"})
             raise e

+    def scan_whole_table(
+        self,
+        table_name: str,
+        project_expression: Optional[str] = None,
+        filter_expression: Optional[str] = None,
+    ) -> list[dict]:
+        try:
+            table = self.get_table(table_name)
+            scan_arguments = {}
+            if project_expression:
+                scan_arguments["ProjectionExpression"] = project_expression
+            if filter_expression:
+                scan_arguments["FilterExpression"] = filter_expression
+
+            paginated_result = table.scan(**scan_arguments)
+            dynamodb_scan_result = paginated_result.get("Items", [])
+            while "LastEvaluatedKey" in paginated_result:
+                start_key_for_next_page = paginated_result["LastEvaluatedKey"]
+                paginated_result = table.scan(
+                    **scan_arguments,
+                    ExclusiveStartKey=start_key_for_next_page,
+                )
+                dynamodb_scan_result += paginated_result["Items"]
+            return dynamodb_scan_result
+
+        except ClientError as e:
+            logger.error(str(e), {"Result": f"Unable to scan table: {table_name}"})
+            raise e
+
     def batch_writing(self, table_name: str, item_list: list[dict]):
         try:
             table = self.get_table(table_name)
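
The `LastEvaluatedKey` / `ExclusiveStartKey` loop in `scan_whole_table` can be tried without AWS. The sketch below assumes a hypothetical in-memory `FakeTable` whose `scan` mimics the paging behaviour; the class, the `{"index": ...}` key shape, and the `scan_all` helper are illustrative only and are not part of the commit.

```python
class FakeTable:
    """Hypothetical in-memory table that returns items one page at a time."""

    def __init__(self, items, page_size):
        self.items = items
        self.page_size = page_size

    def scan(self, ExclusiveStartKey=None, **_ignored_scan_arguments):
        start = 0 if ExclusiveStartKey is None else ExclusiveStartKey["index"]
        end = start + self.page_size
        page = {"Items": self.items[start:end]}
        if end < len(self.items):
            # More items remain, so tell the caller where to resume.
            page["LastEvaluatedKey"] = {"index": end}
        return page


def scan_all(table, **scan_arguments):
    """Keep scanning until a page arrives without a LastEvaluatedKey."""
    page = table.scan(**scan_arguments)
    items = list(page.get("Items", []))
    while "LastEvaluatedKey" in page:
        page = table.scan(
            **scan_arguments,
            ExclusiveStartKey=page["LastEvaluatedKey"],
        )
        items += page.get("Items", [])
    return items


table = FakeTable([{"ID": str(i)} for i in range(5)], page_size=2)
all_items = scan_all(table, ProjectionExpression="ID")
```

With five items and a page size of two, the loop issues three scans; the last page carries no `LastEvaluatedKey`, which is what ends the loop.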
7 changes: 7 additions & 0 deletions lambdas/services/base/s3_service.py
@@ -123,3 +123,10 @@ def file_exist_on_s3(self, s3_bucket_name: str, file_key: str) -> bool:
                 return False
             logger.error(str(e), {"Result": "Failed to check if file exists on s3"})
             raise e
+
+    def list_all_objects(self, bucket_name: str) -> list[dict]:
+        s3_paginator = self.client.get_paginator("list_objects_v2")
+        s3_list_objects_result = []
+        for paginated_result in s3_paginator.paginate(Bucket=bucket_name):
+            s3_list_objects_result += paginated_result.get("Contents", [])
+        return s3_list_objects_result
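
The `.get("Contents", [])` in `list_all_objects` matters because a `list_objects_v2` page for an empty bucket carries no `Contents` key. A minimal sketch with hand-written pages, assuming hypothetical keys and sizes and a `collect_objects` helper invented for this note:

```python
def collect_objects(pages):
    """Flatten paginated list_objects_v2-style pages into one list of objects."""
    objects = []
    for page in pages:
        # A page with no objects has no "Contents" key, hence the default.
        objects += page.get("Contents", [])
    return objects


# Hand-written pages standing in for what a paginator would yield.
sample_pages = [
    {"Contents": [{"Key": "a.pdf", "Size": 100}, {"Key": "b.pdf", "Size": 250}]},
    {"Contents": [{"Key": "c.pdf", "Size": 50}]},
]
empty_bucket_pages = [{"KeyCount": 0}]

all_objects = collect_objects(sample_pages)
total_size_in_bytes = sum(s3_object["Size"] for s3_object in all_objects)
```

A caller could then total the `Size` fields, as in the last line, to estimate how much data a bucket holds.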