diff --git a/lambdas/handlers/bulk_upload_handler.py b/lambdas/handlers/bulk_upload_handler.py
index ac5287f9a..5fb14521c 100644
--- a/lambdas/handlers/bulk_upload_handler.py
+++ b/lambdas/handlers/bulk_upload_handler.py
@@ -3,7 +3,8 @@
 from utils.audit_logging_setup import LoggingService
 from utils.decorators.override_error_check import override_error_check
 from utils.decorators.set_audit_arg import set_request_context_for_logging
-from utils.exceptions import InvalidMessageException
+from utils.exceptions import (InvalidMessageException,
+                              PdsTooManyRequestsException)
 from utils.lloyd_george_validator import LGInvalidFilesException

 logger = LoggingService(__name__)
@@ -23,6 +24,17 @@ def lambda_handler(event, _context):
         try:
             logger.info(f"Processing message {index} of {len(event['Records'])}")
             bulk_upload_service.handle_sqs_message(message)
+        except PdsTooManyRequestsException as error:
+            logger.error(error)
+            logger.info("Cannot process for now due to PDS rate limit reached.")
+            logger.info(
+                "All remaining messages in this batch will be returned to sqs queue to retry later."
+            )
+
+            all_unprocessed_message = event["Records"][index - 1 :]
+            for unprocessed_message in all_unprocessed_message:
+                bulk_upload_service.put_sqs_message_back_to_queue(unprocessed_message)
+            return
         except (
             ClientError,
             InvalidMessageException,
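Note: the new except branch above short-circuits the whole batch: the message that hit the PDS rate limit and every message after it go back onto the queue, and the invocation returns early. A minimal sketch of that control flow, assuming the surrounding loop enumerates records starting at 1 (which the [index - 1 :] slice implies); the service object here is a stand-in, not the real BulkUploadService:

    class RateLimitHit(Exception):
        pass


    def process_batch(records: list[dict], service) -> None:
        for index, message in enumerate(records, start=1):
            try:
                service.handle_sqs_message(message)
            except RateLimitHit:
                # records[index - 1] is the message that just failed, so it is
                # re-queued along with everything not yet attempted, then we stop.
                for unprocessed in records[index - 1:]:
                    service.put_sqs_message_back_to_queue(unprocessed)
                return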
diff --git a/lambdas/handlers/bulk_upload_metadata_handler.py b/lambdas/handlers/bulk_upload_metadata_handler.py
index 5d543738f..6aba98720 100644
--- a/lambdas/handlers/bulk_upload_metadata_handler.py
+++ b/lambdas/handlers/bulk_upload_metadata_handler.py
@@ -6,12 +6,8 @@
 import pydantic
 from botocore.exceptions import ClientError
-from models.staging_metadata import (
-    METADATA_FILENAME,
-    NHS_NUMBER_FIELD_NAME,
-    MetadataFile,
-    StagingMetadata,
-)
+from models.staging_metadata import (METADATA_FILENAME, NHS_NUMBER_FIELD_NAME,
+                                     MetadataFile, StagingMetadata)
 from services.s3_service import S3Service
 from services.sqs_service import SQSService
 from utils.audit_logging_setup import LoggingService
diff --git a/lambdas/handlers/login_redirect_handler.py b/lambdas/handlers/login_redirect_handler.py
index bba7c93dd..a3fb455bc 100644
--- a/lambdas/handlers/login_redirect_handler.py
+++ b/lambdas/handlers/login_redirect_handler.py
@@ -18,9 +18,7 @@
 @set_request_context_for_logging
 @override_error_check
-@ensure_environment_variables(
-    names=["AUTH_DYNAMODB_NAME"]
-)
+@ensure_environment_variables(names=["AUTH_DYNAMODB_NAME"])
 def lambda_handler(event, context):
     request_context.app_interaction = LoggingAppInteraction.LOGIN.value
     return prepare_redirect_response(WebApplicationClient)
diff --git a/lambdas/handlers/token_handler.py b/lambdas/handlers/token_handler.py
index 24de30f6a..0cbeebf9b 100644
--- a/lambdas/handlers/token_handler.py
+++ b/lambdas/handlers/token_handler.py
@@ -49,7 +49,9 @@ def lambda_handler(event, context):
     try:
         if not have_matching_state_value_in_record(state):
-            logger.info(f"Mismatching state values. Cannot find state {state} in record")
+            logger.info(
+                f"Mismatching state values. Cannot find state {state} in record"
+            )
             return ApiGatewayResponse(
                 400,
                 "Failed to authenticate user",
diff --git a/lambdas/services/bulk_upload_service.py b/lambdas/services/bulk_upload_service.py
index 6834f26ee..f39c9a354 100644
--- a/lambdas/services/bulk_upload_service.py
+++ b/lambdas/services/bulk_upload_service.py
@@ -12,15 +12,15 @@
 from services.s3_service import S3Service
 from services.sqs_service import SQSService
 from utils.audit_logging_setup import LoggingService
-from utils.exceptions import (
-    DocumentInfectedException,
-    InvalidMessageException,
-    S3FileNotFoundException,
-    TagNotFoundException,
-    VirusScanFailedException,
-    VirusScanNoResultException,
-)
-from utils.lloyd_george_validator import LGInvalidFilesException, validate_lg_file_names
+from utils.exceptions import (DocumentInfectedException,
+                              InvalidMessageException,
+                              PdsTooManyRequestsException,
+                              S3FileNotFoundException, TagNotFoundException,
+                              VirusScanFailedException,
+                              VirusScanNoResultException)
+from utils.lloyd_george_validator import (LGInvalidFilesException,
+                                          validate_lg_file_names)
+from utils.request_context import request_context
 from utils.utilities import create_reference_id

 logger = LoggingService(__name__)
@@ -59,8 +59,14 @@ def handle_sqs_message(self, message: dict):
             raise InvalidMessageException(str(e))

         try:
+            request_context.patient_nhs_no = staging_metadata.nhs_number
             logger.info("Running validation for file names...")
             self.validate_files(staging_metadata)
+        except PdsTooManyRequestsException as error:
+            logger.info(
+                "Cannot validate patient due to PDS responded with Too Many Requests"
+            )
+            raise error
         except LGInvalidFilesException as error:
             logger.info(
                 f"Detected invalid file name related to patient number: {staging_metadata.nhs_number}. Will stop "
@@ -81,7 +87,7 @@
             logger.info(
                 f"Waiting on virus scan results for: {staging_metadata.nhs_number}, adding message back to queue"
             )
-            self.put_message_back_to_queue(staging_metadata)
+            self.put_staging_metadata_back_to_queue(staging_metadata)
             return
         except (VirusScanFailedException, DocumentInfectedException) as e:
             logger.info(e)
@@ -190,7 +196,10 @@ def check_virus_result(self, staging_metadata: StagingMetadata):
             f"Verified that all documents for patient {staging_metadata.nhs_number} are clean."
         )

-    def put_message_back_to_queue(self, staging_metadata: StagingMetadata):
+    def put_staging_metadata_back_to_queue(self, staging_metadata: StagingMetadata):
+        request_context.patient_nhs_no = staging_metadata.nhs_number
+
+        logger.info("Returning message to sqs queue...")
         self.sqs_service.send_message_with_nhs_number_attr_fifo(
             queue_url=self.metadata_queue_url,
             message_body=staging_metadata.model_dump_json(by_alias=True),
@@ -199,6 +208,21 @@ def put_message_back_to_queue(self, staging_metadata: StagingMetadata):
             group_id=f"back_to_queue_bulk_upload_{uuid.uuid4()}",
         )

+    def put_sqs_message_back_to_queue(self, sqs_message: dict):
+        try:
+            nhs_number = sqs_message["messageAttributes"]["NhsNumber"]["stringValue"]
+            request_context.patient_nhs_no = nhs_number
+        except KeyError:
+            nhs_number = ""
+
+        logger.info("Returning message to sqs queue...")
+        self.sqs_service.send_message_with_nhs_number_attr_fifo(
+            queue_url=self.metadata_queue_url,
+            message_body=sqs_message["body"],
+            nhs_number=nhs_number,
+            delay_seconds=60 * 5,
+        )
+
     def init_transaction(self):
         self.dynamo_records_in_transaction = []
         self.source_bucket_files_in_transaction = []
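Note: put_sqs_message_back_to_queue works on the raw SQS record rather than on parsed StagingMetadata, so it can re-queue messages that were never deserialised. It reads the NhsNumber message attribute when present, falls back to an empty string, and re-sends the original body with a five-minute delay. A small self-contained sketch of just that lookup, using the standard Lambda SQS record shape (the example record below is illustrative, not a project fixture):

    def nhs_number_from_record(sqs_message: dict) -> str:
        # Lambda delivers SQS message attributes as
        # {"messageAttributes": {"NhsNumber": {"stringValue": ..., "dataType": "String"}}}.
        try:
            return sqs_message["messageAttributes"]["NhsNumber"]["stringValue"]
        except KeyError:
            # Records without the attribute are still re-queued, just untagged.
            return ""


    example_record = {
        "body": '{"nhs_number": "9000000009", "files": []}',
        "messageAttributes": {
            "NhsNumber": {"stringValue": "9000000009", "dataType": "String"}
        },
    }
    assert nhs_number_from_record(example_record) == "9000000009"
    assert nhs_number_from_record({"body": "{}"}) == ""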
diff --git a/lambdas/services/mock_pds_service.py b/lambdas/services/mock_pds_service.py
index e195b0160..317838f45 100644
--- a/lambdas/services/mock_pds_service.py
+++ b/lambdas/services/mock_pds_service.py
@@ -1,4 +1,6 @@
 import json
+import os
+import random

 from requests import Response
 from services.patient_search_service import PatientSearch
@@ -12,6 +14,10 @@ def __init__(self, *args, **kwargs):
     def pds_request(self, nhs_number: str, *args, **kwargs) -> Response:
         mock_pds_results: list[dict] = []

+        if os.getenv("MOCK_PDS_TOO_MANY_REQUESTS_ERROR") == "true":
+            if random.random() < 0.333:
+                return self.too_many_requests_response()
+
         try:
             with open("services/mock_data/pds_patient_9000000001_X4S4L_pcse.json") as f:
                 mock_pds_results.append(json.load(f))
@@ -41,3 +47,9 @@ def pds_request(self, nhs_number: str, *args, **kwargs) -> Response:
             response.status_code = 404

             return response
+
+    def too_many_requests_response(self) -> Response:
+        response = Response()
+        response.status_code = 429
+        response._content = b"Too Many Requests"
+        return response
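Note: the mock PDS client only simulates throttling when MOCK_PDS_TOO_MANY_REQUESTS_ERROR is set to "true", and even then roughly one call in three returns a 429, so existing environments are unaffected unless they opt in. A sketch of how a test could exercise the throttled path deterministically, assuming the lambdas package is on the import path; pinning random.random below the 0.333 threshold forces the branch:

    import os
    from unittest.mock import patch

    from services.mock_pds_service import MockPdsApiService

    os.environ["MOCK_PDS_TOO_MANY_REQUESTS_ERROR"] = "true"

    # random.random() is consulted on every call, so pinning it under 0.333 makes
    # the mock return its 429 response instead of the canned patient payloads.
    with patch("services.mock_pds_service.random.random", return_value=0.1):
        response = MockPdsApiService().pds_request(
            nhs_number="9000000001", retry_on_expired=True
        )

    assert response.status_code == 429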
diff --git a/lambdas/services/pds_api_service.py b/lambdas/services/pds_api_service.py
index 9a95a0b73..7a00a9e3c 100644
--- a/lambdas/services/pds_api_service.py
+++ b/lambdas/services/pds_api_service.py
@@ -43,6 +43,7 @@ def pds_request(self, nhs_number: str, retry_on_expired: bool):
             pds_response = requests.get(url=url_endpoint, headers=authorization_header)
             if pds_response.status_code == 401 and retry_on_expired:
                 return self.pds_request(nhs_number, retry_on_expired=False)
+
             return pds_response

         except ClientError as e:
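Note: worth calling out from the context above: pds_request retries exactly once when a 401 suggests an expired token, and otherwise returns the response unchanged, which is what lets a 429 reach validate_with_pds_service further down this patch. A generic sketch of that retry-once shape (function and parameter names here are illustrative, not the project's):

    import requests


    def get_with_one_retry_on_expired_token(url: str, fetch_token) -> requests.Response:
        # First attempt with whatever token we currently hold.
        response = requests.get(url, headers={"Authorization": f"Bearer {fetch_token()}"})
        if response.status_code == 401:
            # Token likely expired: refresh once and retry, but never loop.
            response = requests.get(
                url, headers={"Authorization": f"Bearer {fetch_token(force_refresh=True)}"}
            )
        # Any other status (404, 429, 5xx) is returned untouched for the caller to classify.
        return response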
diff --git a/lambdas/tests/unit/handlers/test_bulk_upload_handler.py b/lambdas/tests/unit/handlers/test_bulk_upload_handler.py
index cdf67c276..ec295722b 100644
--- a/lambdas/tests/unit/handlers/test_bulk_upload_handler.py
+++ b/lambdas/tests/unit/handlers/test_bulk_upload_handler.py
@@ -1,8 +1,9 @@
 import pytest
 from handlers.bulk_upload_handler import lambda_handler
-from tests.unit.helpers.data.bulk_upload.test_data import \
-    TEST_EVENT_WITH_SQS_MESSAGES
-from utils.exceptions import InvalidMessageException
+from tests.unit.helpers.data.bulk_upload.test_data import (
+    TEST_EVENT_WITH_10_SQS_MESSAGES, TEST_EVENT_WITH_SQS_MESSAGES)
+from utils.exceptions import (InvalidMessageException,
+                              PdsTooManyRequestsException)


 @pytest.fixture
@@ -12,7 +13,9 @@ def mocked_service(mocker):
     yield mocked_service


-def test_lambda_process_each_sqs_message_one_by_one(set_env, mocked_service, context):
+def test_lambda_handler_process_each_sqs_message_one_by_one(
+    set_env, mocked_service, context
+):
     lambda_handler(TEST_EVENT_WITH_SQS_MESSAGES, context)

     assert mocked_service.handle_sqs_message.call_count == len(
@@ -22,7 +25,7 @@ def test_lambda_process_each_sqs_message_one_by_one(set_env, mocked_service, con
         mocked_service.handle_sqs_message.assert_any_call(message)


-def test_lambda_continue_process_next_message_after_handled_error(
+def test_lambda_handler_continue_process_next_message_after_handled_error(
     set_env, mocked_service, context
 ):
     # emulate that unexpected error happen at 2nd message
@@ -37,3 +40,24 @@
     mocked_service.handle_sqs_message.assert_called_with(
         TEST_EVENT_WITH_SQS_MESSAGES["Records"][2]
     )
+
+
+def test_lambda_handler_handle_pds_too_many_requests_exception(
+    set_env, mocked_service, context
+):
+    # emulate that unexpected error happen at 7th message
+    mocked_service.handle_sqs_message.side_effect = (
+        [None] * 6 + [PdsTooManyRequestsException] + [None] * 3
+    )
+    expected_handled_messages = TEST_EVENT_WITH_10_SQS_MESSAGES["Records"][0:6]
+    expected_unhandled_message = TEST_EVENT_WITH_10_SQS_MESSAGES["Records"][6:]
+
+    lambda_handler(TEST_EVENT_WITH_10_SQS_MESSAGES, context)
+
+    assert mocked_service.handle_sqs_message.call_count == 7
+
+    for message in expected_handled_messages:
+        mocked_service.handle_sqs_message.assert_any_call(message)
+
+    for message in expected_unhandled_message:
+        mocked_service.put_sqs_message_back_to_queue.assert_any_call(message)
diff --git a/lambdas/tests/unit/handlers/test_bulk_upload_metadata_handler.py b/lambdas/tests/unit/handlers/test_bulk_upload_metadata_handler.py
index 9b9956fce..3e5c46b0a 100644
--- a/lambdas/tests/unit/handlers/test_bulk_upload_metadata_handler.py
+++ b/lambdas/tests/unit/handlers/test_bulk_upload_metadata_handler.py
@@ -3,20 +3,17 @@
 import pytest
 from botocore.exceptions import ClientError
-from handlers.bulk_upload_metadata_handler import (
-    csv_to_staging_metadata,
-    download_metadata_from_s3,
-    lambda_handler,
-    send_metadata_to_fifo_sqs,
-)
+from handlers.bulk_upload_metadata_handler import (csv_to_staging_metadata,
+                                                   download_metadata_from_s3,
+                                                   lambda_handler,
+                                                   send_metadata_to_fifo_sqs)
 from models.staging_metadata import METADATA_FILENAME
 from pydantic import ValidationError
-from tests.unit.conftest import MOCK_LG_METADATA_SQS_QUEUE, MOCK_LG_STAGING_STORE_BUCKET
+from tests.unit.conftest import (MOCK_LG_METADATA_SQS_QUEUE,
+                                 MOCK_LG_STAGING_STORE_BUCKET)
 from tests.unit.helpers.data.bulk_upload.test_data import (
-    EXPECTED_PARSED_METADATA,
-    EXPECTED_SQS_MSG_FOR_PATIENT_1234567890,
-    EXPECTED_SQS_MSG_FOR_PATIENT_1234567891,
-)
+    EXPECTED_PARSED_METADATA, EXPECTED_SQS_MSG_FOR_PATIENT_1234567890,
+    EXPECTED_SQS_MSG_FOR_PATIENT_1234567891)

 MOCK_METADATA_CSV = "tests/unit/helpers/data/bulk_upload/metadata.csv"
 MOCK_INVALID_METADATA_CSV_FILES = [
diff --git a/lambdas/tests/unit/helpers/data/bulk_upload/test_data.py b/lambdas/tests/unit/helpers/data/bulk_upload/test_data.py
index 0a60725f1..f74d1a853 100644
--- a/lambdas/tests/unit/helpers/data/bulk_upload/test_data.py
+++ b/lambdas/tests/unit/helpers/data/bulk_upload/test_data.py
@@ -93,6 +93,14 @@ def build_test_sqs_message(staging_metadata: StagingMetadata):
     }


+def build_test_sqs_message_from_nhs_number(nhs_number: str) -> dict:
+    file_names = make_valid_lg_file_names(total_number=3, nhs_number=nhs_number)
+    staging_metadata = build_test_staging_metadata(
+        file_names=file_names, nhs_number=nhs_number
+    )
+    return build_test_sqs_message(staging_metadata)
+
+
 def build_test_document_reference(file_name: str, nhs_number: str = "9000000009"):
     doc_ref = NHSDocumentReference(
         nhs_number=nhs_number,
@@ -132,3 +140,10 @@ def build_test_document_reference(file_name: str, nhs_number: str = "9000000009"
         TEST_SQS_MESSAGE,
     ]
 }
+
+TEST_EVENT_WITH_10_SQS_MESSAGES = {
+    "Records": [
+        build_test_sqs_message_from_nhs_number(str(nhs_number))
+        for nhs_number in range(9_000_000_000, 9_000_000_010)
+    ]
+}
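Note: TEST_EVENT_WITH_10_SQS_MESSAGES builds ten distinct records by walking a contiguous range of NHS numbers, which is what lets the new handler test fail on the seventh message and still assert exactly which records were re-queued. A quick sketch of what the comprehension yields (the values follow from the range call itself, nothing here is copied from a fixture):

    nhs_numbers = [str(n) for n in range(9_000_000_000, 9_000_000_010)]
    # ["9000000000", "9000000001", ..., "9000000009"] -- ten values, one per record
    assert len(nhs_numbers) == 10
    assert nhs_numbers[6] == "9000000006"  # the "7th message" the new handler test fails on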
diff --git a/lambdas/tests/unit/services/test_bulk_upload_service.py b/lambdas/tests/unit/services/test_bulk_upload_service.py
index d9fa9fa36..006e9f72b 100644
--- a/lambdas/tests/unit/services/test_bulk_upload_service.py
+++ b/lambdas/tests/unit/services/test_bulk_upload_service.py
@@ -5,32 +5,19 @@
 from enums.virus_scan_result import VirusScanResult
 from freezegun import freeze_time
 from services.bulk_upload_service import BulkUploadService
-from tests.unit.conftest import (
-    MOCK_BULK_REPORT_TABLE_NAME,
-    MOCK_LG_BUCKET,
-    MOCK_LG_METADATA_SQS_QUEUE,
-    MOCK_LG_STAGING_STORE_BUCKET,
-    MOCK_LG_TABLE_NAME,
-    TEST_OBJECT_KEY,
-)
+from tests.unit.conftest import (MOCK_BULK_REPORT_TABLE_NAME, MOCK_LG_BUCKET,
+                                 MOCK_LG_METADATA_SQS_QUEUE,
+                                 MOCK_LG_STAGING_STORE_BUCKET,
+                                 MOCK_LG_TABLE_NAME, TEST_OBJECT_KEY)
 from tests.unit.helpers.data.bulk_upload.test_data import (
-    TEST_DOCUMENT_REFERENCE,
-    TEST_DOCUMENT_REFERENCE_LIST,
-    TEST_FILE_METADATA,
-    TEST_NHS_NUMBER_FOR_BULK_UPLOAD,
-    TEST_SQS_MESSAGE,
-    TEST_SQS_MESSAGE_WITH_INVALID_FILENAME,
-    TEST_STAGING_METADATA,
-    TEST_STAGING_METADATA_WITH_INVALID_FILENAME,
-)
-from utils.exceptions import (
-    DocumentInfectedException,
-    InvalidMessageException,
-    S3FileNotFoundException,
-    TagNotFoundException,
-    VirusScanFailedException,
-    VirusScanNoResultException,
-)
+    TEST_DOCUMENT_REFERENCE, TEST_DOCUMENT_REFERENCE_LIST, TEST_FILE_METADATA,
+    TEST_NHS_NUMBER_FOR_BULK_UPLOAD, TEST_SQS_MESSAGE,
+    TEST_SQS_MESSAGE_WITH_INVALID_FILENAME, TEST_STAGING_METADATA,
+    TEST_STAGING_METADATA_WITH_INVALID_FILENAME)
+from utils.exceptions import (DocumentInfectedException,
+                              InvalidMessageException, S3FileNotFoundException,
+                              TagNotFoundException, VirusScanFailedException,
+                              VirusScanNoResultException)
 from utils.lloyd_george_validator import LGInvalidFilesException


@@ -144,7 +131,7 @@ def test_handle_sqs_message_report_failure_when_document_not_exist(
     )


-def test_handle_sqs_message_put_message_back_to_queue_when_virus_scan_result_not_available(
+def test_handle_sqs_message_put_staging_metadata_back_to_queue_when_virus_scan_result_not_available(
     set_env, mocker, mock_uuid, mock_validate_files, mock_check_virus_result
 ):
     mock_check_virus_result.side_effect = VirusScanNoResultException
@@ -157,14 +144,14 @@
     mock_remove_ingested_file_from_source_bucket = mocker.patch.object(
         BulkUploadService, "remove_ingested_file_from_source_bucket"
     )
-    mock_put_message_back_to_queue = mocker.patch.object(
-        BulkUploadService, "put_message_back_to_queue"
+    mock_put_staging_metadata_back_to_queue = mocker.patch.object(
+        BulkUploadService, "put_staging_metadata_back_to_queue"
     )

     service = BulkUploadService()
     service.handle_sqs_message(message=TEST_SQS_MESSAGE)

-    mock_put_message_back_to_queue.assert_called_with(TEST_STAGING_METADATA)
+    mock_put_staging_metadata_back_to_queue.assert_called_with(TEST_STAGING_METADATA)
     mock_report_upload_failure.assert_not_called()
     mock_create_lg_records_and_copy_files.assert_not_called()
@@ -314,12 +301,12 @@ def test_check_virus_result_raise_VirusScanFailedException_for_special_cases(
         service.check_virus_result(TEST_STAGING_METADATA)


-def test_put_message_back_to_queue(set_env, mocker):
+def test_put_staging_metadata_back_to_queue(set_env, mocker):
     service = BulkUploadService()
     service.sqs_service = mocker.MagicMock()
     mocker.patch("uuid.uuid4", return_value="123412342")

-    service.put_message_back_to_queue(TEST_STAGING_METADATA)
+    service.put_staging_metadata_back_to_queue(TEST_STAGING_METADATA)

     service.sqs_service.send_message_with_nhs_number_attr_fifo.assert_called_with(
         group_id="back_to_queue_bulk_upload_123412342",
@@ -330,6 +317,20 @@
     )


+def test_put_sqs_message_back_to_queue(set_env, mocker):
+    service = BulkUploadService()
+    service.sqs_service = mocker.MagicMock()
+
+    service.put_sqs_message_back_to_queue(TEST_SQS_MESSAGE)
+
+    service.sqs_service.send_message_with_nhs_number_attr_fifo.assert_called_with(
+        queue_url=MOCK_LG_METADATA_SQS_QUEUE,
+        message_body=TEST_SQS_MESSAGE["body"],
+        nhs_number=TEST_NHS_NUMBER_FOR_BULK_UPLOAD,
+        delay_seconds=60 * 5,
+    )
+
+
 def test_create_lg_records_and_copy_files(set_env, mocker, mock_uuid):
     service = BulkUploadService()
     service.s3_service = mocker.MagicMock()
diff --git a/lambdas/tests/unit/utils/test_lloyd_george_validator.py b/lambdas/tests/unit/utils/test_lloyd_george_validator.py
index 85a0ef3d9..c05e9091e 100644
--- a/lambdas/tests/unit/utils/test_lloyd_george_validator.py
+++ b/lambdas/tests/unit/utils/test_lloyd_george_validator.py
@@ -4,6 +4,7 @@
 from botocore.exceptions import ClientError
 from requests import Response
 from tests.unit.helpers.data.pds.pds_patient_response import PDS_PATIENT
+from utils.exceptions import PdsTooManyRequestsException
 from utils.lloyd_george_validator import (
     LGInvalidFilesException, check_for_duplicate_files,
     check_for_file_names_agrees_with_each_other,
@@ -318,6 +319,26 @@ def test_raise_client_error_from_ssm_with_pds_service(mocker, mock_pds_call):
     mock_odc_code.assert_called_once()


+def test_validate_with_pds_service_raise_PdsTooManyRequestsException(
+    mocker, mock_pds_call
+):
+    response = Response()
+    response.status_code = 429
+    response._content = b"Too Many Requests"
+    lg_file_list = [
+        "1of2_Lloyd_George_Record_[Jane Smith]_[9000000009]_[22-10-2010].pdf",
+        "2of2_Lloyd_George_Record_[Jane Smith]_[9000000009]_[22-10-2010].pdf",
+    ]
+    mock_pds_call.return_value = response
+
+    mocker.patch("utils.lloyd_george_validator.get_user_ods_code")
+
+    with pytest.raises(PdsTooManyRequestsException):
+        validate_with_pds_service(lg_file_list, "9000000009")
+
+    mock_pds_call.assert_called_with(nhs_number="9000000009", retry_on_expired=True)
+
+
 @pytest.fixture
 def mock_pds_call(mocker):
     yield mocker.patch("services.mock_pds_service.MockPdsApiService.pds_request")
diff --git a/lambdas/utils/audit_logging_setup.py b/lambdas/utils/audit_logging_setup.py
index 4b887d586..d1d13aaf9 100644
--- a/lambdas/utils/audit_logging_setup.py
+++ b/lambdas/utils/audit_logging_setup.py
@@ -5,7 +5,6 @@

 class LoggingService:
-
     audit_logger = None

     def __init__(self, name):
diff --git a/lambdas/utils/exceptions.py b/lambdas/utils/exceptions.py
index ce51b8946..bbda4e751 100644
--- a/lambdas/utils/exceptions.py
+++ b/lambdas/utils/exceptions.py
@@ -10,6 +10,10 @@ class PdsErrorException(Exception):
     pass


+class PdsTooManyRequestsException(Exception):
+    pass
+
+
 class AuthorisationException(Exception):
     pass
diff --git a/lambdas/utils/lloyd_george_validator.py b/lambdas/utils/lloyd_george_validator.py
index d0462349c..6ca43e0ae 100644
--- a/lambdas/utils/lloyd_george_validator.py
+++ b/lambdas/utils/lloyd_george_validator.py
@@ -11,6 +11,7 @@
 from requests import HTTPError
 from services.ssm_service import SSMService
 from utils.audit_logging_setup import LoggingService
+from utils.exceptions import PdsTooManyRequestsException
 from utils.utilities import get_pds_service

 logger = LoggingService(__name__)
@@ -123,6 +124,13 @@ def validate_with_pds_service(file_name_list: list[str], nhs_number: str):
         pds_response = pds_service.pds_request(
             nhs_number=nhs_number, retry_on_expired=True
         )
+
+        if pds_response.status_code == 429:
+            logger.error("Got 429 Too Many Requests error from PDS.")
+            raise PdsTooManyRequestsException(
+                "Failed to validate filename against PDS record due to too many requests"
+            )
+
         pds_response.raise_for_status()
         patient = Patient.model_validate(pds_response.json())
         patient_details = patient.get_minimum_patient_details(nhs_number)
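Note: validate_with_pds_service checks for 429 before calling raise_for_status(), so throttling surfaces as the dedicated PdsTooManyRequestsException while every other error status still becomes a requests.HTTPError. A condensed sketch of that ordering and what callers can branch on (the helper name is illustrative; the exception and message text come from the module above):

    import requests

    from utils.exceptions import PdsTooManyRequestsException


    def classify_pds_response(pds_response: requests.Response) -> None:
        # 429 is singled out first so the bulk upload path can re-queue and retry later.
        if pds_response.status_code == 429:
            raise PdsTooManyRequestsException(
                "Failed to validate filename against PDS record due to too many requests"
            )
        # Anything else that is not 2xx becomes requests.HTTPError via raise_for_status().
        pds_response.raise_for_status()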