From 168301b75a37347de8a15f8873eeec05c2692910 Mon Sep 17 00:00:00 2001 From: jack-nhs Date: Tue, 10 Dec 2024 13:47:17 +0000 Subject: [PATCH] [PRMP-1122] Create NRL pointer during bulk upload (#463) --------- Co-authored-by: NogaNHS Co-authored-by: Jack Sutton --- .../handlers/manage_nrl_pointer_handler.py | 2 +- lambdas/models/nrl_fhir_document_reference.py | 2 +- lambdas/models/nrl_sqs_message.py | 9 +-- .../bulk_upload/bulk_upload_sqs_repository.py | 11 ++++ lambdas/services/base/s3_service.py | 4 ++ lambdas/services/bulk_upload_service.py | 30 +++++++++- lambdas/services/nrl_api_service.py | 2 +- lambdas/tests/unit/conftest.py | 3 + .../helpers/data/bulk_upload/test_data.py | 21 ++++++- .../test_bulk_upload_sqs_repository.py | 21 ++++++- .../unit/services/base/test_s3_service.py | 29 ++++++++++ .../unit/services/test_bulk_upload_service.py | 57 +++++++++++++++++++ 12 files changed, 179 insertions(+), 12 deletions(-) diff --git a/lambdas/handlers/manage_nrl_pointer_handler.py b/lambdas/handlers/manage_nrl_pointer_handler.py index 615359bb1..cc0c24e27 100644 --- a/lambdas/handlers/manage_nrl_pointer_handler.py +++ b/lambdas/handlers/manage_nrl_pointer_handler.py @@ -45,7 +45,7 @@ def lambda_handler(event, context): f"Processing SQS message for nhs number: {nrl_message.nhs_number}" ) nrl_verified_message = nrl_message.model_dump( - by_alias=True, exclude_none=True, exclude_defaults=True + by_alias=True, exclude_none=True ) match nrl_message.action: case NrlActionTypes.CREATE: diff --git a/lambdas/models/nrl_fhir_document_reference.py b/lambdas/models/nrl_fhir_document_reference.py index 4b47a1974..ed1f67901 100644 --- a/lambdas/models/nrl_fhir_document_reference.py +++ b/lambdas/models/nrl_fhir_document_reference.py @@ -13,7 +13,7 @@ class FhirDocumentReference(BaseModel): snomed_code_doc_type: str = "None" snomed_code_category: str = "None" snomed_code_category_display: str = "Care plan" - attachment: Optional[NrlAttachment] = {} + attachment: Optional[NrlAttachment] = NrlAttachment() def build_fhir_dict(self): snomed_url = "http://snomed.info/sct" diff --git a/lambdas/models/nrl_sqs_message.py b/lambdas/models/nrl_sqs_message.py index 8f08cadc6..c6c82c22d 100644 --- a/lambdas/models/nrl_sqs_message.py +++ b/lambdas/models/nrl_sqs_message.py @@ -1,12 +1,13 @@ from typing import Optional +from enums.snomed_codes import SnomedCodesCategory, SnomedCodesType from pydantic import AliasGenerator, BaseModel, ConfigDict from pydantic.alias_generators import to_camel class NrlAttachment(BaseModel): - content_type: str = "" - language: str = "en-US" + content_type: str = "application/pdf" + language: str = "en-UK" url: str = "" size: int = 0 hash: str = "" @@ -20,8 +21,8 @@ class NrlSqsMessage(BaseModel): ) nhs_number: str - snomed_code_doc_type: str - snomed_code_category: str + snomed_code_doc_type: str = SnomedCodesType.LLOYD_GEORGE + snomed_code_category: str = SnomedCodesCategory.CARE_PLAN description: str = "" attachment: Optional[NrlAttachment] = None action: str diff --git a/lambdas/repositories/bulk_upload/bulk_upload_sqs_repository.py b/lambdas/repositories/bulk_upload/bulk_upload_sqs_repository.py index 9ba6034e0..35f2d3ae3 100644 --- a/lambdas/repositories/bulk_upload/bulk_upload_sqs_repository.py +++ b/lambdas/repositories/bulk_upload/bulk_upload_sqs_repository.py @@ -1,6 +1,7 @@ import os import uuid +from models.nrl_sqs_message import NrlSqsMessage from models.staging_metadata import StagingMetadata from services.base.sqs_service import SQSService from utils.audit_logging_setup 
import LoggingService @@ -38,4 +39,14 @@ def put_sqs_message_back_to_queue(self, sqs_message: dict): queue_url=self.metadata_queue_url, message_body=sqs_message["body"], nhs_number=nhs_number, + group_id=f"back_to_queue_bulk_upload_{uuid.uuid4()}", + ) + + def send_message_to_nrl_fifo( + self, queue_url: str, message: NrlSqsMessage, group_id: str + ): + self.sqs_repository.send_message_fifo( + queue_url=queue_url, + message_body=message.model_dump_json(), + group_id=group_id, ) diff --git a/lambdas/services/base/s3_service.py b/lambdas/services/base/s3_service.py index fdcf2c2e7..e28cd512b 100644 --- a/lambdas/services/base/s3_service.py +++ b/lambdas/services/base/s3_service.py @@ -139,3 +139,7 @@ def list_all_objects(self, bucket_name: str) -> list[dict]: for paginated_result in s3_paginator.paginate(Bucket=bucket_name): s3_list_objects_result += paginated_result.get("Contents", []) return s3_list_objects_result + + def get_file_size(self, s3_bucket_name: str, object_key: str) -> int: + response = self.client.head_object(Bucket=s3_bucket_name, Key=object_key) + return response.get("ContentLength", 0) diff --git a/lambdas/services/bulk_upload_service.py b/lambdas/services/bulk_upload_service.py index 16395ab58..72cd35499 100644 --- a/lambdas/services/bulk_upload_service.py +++ b/lambdas/services/bulk_upload_service.py @@ -4,10 +4,12 @@ import pydantic from botocore.exceptions import ClientError +from enums.nrl_sqs_upload import NrlActionTypes from enums.patient_ods_inactive_status import PatientOdsInactiveStatus from enums.upload_status import UploadStatus from enums.virus_scan_result import VirusScanResult from models.nhs_document_reference import NHSDocumentReference +from models.nrl_sqs_message import NrlAttachment, NrlSqsMessage from models.staging_metadata import MetadataFile, StagingMetadata from repositories.bulk_upload.bulk_upload_dynamo_repository import ( BulkUploadDynamoRepository, @@ -52,6 +54,7 @@ def __init__(self): self.pdf_content_type = "application/pdf" self.unhandled_messages = [] self.file_path_cache = {} + self.nrl_queue_url = os.environ["NRL_SQS_URL"] def process_message_queue(self, records: list): for index, message in enumerate(records, start=1): @@ -231,7 +234,9 @@ def handle_sqs_message(self, message: dict): ) try: - self.create_lg_records_and_copy_files(staging_metadata, patient_ods_code) + last_document_processed = self.create_lg_records_and_copy_files( + staging_metadata, patient_ods_code + ) logger.info( f"Successfully uploaded the Lloyd George records for patient: {staging_metadata.nhs_number}", {"Result": "Successful upload"}, @@ -268,6 +273,25 @@ def handle_sqs_message(self, message: dict): accepted_reason, patient_ods_code, ) + if len(file_names) == 1: + document_api_endpoint = ( + os.environ.get("APIM_API_URL", "") + + "/DocumentReference/" + + last_document_processed.id + ) + doc_details = NrlAttachment( + url=document_api_endpoint, + ) + nrl_sqs_message = NrlSqsMessage( + nhs_number=staging_metadata.nhs_number, + action=NrlActionTypes.CREATE, + attachment=doc_details, + ) + self.sqs_repository.send_message_to_nrl_fifo( + queue_url=self.nrl_queue_url, + message=nrl_sqs_message, + group_id=f"nrl_sqs_{uuid.uuid4()}", + ) def resolve_source_file_path(self, staging_metadata: StagingMetadata): sample_file_path = staging_metadata.files[0].file_path @@ -313,7 +337,7 @@ def create_lg_records_and_copy_files( self, staging_metadata: StagingMetadata, current_gp_ods: str ): nhs_number = staging_metadata.nhs_number - + document_reference = None for file_metadata in 
staging_metadata.files: document_reference = self.convert_to_document_reference( file_metadata, nhs_number, current_gp_ods @@ -327,6 +351,8 @@ def create_lg_records_and_copy_files( ) document_reference.set_uploaded_to_true() self.dynamo_repository.create_record_in_lg_dynamo_table(document_reference) + # returning last document ref until stitching as default is implemented + return document_reference def rollback_transaction(self): try: diff --git a/lambdas/services/nrl_api_service.py b/lambdas/services/nrl_api_service.py index 9676645e1..9e33d01bd 100644 --- a/lambdas/services/nrl_api_service.py +++ b/lambdas/services/nrl_api_service.py @@ -47,7 +47,7 @@ def create_new_pointer(self, body, retry_on_expired: bool = True): response.raise_for_status() logger.info("Successfully created new pointer") except HTTPError as e: - logger.error(e.response) + logger.error(e.response.content) if e.response.status_code == 401 and retry_on_expired: self.headers["Authorization"] = ( f"Bearer {self.auth_service.get_active_access_token()}" diff --git a/lambdas/tests/unit/conftest.py b/lambdas/tests/unit/conftest.py index 43951c6c7..0ab97fc6b 100644 --- a/lambdas/tests/unit/conftest.py +++ b/lambdas/tests/unit/conftest.py @@ -27,6 +27,7 @@ MOCK_ZIP_OUTPUT_BUCKET_ENV_NAME = "ZIPPED_STORE_BUCKET_NAME" MOCK_ZIP_TRACE_TABLE_ENV_NAME = "ZIPPED_STORE_DYNAMODB_NAME" +MOCK_METADATA_NRL_SQS_URL_ENV_NAME = "NRL_SQS_URL" MOCK_LG_STAGING_STORE_BUCKET_ENV_NAME = "STAGING_STORE_BUCKET_NAME" MOCK_LG_METADATA_SQS_QUEUE_ENV_NAME = "METADATA_SQS_QUEUE_URL" @@ -76,6 +77,7 @@ TEST_CURRENT_GP_ODS = "Y12345" AUTH_STATE_TABLE_NAME = "test_state_table" +NRL_SQS_URL = "https://test-queue.com" AUTH_SESSION_TABLE_NAME = "test_session_table" FAKE_URL = "https://fake-url.com" OIDC_CALLBACK_URL = FAKE_URL @@ -125,6 +127,7 @@ def set_env(monkeypatch): monkeypatch.setenv(MOCK_LG_METADATA_SQS_QUEUE_ENV_NAME, MOCK_LG_METADATA_SQS_QUEUE) monkeypatch.setenv(MOCK_LG_INVALID_SQS_QUEUE_ENV_NAME, MOCK_LG_INVALID_SQS_QUEUE) monkeypatch.setenv(MOCK_AUTH_STATE_TABLE_NAME_ENV_NAME, AUTH_STATE_TABLE_NAME) + monkeypatch.setenv(MOCK_METADATA_NRL_SQS_URL_ENV_NAME, NRL_SQS_URL) monkeypatch.setenv(MOCK_AUTH_SESSION_TABLE_NAME_ENV_NAME, AUTH_SESSION_TABLE_NAME) monkeypatch.setenv(MOCK_OIDC_CALLBACK_URL_ENV_NAME, OIDC_CALLBACK_URL) monkeypatch.setenv(MOCK_OIDC_CLIENT_ID_ENV_NAME, OIDC_CLIENT_ID) diff --git a/lambdas/tests/unit/helpers/data/bulk_upload/test_data.py b/lambdas/tests/unit/helpers/data/bulk_upload/test_data.py index 0e23b31a4..5bae4b002 100644 --- a/lambdas/tests/unit/helpers/data/bulk_upload/test_data.py +++ b/lambdas/tests/unit/helpers/data/bulk_upload/test_data.py @@ -3,9 +3,12 @@ from enums.virus_scan_result import VirusScanResult from freezegun import freeze_time from models.nhs_document_reference import NHSDocumentReference +from models.nrl_sqs_message import NrlSqsMessage from models.staging_metadata import MetadataFile, StagingMetadata from tests.unit.conftest import MOCK_LG_BUCKET, TEST_CURRENT_GP_ODS, TEST_UUID +from lambdas.enums.nrl_sqs_upload import NrlActionTypes + sample_metadata_model = MetadataFile( file_path="/1234567890/1of2_Lloyd_George_Record_[Joe Bloggs]_[1234567890]_[25-12-2019].pdf", page_count="", @@ -143,6 +146,15 @@ def build_test_sqs_message_from_nhs_number(nhs_number: str) -> dict: return build_test_sqs_message(staging_metadata) +def build_test_nrl_sqs_fifo_message(nhs_number: str, action: str) -> NrlSqsMessage: + message_body = { + "nhs_number": nhs_number, + "action": action, + } + nrl_sqs_message = 
NrlSqsMessage(**message_body) + return nrl_sqs_message + + @freeze_time("2024-01-01 12:00:00") def build_test_document_reference(file_name: str, nhs_number: str = "9000000009"): doc_ref = NHSDocumentReference( @@ -160,8 +172,15 @@ def build_test_document_reference(file_name: str, nhs_number: str = "9000000009" TEST_NHS_NUMBER_FOR_BULK_UPLOAD = "9000000009" TEST_STAGING_METADATA = build_test_staging_metadata(make_valid_lg_file_names(3)) TEST_SQS_MESSAGE = build_test_sqs_message(TEST_STAGING_METADATA) +TEST_STAGING_METADATA_SINGLE_FILE = build_test_staging_metadata( + make_valid_lg_file_names(1) +) +TEST_SQS_MESSAGE_SINGLE_FILE = build_test_sqs_message(TEST_STAGING_METADATA_SINGLE_FILE) TEST_FILE_METADATA = TEST_STAGING_METADATA.files[0] - +TEST_GROUP_ID = "123" +TEST_NRL_SQS_MESSAGE = build_test_nrl_sqs_fifo_message( + TEST_NHS_NUMBER_FOR_BULK_UPLOAD, NrlActionTypes.CREATE +) TEST_STAGING_METADATA_WITH_INVALID_FILENAME = build_test_staging_metadata( [*make_valid_lg_file_names(2), "invalid_file_name.txt"] ) diff --git a/lambdas/tests/unit/repositories/bulk_upload/test_bulk_upload_sqs_repository.py b/lambdas/tests/unit/repositories/bulk_upload/test_bulk_upload_sqs_repository.py index a2ec3afe4..5b5748018 100644 --- a/lambdas/tests/unit/repositories/bulk_upload/test_bulk_upload_sqs_repository.py +++ b/lambdas/tests/unit/repositories/bulk_upload/test_bulk_upload_sqs_repository.py @@ -2,9 +2,11 @@ import pytest from repositories.bulk_upload.bulk_upload_sqs_repository import BulkUploadSqsRepository -from tests.unit.conftest import MOCK_LG_METADATA_SQS_QUEUE +from tests.unit.conftest import MOCK_LG_METADATA_SQS_QUEUE, NRL_SQS_URL from tests.unit.helpers.data.bulk_upload.test_data import ( + TEST_GROUP_ID, TEST_NHS_NUMBER_FOR_BULK_UPLOAD, + TEST_NRL_SQS_MESSAGE, TEST_SQS_MESSAGE, TEST_STAGING_METADATA, ) @@ -34,11 +36,26 @@ def test_put_staging_metadata_back_to_queue_and_increases_retries( ) -def test_put_sqs_message_back_to_queue(set_env, repo_under_test): +def test_put_sqs_message_back_to_queue(set_env, repo_under_test, mock_uuid): repo_under_test.put_sqs_message_back_to_queue(TEST_SQS_MESSAGE) repo_under_test.sqs_repository.send_message_with_nhs_number_attr_fifo.assert_called_with( queue_url=MOCK_LG_METADATA_SQS_QUEUE, message_body=TEST_SQS_MESSAGE["body"], nhs_number=TEST_NHS_NUMBER_FOR_BULK_UPLOAD, + group_id=f"back_to_queue_bulk_upload_{mock_uuid}", + ) + + +def test_send_message_to_nrl_sqs_fifo(set_env, repo_under_test): + repo_under_test.send_message_to_nrl_fifo( + NRL_SQS_URL, + TEST_NRL_SQS_MESSAGE, + TEST_GROUP_ID, + ) + message_body = TEST_NRL_SQS_MESSAGE + repo_under_test.sqs_repository.send_message_fifo.assert_called_with( + queue_url=NRL_SQS_URL, + message_body=message_body.model_dump_json(), + group_id="123", ) diff --git a/lambdas/tests/unit/services/base/test_s3_service.py b/lambdas/tests/unit/services/base/test_s3_service.py index 621ce2e7a..2485329dd 100755 --- a/lambdas/tests/unit/services/base/test_s3_service.py +++ b/lambdas/tests/unit/services/base/test_s3_service.py @@ -345,3 +345,32 @@ def test_list_all_objects_raises_client_error_if_unexpected_response( with pytest.raises(ClientError): mock_service.list_all_objects(MOCK_BUCKET) + + +def test_file_size_return_int(mock_service, mock_client): + mock_response = { + "ResponseMetadata": { + "RequestId": "mock_req", + "HostId": "", + "HTTPStatusCode": 200, + "HTTPHeaders": {}, + "RetryAttempts": 0, + }, + "ContentLength": "3191", + "ETag": '"eb2996dae99afd8308e4c97bdb6a4178"', + "ContentType": "application/pdf", + "Metadata": 
{}, + } + + mock_client.head_object.return_value = mock_response + + expected = "3191" + actual = mock_service.get_file_size( + s3_bucket_name=MOCK_BUCKET, object_key=TEST_FILE_NAME + ) + assert actual == expected + + mock_client.head_object.assert_called_once_with( + Bucket=MOCK_BUCKET, + Key=TEST_FILE_NAME, + ) diff --git a/lambdas/tests/unit/services/test_bulk_upload_service.py b/lambdas/tests/unit/services/test_bulk_upload_service.py index c0569579c..b4be3f6ab 100644 --- a/lambdas/tests/unit/services/test_bulk_upload_service.py +++ b/lambdas/tests/unit/services/test_bulk_upload_service.py @@ -3,10 +3,12 @@ import pytest from botocore.exceptions import ClientError +from enums.nrl_sqs_upload import NrlActionTypes from enums.patient_ods_inactive_status import PatientOdsInactiveStatus from enums.upload_status import UploadStatus from enums.virus_scan_result import SCAN_RESULT_TAG_KEY, VirusScanResult from freezegun import freeze_time +from models.nrl_sqs_message import NrlAttachment, NrlSqsMessage from models.pds_models import Patient from repositories.bulk_upload.bulk_upload_s3_repository import BulkUploadS3Repository from repositories.bulk_upload.bulk_upload_sqs_repository import BulkUploadSqsRepository @@ -14,6 +16,7 @@ from tests.unit.conftest import ( MOCK_LG_BUCKET, MOCK_STAGING_STORE_BUCKET, + NRL_SQS_URL, TEST_CURRENT_GP_ODS, ) from tests.unit.helpers.data.bulk_upload.test_data import ( @@ -21,9 +24,11 @@ TEST_FILE_METADATA, TEST_SQS_10_MESSAGES_AS_LIST, TEST_SQS_MESSAGE, + TEST_SQS_MESSAGE_SINGLE_FILE, TEST_SQS_MESSAGE_WITH_INVALID_FILENAME, TEST_SQS_MESSAGES_AS_LIST, TEST_STAGING_METADATA, + TEST_STAGING_METADATA_SINGLE_FILE, TEST_STAGING_METADATA_WITH_INVALID_FILENAME, build_test_sqs_message, build_test_staging_metadata_from_patient_name, @@ -201,6 +206,7 @@ def test_handle_sqs_message_happy_path( mock_ods_validation, ): TEST_STAGING_METADATA.retries = 0 + mock_create_lg_records_and_copy_files = mocker.patch.object( BulkUploadService, "create_lg_records_and_copy_files" ) @@ -218,6 +224,52 @@ def test_handle_sqs_message_happy_path( ) mock_report_upload_complete.assert_called() mock_remove_ingested_file_from_source_bucket.assert_called() + repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_not_called() + + +def test_handle_sqs_message_happy_path_single_file( + set_env, + mocker, + mock_uuid, + repo_under_test, + mock_validate_files, + mock_pds_service, + mock_pds_validation, + mock_ods_validation, +): + TEST_STAGING_METADATA.retries = 0 + mock_nrl_attachment = NrlAttachment( + url=f"/DocumentReference/{TEST_DOCUMENT_REFERENCE.id}", + ) + mock_nrl_message = NrlSqsMessage( + nhs_number=TEST_STAGING_METADATA.nhs_number, + action=NrlActionTypes.CREATE, + attachment=mock_nrl_attachment, + ) + mock_create_lg_records_and_copy_files = mocker.patch.object( + BulkUploadService, "create_lg_records_and_copy_files" + ) + mock_create_lg_records_and_copy_files.return_value = TEST_DOCUMENT_REFERENCE + mock_report_upload_complete = mocker.patch.object( + repo_under_test.dynamo_repository, "write_report_upload_to_dynamo" + ) + mock_remove_ingested_file_from_source_bucket = mocker.patch.object( + repo_under_test.s3_repository, "remove_ingested_file_from_source_bucket" + ) + mocker.patch.object(repo_under_test.s3_repository, "check_virus_result") + + repo_under_test.handle_sqs_message(message=TEST_SQS_MESSAGE_SINGLE_FILE) + + mock_create_lg_records_and_copy_files.assert_called_with( + TEST_STAGING_METADATA_SINGLE_FILE, TEST_CURRENT_GP_ODS + ) + mock_report_upload_complete.assert_called() 
+ mock_remove_ingested_file_from_source_bucket.assert_called() + repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_called_with( + queue_url=NRL_SQS_URL, + message=mock_nrl_message, + group_id=f"nrl_sqs_{mock_uuid}", + ) def set_up_mocks_for_non_ascii_files( @@ -330,6 +382,7 @@ def test_handle_sqs_message_calls_report_upload_failure_when_patient_record_alre mock_report_upload_failure.assert_called_with( TEST_STAGING_METADATA, UploadStatus.FAILED, str(mocked_error), "" ) + repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_not_called() def test_handle_sqs_message_calls_report_upload_failure_when_lg_file_name_invalid( @@ -366,6 +419,7 @@ def test_handle_sqs_message_calls_report_upload_failure_when_lg_file_name_invali str(mocked_error), "", ) + repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_not_called() def test_handle_sqs_message_report_failure_when_document_is_infected( @@ -403,6 +457,7 @@ def test_handle_sqs_message_report_failure_when_document_is_infected( ) mock_create_lg_records_and_copy_files.assert_not_called() mock_remove_ingested_file_from_source_bucket.assert_not_called() + repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_not_called() def test_handle_sqs_message_report_failure_when_document_not_exist( @@ -432,6 +487,7 @@ def test_handle_sqs_message_report_failure_when_document_not_exist( "One or more of the files is not accessible from staging bucket", "Y12345", ) + repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_not_called() def test_handle_sqs_message_calls_report_upload_successful_when_patient_is_formally_deceased( @@ -617,6 +673,7 @@ def test_handle_sqs_message_put_staging_metadata_back_to_queue_when_virus_scan_r mock_report_upload_failure.assert_not_called() mock_create_lg_records_and_copy_files.assert_not_called() mock_remove_ingested_file_from_source_bucket.assert_not_called() + repo_under_test.sqs_repository.send_message_to_nrl_fifo.assert_not_called() def test_handle_sqs_message_rollback_transaction_when_validation_pass_but_file_transfer_failed_halfway(
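
Notes

For reference, the shape of the message that the new single-file branch in BulkUploadService.handle_sqs_message places on the NRL FIFO queue can be reproduced outside the codebase. The sketch below is illustrative only: field names and defaults are taken from this patch, but the SNOMED code values, NHS number and DocumentReference URL are placeholders, and the alias configuration approximates the project's ConfigDict.

    # Self-contained sketch (pydantic v2) of the NRL pointer message built when a
    # bulk upload contains exactly one file. Defaults mirror this patch; SNOMED
    # values, NHS number and URL below are illustrative placeholders.
    import uuid
    from typing import Optional

    from pydantic import BaseModel, ConfigDict
    from pydantic.alias_generators import to_camel


    class NrlAttachment(BaseModel):
        content_type: str = "application/pdf"
        language: str = "en-UK"
        url: str = ""
        size: int = 0
        hash: str = ""


    class NrlSqsMessage(BaseModel):
        model_config = ConfigDict(alias_generator=to_camel, populate_by_name=True)

        nhs_number: str
        snomed_code_doc_type: str = "16521000000101"  # placeholder for SnomedCodesType.LLOYD_GEORGE
        snomed_code_category: str = "734163000"       # placeholder for SnomedCodesCategory.CARE_PLAN
        description: str = ""
        attachment: Optional[NrlAttachment] = None
        action: str


    # Equivalent of the new branch in handle_sqs_message: one file uploaded, so a
    # CREATE pointer message is built around the DocumentReference URL.
    message = NrlSqsMessage(
        nhs_number="9000000009",
        action="create",  # NrlActionTypes.CREATE
        attachment=NrlAttachment(url="https://apim.example/DocumentReference/some-doc-id"),
    )

    # What send_message_to_nrl_fifo serialises onto the queue:
    print(message.model_dump_json())

    # What manage_nrl_pointer_handler now dumps after this patch (defaults are
    # kept because exclude_defaults was removed):
    print(message.model_dump(by_alias=True, exclude_none=True))

    # Each message is sent with a unique MessageGroupId:
    print(f"group_id=nrl_sqs_{uuid.uuid4()}")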
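
send_message_to_nrl_fifo hands the serialised message to SQSService.send_message_fifo along with the per-message group id generated by BulkUploadService. The wrapper's internals are not part of this diff, but on a FIFO queue that is assumed to reduce to a boto3 send_message call carrying a MessageGroupId; the queue URL and body below are examples.

    # Rough boto3 equivalent of the FIFO dispatch; the queue URL and body are
    # examples and the delegation to boto3 is an assumption about SQSService.
    import json
    import uuid

    import boto3

    sqs = boto3.client("sqs")
    sqs.send_message(
        QueueUrl="https://sqs.eu-west-2.amazonaws.com/123456789012/nrl-queue.fifo",
        MessageBody=json.dumps({"nhsNumber": "9000000009", "action": "create"}),
        MessageGroupId=f"nrl_sqs_{uuid.uuid4()}",
        # MessageDeduplicationId omitted; assumes content-based deduplication
        # is enabled on the queue.
    )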
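
The new S3Service.get_file_size helper is a thin wrapper over HeadObject; outside the service class it reduces to the following, where the bucket and key names are examples (real S3 reports ContentLength as an int).

    # Standalone version of the get_file_size helper added in this patch: a
    # HeadObject call whose ContentLength is returned, defaulting to 0 if the
    # key is absent from the response.
    import boto3


    def get_file_size(s3_bucket_name: str, object_key: str) -> int:
        client = boto3.client("s3")
        response = client.head_object(Bucket=s3_bucket_name, Key=object_key)
        return response.get("ContentLength", 0)


    print(get_file_size("lloyd-george-store", "9000000009/1of1_Lloyd_George_Record.pdf"))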