Skip to content

Commit

Permalink
PRMDR-518 FIFO queue change (#162)
Browse files Browse the repository at this point in the history
* add group id to fit FIFO queue changes
  • Loading branch information
NogaNHS authored Nov 27, 2023
1 parent 5dcd7aa commit a0a03aa
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 53 deletions.
17 changes: 12 additions & 5 deletions lambdas/handlers/bulk_upload_metadata_handler.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import csv
import os
import tempfile
import uuid
from typing import Iterable

import pydantic
from botocore.exceptions import ClientError
from models.staging_metadata import (METADATA_FILENAME, NHS_NUMBER_FIELD_NAME,
MetadataFile, StagingMetadata)
from models.staging_metadata import (
METADATA_FILENAME,
NHS_NUMBER_FIELD_NAME,
MetadataFile,
StagingMetadata,
)
from services.s3_service import S3Service
from services.sqs_service import SQSService
from utils.audit_logging_setup import LoggingService
Expand Down Expand Up @@ -34,7 +39,7 @@ def lambda_handler(_event, _context):
staging_metadata_list = csv_to_staging_metadata(metadata_file)

logger.info("Finished parsing metadata")
send_metadata_to_sqs(staging_metadata_list, metadata_queue_url)
send_metadata_to_fifo_sqs(staging_metadata_list, metadata_queue_url)

logger.info("Sent bulk upload metadata to sqs queue")
except pydantic.ValidationError as e:
Expand Down Expand Up @@ -85,17 +90,19 @@ def csv_to_staging_metadata(csv_file_path: str) -> list[StagingMetadata]:
]


