Skip to content

Commit

Permalink
Merge branch 'main' into prmdr-506
Browse files Browse the repository at this point in the history
  • Loading branch information
RioKnightleyNHS committed Nov 28, 2023
2 parents 3cc4665 + 11fe2f5 commit 4649b60
Show file tree
Hide file tree
Showing 15 changed files with 193 additions and 74 deletions.
14 changes: 13 additions & 1 deletion lambdas/handlers/bulk_upload_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from utils.audit_logging_setup import LoggingService
from utils.decorators.override_error_check import override_error_check
from utils.decorators.set_audit_arg import set_request_context_for_logging
from utils.exceptions import InvalidMessageException
from utils.exceptions import (InvalidMessageException,
PdsTooManyRequestsException)
from utils.lloyd_george_validator import LGInvalidFilesException

logger = LoggingService(__name__)
Expand All @@ -23,6 +24,17 @@ def lambda_handler(event, _context):
try:
logger.info(f"Processing message {index} of {len(event['Records'])}")
bulk_upload_service.handle_sqs_message(message)
except PdsTooManyRequestsException as error:
logger.error(error)
logger.info("Cannot process for now due to PDS rate limit reached.")
logger.info(
"All remaining messages in this batch will be returned to sqs queue to retry later."
)

all_unprocessed_message = event["Records"][index - 1 :]
for unprocessed_message in all_unprocessed_message:
bulk_upload_service.put_sqs_message_back_to_queue(unprocessed_message)
return
except (
ClientError,
InvalidMessageException,
Expand Down
8 changes: 2 additions & 6 deletions lambdas/handlers/bulk_upload_metadata_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@

import pydantic
from botocore.exceptions import ClientError
from models.staging_metadata import (
METADATA_FILENAME,
NHS_NUMBER_FIELD_NAME,
MetadataFile,
StagingMetadata,
)
from models.staging_metadata import (METADATA_FILENAME, NHS_NUMBER_FIELD_NAME,
MetadataFile, StagingMetadata)
from services.s3_service import S3Service
from services.sqs_service import SQSService
from utils.audit_logging_setup import LoggingService
Expand Down
4 changes: 1 addition & 3 deletions lambdas/handlers/login_redirect_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

@set_request_context_for_logging
@override_error_check
@ensure_environment_variables(
names=["AUTH_DYNAMODB_NAME"]
)
@ensure_environment_variables(names=["AUTH_DYNAMODB_NAME"])
def lambda_handler(event, context):
request_context.app_interaction = LoggingAppInteraction.LOGIN.value
return prepare_redirect_response(WebApplicationClient)
Expand Down
4 changes: 3 additions & 1 deletion lambdas/handlers/token_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ def lambda_handler(event, context):

try:
if not have_matching_state_value_in_record(state):
logger.info(f"Mismatching state values. Cannot find state {state} in record")
logger.info(
f"Mismatching state values. Cannot find state {state} in record"
)
return ApiGatewayResponse(
400,
"Failed to authenticate user",
Expand Down
46 changes: 35 additions & 11 deletions lambdas/services/bulk_upload_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@
from services.s3_service import S3Service
from services.sqs_service import SQSService
from utils.audit_logging_setup import LoggingService
from utils.exceptions import (
DocumentInfectedException,
InvalidMessageException,
S3FileNotFoundException,
TagNotFoundException,
VirusScanFailedException,
VirusScanNoResultException,
)
from utils.lloyd_george_validator import LGInvalidFilesException, validate_lg_file_names
from utils.exceptions import (DocumentInfectedException,
InvalidMessageException,
PdsTooManyRequestsException,
S3FileNotFoundException, TagNotFoundException,
VirusScanFailedException,
VirusScanNoResultException)
from utils.lloyd_george_validator import (LGInvalidFilesException,
validate_lg_file_names)
from utils.request_context import request_context
from utils.utilities import create_reference_id

logger = LoggingService(__name__)
Expand Down Expand Up @@ -59,8 +59,14 @@ def handle_sqs_message(self, message: dict):
raise InvalidMessageException(str(e))

try:
request_context.patient_nhs_no = staging_metadata.nhs_number
logger.info("Running validation for file names...")
self.validate_files(staging_metadata)
except PdsTooManyRequestsException as error:
logger.info(
"Cannot validate patient due to PDS responded with Too Many Requests"
)
raise error
except LGInvalidFilesException as error:
logger.info(
f"Detected invalid file name related to patient number: {staging_metadata.nhs_number}. Will stop "
Expand All @@ -81,7 +87,7 @@ def handle_sqs_message(self, message: dict):
logger.info(
f"Waiting on virus scan results for: {staging_metadata.nhs_number}, adding message back to queue"
)
self.put_message_back_to_queue(staging_metadata)
self.put_staging_metadata_back_to_queue(staging_metadata)
return
except (VirusScanFailedException, DocumentInfectedException) as e:
logger.info(e)
Expand Down Expand Up @@ -190,7 +196,10 @@ def check_virus_result(self, staging_metadata: StagingMetadata):
f"Verified that all documents for patient {staging_metadata.nhs_number} are clean."
)

def put_message_back_to_queue(self, staging_metadata: StagingMetadata):
def put_staging_metadata_back_to_queue(self, staging_metadata: StagingMetadata):
request_context.patient_nhs_no = staging_metadata.nhs_number

logger.info("Returning message to sqs queue...")
self.sqs_service.send_message_with_nhs_number_attr_fifo(
queue_url=self.metadata_queue_url,
message_body=staging_metadata.model_dump_json(by_alias=True),
Expand All @@ -199,6 +208,21 @@ def put_message_back_to_queue(self, staging_metadata: StagingMetadata):
group_id=f"back_to_queue_bulk_upload_{uuid.uuid4()}",
)

def put_sqs_message_back_to_queue(self, sqs_message: dict):
try:
nhs_number = sqs_message["messageAttributes"]["NhsNumber"]["stringValue"]
request_context.patient_nhs_no = nhs_number
except KeyError:
nhs_number = ""

logger.info("Returning message to sqs queue...")
self.sqs_service.send_message_with_nhs_number_attr_fifo(
queue_url=self.metadata_queue_url,
message_body=sqs_message["body"],
nhs_number=nhs_number,
delay_seconds=60 * 5,
)

def init_transaction(self):
self.dynamo_records_in_transaction = []
self.source_bucket_files_in_transaction = []
Expand Down
12 changes: 12 additions & 0 deletions lambdas/services/mock_pds_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
import os
import random

from requests import Response
from services.patient_search_service import PatientSearch
Expand All @@ -12,6 +14,10 @@ def __init__(self, *args, **kwargs):
def pds_request(self, nhs_number: str, *args, **kwargs) -> Response:
mock_pds_results: list[dict] = []

if os.getenv("MOCK_PDS_TOO_MANY_REQUESTS_ERROR") == "true":
if random.random() < 0.333:
return self.too_many_requests_response()

try:
with open("services/mock_data/pds_patient_9000000001_X4S4L_pcse.json") as f:
mock_pds_results.append(json.load(f))
Expand Down Expand Up @@ -41,3 +47,9 @@ def pds_request(self, nhs_number: str, *args, **kwargs) -> Response:
response.status_code = 404

return response

def too_many_requests_response(self) -> Response:
response = Response()
response.status_code = 429
response._content = b"Too Many Requests"
return response
1 change: 1 addition & 0 deletions lambdas/services/pds_api_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def pds_request(self, nhs_number: str, retry_on_expired: bool):
pds_response = requests.get(url=url_endpoint, headers=authorization_header)
if pds_response.status_code == 401 and retry_on_expired:
return self.pds_request(nhs_number, retry_on_expired=False)

return pds_response

except ClientError as e:
Expand Down
34 changes: 29 additions & 5 deletions lambdas/tests/unit/handlers/test_bulk_upload_handler.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import pytest
from handlers.bulk_upload_handler import lambda_handler
from tests.unit.helpers.data.bulk_upload.test_data import \
TEST_EVENT_WITH_SQS_MESSAGES
from utils.exceptions import InvalidMessageException
from tests.unit.helpers.data.bulk_upload.test_data import (
TEST_EVENT_WITH_10_SQS_MESSAGES, TEST_EVENT_WITH_SQS_MESSAGES)
from utils.exceptions import (InvalidMessageException,
PdsTooManyRequestsException)


@pytest.fixture
Expand All @@ -12,7 +13,9 @@ def mocked_service(mocker):
yield mocked_service


def test_lambda_process_each_sqs_message_one_by_one(set_env, mocked_service, context):
def test_lambda_handler_process_each_sqs_message_one_by_one(
set_env, mocked_service, context
):
lambda_handler(TEST_EVENT_WITH_SQS_MESSAGES, context)

assert mocked_service.handle_sqs_message.call_count == len(
Expand All @@ -22,7 +25,7 @@ def test_lambda_process_each_sqs_message_one_by_one(set_env, mocked_service, con
mocked_service.handle_sqs_message.assert_any_call(message)


def test_lambda_continue_process_next_message_after_handled_error(
def test_lambda_handler_continue_process_next_message_after_handled_error(
set_env, mocked_service, context
):
# emulate that unexpected error happen at 2nd message
Expand All @@ -37,3 +40,24 @@ def test_lambda_continue_process_next_message_after_handled_error(
mocked_service.handle_sqs_message.assert_called_with(
TEST_EVENT_WITH_SQS_MESSAGES["Records"][2]
)


def test_lambda_handler_handle_pds_too_many_requests_exception(
set_env, mocked_service, context
):
# emulate that unexpected error happen at 7th message
mocked_service.handle_sqs_message.side_effect = (
[None] * 6 + [PdsTooManyRequestsException] + [None] * 3
)
expected_handled_messages = TEST_EVENT_WITH_10_SQS_MESSAGES["Records"][0:6]
expected_unhandled_message = TEST_EVENT_WITH_10_SQS_MESSAGES["Records"][6:]

lambda_handler(TEST_EVENT_WITH_10_SQS_MESSAGES, context)

assert mocked_service.handle_sqs_message.call_count == 7

for message in expected_handled_messages:
mocked_service.handle_sqs_message.assert_any_call(message)

for message in expected_unhandled_message:
mocked_service.put_sqs_message_back_to_queue.assert_any_call(message)
19 changes: 8 additions & 11 deletions lambdas/tests/unit/handlers/test_bulk_upload_metadata_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,17 @@

import pytest
from botocore.exceptions import ClientError
from handlers.bulk_upload_metadata_handler import (
csv_to_staging_metadata,
download_metadata_from_s3,
lambda_handler,
send_metadata_to_fifo_sqs,
)
from handlers.bulk_upload_metadata_handler import (csv_to_staging_metadata,
download_metadata_from_s3,
lambda_handler,
send_metadata_to_fifo_sqs)
from models.staging_metadata import METADATA_FILENAME
from pydantic import ValidationError
from tests.unit.conftest import MOCK_LG_METADATA_SQS_QUEUE, MOCK_LG_STAGING_STORE_BUCKET
from tests.unit.conftest import (MOCK_LG_METADATA_SQS_QUEUE,
MOCK_LG_STAGING_STORE_BUCKET)
from tests.unit.helpers.data.bulk_upload.test_data import (
EXPECTED_PARSED_METADATA,
EXPECTED_SQS_MSG_FOR_PATIENT_1234567890,
EXPECTED_SQS_MSG_FOR_PATIENT_1234567891,
)
EXPECTED_PARSED_METADATA, EXPECTED_SQS_MSG_FOR_PATIENT_1234567890,
EXPECTED_SQS_MSG_FOR_PATIENT_1234567891)

MOCK_METADATA_CSV = "tests/unit/helpers/data/bulk_upload/metadata.csv"
MOCK_INVALID_METADATA_CSV_FILES = [
Expand Down
15 changes: 15 additions & 0 deletions lambdas/tests/unit/helpers/data/bulk_upload/test_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ def build_test_sqs_message(staging_metadata: StagingMetadata):
}


def build_test_sqs_message_from_nhs_number(nhs_number: str) -> dict:
file_names = make_valid_lg_file_names(total_number=3, nhs_number=nhs_number)
staging_metadata = build_test_staging_metadata(
file_names=file_names, nhs_number=nhs_number
)
return build_test_sqs_message(staging_metadata)


def build_test_document_reference(file_name: str, nhs_number: str = "9000000009"):
doc_ref = NHSDocumentReference(
nhs_number=nhs_number,
Expand Down Expand Up @@ -132,3 +140,10 @@ def build_test_document_reference(file_name: str, nhs_number: str = "9000000009"
TEST_SQS_MESSAGE,
]
}

TEST_EVENT_WITH_10_SQS_MESSAGES = {
"Records": [
build_test_sqs_message_from_nhs_number(str(nhs_number))
for nhs_number in range(9_000_000_000, 9_000_000_010)
]
}
Loading

0 comments on commit 4649b60

Please sign in to comment.