def send_metadata_to_sqs(
def send_metadata_to_fifo_sqs(
staging_metadata_list: list[StagingMetadata], metadata_queue_url: str
) -> None:
sqs_service = SQSService()
sqs_group_id = f"bulk_upload_{uuid.uuid4()}"

for staging_metadata in staging_metadata_list:
nhs_number = staging_metadata.nhs_number
logger.info(f"Sending metadata for patientId: {nhs_number}")

sqs_service.send_message_with_nhs_number_attr(
sqs_service.send_message_with_nhs_number_attr_fifo(
queue_url=metadata_queue_url,
message_body=staging_metadata.model_dump_json(by_alias=True),
nhs_number=nhs_number,
group_id=sqs_group_id,
)
19 changes: 12 additions & 7 deletions lambdas/services/bulk_upload_service.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import uuid

import pydantic
from botocore.exceptions import ClientError
Expand All @@ -11,12 +12,15 @@
from services.s3_service import S3Service
from services.sqs_service import SQSService
from utils.audit_logging_setup import LoggingService
from utils.exceptions import (DocumentInfectedException,
InvalidMessageException, S3FileNotFoundException,
TagNotFoundException, VirusScanFailedException,
VirusScanNoResultException)
from utils.lloyd_george_validator import (LGInvalidFilesException,
validate_lg_file_names)
from utils.exceptions import (
DocumentInfectedException,
InvalidMessageException,
S3FileNotFoundException,
TagNotFoundException,
VirusScanFailedException,
VirusScanNoResultException,
)
from utils.lloyd_george_validator import LGInvalidFilesException, validate_lg_file_names
from utils.utilities import create_reference_id

logger = LoggingService(__name__)
Expand Down Expand Up @@ -187,11 +191,12 @@ def check_virus_result(self, staging_metadata: StagingMetadata):
)

def put_message_back_to_queue(self, staging_metadata: StagingMetadata):
self.sqs_service.send_message_with_nhs_number_attr(
self.sqs_service.send_message_with_nhs_number_attr_fifo(
queue_url=self.metadata_queue_url,
message_body=staging_metadata.model_dump_json(by_alias=True),
nhs_number=staging_metadata.nhs_number,
delay_seconds=60 * 5,
group_id=f"back_to_queue_bulk_upload_{uuid.uuid4()}",
)

def init_transaction(self):
Expand Down
5 changes: 3 additions & 2 deletions lambdas/services/sensitive_audit_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ def emit(self, record):
if self.splunk_sqs_queue:
if self.sqs_client is None:
self.sqs_client = SQSService()
self.sqs_client.send_message(
queue_url=self.splunk_sqs_queue, message_body=self.format(record)
self.sqs_client.send_message_standard(
queue_url=self.splunk_sqs_queue,
message_body=self.format(record),
)
11 changes: 5 additions & 6 deletions lambdas/services/sqs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ def __init__(self, *args, **kwargs):
self.client = boto3.client("sqs", config=config)
super().__init__(*args, **kwargs)

def send_message(self, queue_url: str, message_body: str):
self.client.send_message(
QueueUrl=queue_url,
MessageBody=message_body,
)
def send_message_standard(self, queue_url: str, message_body: str):
self.client.send_message(QueueUrl=queue_url, MessageBody=message_body)

def send_message_with_attr(
self, queue_url: str, message_body: str, attributes: dict
Expand All @@ -21,12 +18,13 @@ def send_message_with_attr(
QueueUrl=queue_url, MessageBody=message_body, MessageAttributes=attributes
)

def send_message_with_nhs_number_attr(
def send_message_with_nhs_number_attr_fifo(
self,
queue_url: str,
message_body: str,
nhs_number: str,
delay_seconds: int = 0,
group_id=str,
):
self.client.send_message(
QueueUrl=queue_url,
Expand All @@ -35,4 +33,5 @@ def send_message_with_nhs_number_attr(
},
MessageBody=message_body,
DelaySeconds=delay_seconds,
MessageGroupId=group_id,
)
51 changes: 32 additions & 19 deletions lambdas/tests/unit/handlers/test_bulk_upload_metadata_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@

import pytest
from botocore.exceptions import ClientError
from handlers.bulk_upload_metadata_handler import (csv_to_staging_metadata,
download_metadata_from_s3,
lambda_handler,
send_metadata_to_sqs)
from handlers.bulk_upload_metadata_handler import (
csv_to_staging_metadata,
download_metadata_from_s3,
lambda_handler,
send_metadata_to_fifo_sqs,
)
from models.staging_metadata import METADATA_FILENAME
from pydantic import ValidationError
from tests.unit.conftest import (MOCK_LG_METADATA_SQS_QUEUE,
MOCK_LG_STAGING_STORE_BUCKET)
from tests.unit.conftest import MOCK_LG_METADATA_SQS_QUEUE, MOCK_LG_STAGING_STORE_BUCKET
from tests.unit.helpers.data.bulk_upload.test_data import (
EXPECTED_PARSED_METADATA, EXPECTED_SQS_MSG_FOR_PATIENT_1234567890,
EXPECTED_SQS_MSG_FOR_PATIENT_1234567891)
EXPECTED_PARSED_METADATA,
EXPECTED_SQS_MSG_FOR_PATIENT_1234567890,
EXPECTED_SQS_MSG_FOR_PATIENT_1234567891,
)

MOCK_METADATA_CSV = "tests/unit/helpers/data/bulk_upload/metadata.csv"
MOCK_INVALID_METADATA_CSV_FILES = [
Expand All @@ -31,24 +34,29 @@ def test_lambda_send_metadata_to_sqs_queue(
"handlers.bulk_upload_metadata_handler.download_metadata_from_s3",
return_value=MOCK_METADATA_CSV,
)
mocker.patch("uuid.uuid4", return_value="123412342")

lambda_handler(event, context)

assert mock_sqs_service.send_message_with_nhs_number_attr.call_count == 2
assert mock_sqs_service.send_message_with_nhs_number_attr_fifo.call_count == 2

expected_calls = [
call(
group_id="bulk_upload_123412342",
queue_url=MOCK_LG_METADATA_SQS_QUEUE,
message_body=EXPECTED_SQS_MSG_FOR_PATIENT_1234567890,
nhs_number="1234567890",
),
call(
group_id="bulk_upload_123412342",
queue_url=MOCK_LG_METADATA_SQS_QUEUE,
message_body=EXPECTED_SQS_MSG_FOR_PATIENT_1234567891,
nhs_number="1234567891",
),
]
mock_sqs_service.send_message_with_nhs_number_attr.assert_has_calls(expected_calls)
mock_sqs_service.send_message_with_nhs_number_attr_fifo.assert_has_calls(
expected_calls
)


def test_handler_log_error_when_fail_to_get_metadata_csv_from_s3(
Expand All @@ -65,7 +73,7 @@ def test_handler_log_error_when_fail_to_get_metadata_csv_from_s3(
assert caplog.records[-1].msg == expected_err_msg
assert caplog.records[-1].levelname == "ERROR"

mock_sqs_service.send_message_with_nhs_number_attr.assert_not_called()
mock_sqs_service.send_message_with_nhs_number_attr_fifo.assert_not_called()


def test_handler_log_error_when_metadata_csv_is_invalid(
Expand All @@ -82,13 +90,13 @@ def test_handler_log_error_when_metadata_csv_is_invalid(
assert "validation error" in caplog.records[-1].msg
assert caplog.records[-1].levelname == "ERROR"

mock_sqs_service.send_message_with_nhs_number_attr.assert_not_called()
mock_sqs_service.send_message_with_nhs_number_attr_fifo.assert_not_called()


def test_handler_log_error_when_failed_to_send_message_to_sqs(
set_env, mock_s3_service, mock_sqs_service, mock_tempfile, caplog, event, context
):
mock_sqs_service.send_message_with_nhs_number_attr.side_effect = ClientError(
mock_sqs_service.send_message_with_nhs_number_attr_fifo.side_effect = ClientError(
{
"Error": {
"Code": "AWS.SimpleQueueService.NonExistentQueue",
Expand Down Expand Up @@ -150,29 +158,34 @@ def test_csv_to_staging_metadata_raise_error_when_metadata_invalid():
csv_to_staging_metadata(invalid_csv_file)


def test_send_metadata_to_sqs(mock_sqs_service):
def test_send_metadata_to_sqs(mocker, mock_sqs_service):
mock_parsed_metadata = EXPECTED_PARSED_METADATA
send_metadata_to_sqs(mock_parsed_metadata, MOCK_LG_METADATA_SQS_QUEUE)
mocker.patch("uuid.uuid4", return_value="123412342")

assert mock_sqs_service.send_message_with_nhs_number_attr.call_count == 2
send_metadata_to_fifo_sqs(mock_parsed_metadata, MOCK_LG_METADATA_SQS_QUEUE)

expected_calls = [
call(
queue_url=MOCK_LG_METADATA_SQS_QUEUE,
message_body=EXPECTED_SQS_MSG_FOR_PATIENT_1234567890,
nhs_number="1234567890",
group_id="bulk_upload_123412342",
),
call(
queue_url=MOCK_LG_METADATA_SQS_QUEUE,
message_body=EXPECTED_SQS_MSG_FOR_PATIENT_1234567891,
nhs_number="1234567891",
group_id="bulk_upload_123412342",
),
]
mock_sqs_service.send_message_with_nhs_number_attr.assert_has_calls(expected_calls)
mock_sqs_service.send_message_with_nhs_number_attr_fifo.assert_has_calls(
expected_calls
)
assert mock_sqs_service.send_message_with_nhs_number_attr_fifo.call_count == 2


def test_send_metadata_to_sqs_raise_error_when_fail_to_send_message(mock_sqs_service):
mock_sqs_service.send_message_with_nhs_number_attr.side_effect = ClientError(
mock_sqs_service.send_message_with_nhs_number_attr_fifo.side_effect = ClientError(
{
"Error": {
"Code": "AWS.SimpleQueueService.NonExistentQueue",
Expand All @@ -183,7 +196,7 @@ def test_send_metadata_to_sqs_raise_error_when_fail_to_send_message(mock_sqs_ser
)

with pytest.raises(ClientError):
send_metadata_to_sqs(EXPECTED_PARSED_METADATA, MOCK_LG_METADATA_SQS_QUEUE)
send_metadata_to_fifo_sqs(EXPECTED_PARSED_METADATA, MOCK_LG_METADATA_SQS_QUEUE)


@pytest.fixture
Expand Down
41 changes: 28 additions & 13 deletions lambdas/tests/unit/services/test_bulk_upload_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,32 @@
from enums.virus_scan_result import VirusScanResult
from freezegun import freeze_time
from services.bulk_upload_service import BulkUploadService
from tests.unit.conftest import (MOCK_BULK_REPORT_TABLE_NAME, MOCK_LG_BUCKET,
MOCK_LG_METADATA_SQS_QUEUE,
MOCK_LG_STAGING_STORE_BUCKET,
MOCK_LG_TABLE_NAME, TEST_OBJECT_KEY)
from tests.unit.conftest import (
MOCK_BULK_REPORT_TABLE_NAME,
MOCK_LG_BUCKET,
MOCK_LG_METADATA_SQS_QUEUE,
MOCK_LG_STAGING_STORE_BUCKET,
MOCK_LG_TABLE_NAME,
TEST_OBJECT_KEY,
)
from tests.unit.helpers.data.bulk_upload.test_data import (
TEST_DOCUMENT_REFERENCE, TEST_DOCUMENT_REFERENCE_LIST, TEST_FILE_METADATA,
TEST_NHS_NUMBER_FOR_BULK_UPLOAD, TEST_SQS_MESSAGE,
TEST_SQS_MESSAGE_WITH_INVALID_FILENAME, TEST_STAGING_METADATA,
TEST_STAGING_METADATA_WITH_INVALID_FILENAME)
from utils.exceptions import (DocumentInfectedException,
InvalidMessageException, S3FileNotFoundException,
TagNotFoundException, VirusScanFailedException,
VirusScanNoResultException)
TEST_DOCUMENT_REFERENCE,
TEST_DOCUMENT_REFERENCE_LIST,
TEST_FILE_METADATA,
TEST_NHS_NUMBER_FOR_BULK_UPLOAD,
TEST_SQS_MESSAGE,
TEST_SQS_MESSAGE_WITH_INVALID_FILENAME,
TEST_STAGING_METADATA,
TEST_STAGING_METADATA_WITH_INVALID_FILENAME,
)
from utils.exceptions import (
DocumentInfectedException,
InvalidMessageException,
S3FileNotFoundException,
TagNotFoundException,
VirusScanFailedException,
VirusScanNoResultException,
)
from utils.lloyd_george_validator import LGInvalidFilesException


Expand Down Expand Up @@ -304,10 +317,12 @@ def test_check_virus_result_raise_VirusScanFailedException_for_special_cases(
def test_put_message_back_to_queue(set_env, mocker):
service = BulkUploadService()
service.sqs_service = mocker.MagicMock()
mocker.patch("uuid.uuid4", return_value="123412342")

service.put_message_back_to_queue(TEST_STAGING_METADATA)

service.sqs_service.send_message_with_nhs_number_attr.assert_called_with(
service.sqs_service.send_message_with_nhs_number_attr_fifo.assert_called_with(
group_id="back_to_queue_bulk_upload_123412342",
queue_url=MOCK_LG_METADATA_SQS_QUEUE,
message_body=TEST_STAGING_METADATA.model_dump_json(by_alias=True),
nhs_number=TEST_STAGING_METADATA.nhs_number,
Expand Down
4 changes: 3 additions & 1 deletion lambdas/tests/unit/services/test_sqs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def return_mock(service_name, **_kwargs):
{"NHS-NO": "1234567890", "files": ["file1.pdf", "file2.pdf"]}
)

service.send_message_with_nhs_number_attr(
service.send_message_with_nhs_number_attr_fifo(
group_id="test_group_id",
queue_url=MOCK_LG_METADATA_SQS_QUEUE,
message_body=test_message_body,
nhs_number=TEST_NHS_NUMBER,
Expand All @@ -32,4 +33,5 @@ def return_mock(service_name, **_kwargs):
},
MessageBody=test_message_body,
DelaySeconds=0,
MessageGroupId="test_group_id",
)

0 comments on commit a0a03aa

Please sign in to comment.