-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[PRMP-1188] Create a lambda to handle MNS notifications (#470)
- Loading branch information
1 parent
37d6f3c
commit 90207ac
Showing
9 changed files
with
629 additions
and
61 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,61 +1,61 @@ | ||
name: Subscribe to MNS | ||
|
||
on: | ||
workflow_dispatch: | ||
inputs: | ||
sandbox: | ||
description: Which sandbox would you like to run against? | ||
required: true | ||
type: choice | ||
options: | ||
- ndr-dev | ||
- ndr-test | ||
- pre-prod | ||
- prod | ||
environment: | ||
description: Which environment settings to use? | ||
required: true | ||
type: string | ||
default: development | ||
|
||
permissions: | ||
id-token: write # This is required for requesting the JWT | ||
contents: read # This is required for actions/checkout | ||
|
||
env: | ||
SANDBOX: ${{ inputs.sandbox }} | ||
AWS_REGION: ${{ vars.AWS_REGION }} | ||
URL: ${{ vars.MNS_SUBSCRIPTION_URL }} | ||
|
||
jobs: | ||
Subscribe_to_MNS: | ||
runs-on: ubuntu-latest | ||
environment: ${{ inputs.environment }} | ||
steps: | ||
- name: Configure AWS Credentials | ||
uses: aws-actions/configure-aws-credentials@v4 | ||
with: | ||
role-to-assume: ${{ secrets.AWS_ASSUME_ROLE }} | ||
role-skip-session-tagging: true | ||
aws-region: ${{ vars.AWS_REGION }} | ||
mask-aws-account-id: true | ||
|
||
- name: Checkout | ||
uses: actions/checkout@v4 | ||
|
||
- name: Set up Python | ||
uses: actions/setup-python@v5 | ||
with: | ||
python-version: '3.11' | ||
cache: 'pip' | ||
|
||
- name: Install dependencies | ||
run: | | ||
pip install boto3 requests pyjwt cryptography | ||
echo "Installed requirements" | ||
- name: Run script | ||
working-directory: ./lambdas | ||
run: | | ||
python3 -m scripts.mns_subscription | ||
echo "Subscription complete" | ||
name: Subscribe to MNS | ||
|
||
on: | ||
workflow_dispatch: | ||
inputs: | ||
sandbox: | ||
description: Which sandbox would you like to run against? | ||
required: true | ||
type: choice | ||
options: | ||
- ndr-dev | ||
- ndr-test | ||
- pre-prod | ||
- prod | ||
environment: | ||
description: Which environment settings to use? | ||
required: true | ||
type: string | ||
default: development | ||
|
||
permissions: | ||
id-token: write # This is required for requesting the JWT | ||
contents: read # This is required for actions/checkout | ||
|
||
env: | ||
SANDBOX: ${{ inputs.sandbox }} | ||
AWS_REGION: ${{ vars.AWS_REGION }} | ||
URL: ${{ vars.MNS_SUBSCRIPTION_URL }} | ||
|
||
jobs: | ||
Subscribe_to_MNS: | ||
runs-on: ubuntu-latest | ||
environment: ${{ inputs.environment }} | ||
steps: | ||
- name: Configure AWS Credentials | ||
uses: aws-actions/configure-aws-credentials@v4 | ||
with: | ||
role-to-assume: ${{ secrets.AWS_ASSUME_ROLE }} | ||
role-skip-session-tagging: true | ||
aws-region: ${{ vars.AWS_REGION }} | ||
mask-aws-account-id: true | ||
|
||
- name: Checkout | ||
uses: actions/checkout@v4 | ||
|
||
- name: Set up Python | ||
uses: actions/setup-python@v5 | ||
with: | ||
python-version: '3.11' | ||
cache: 'pip' | ||
|
||
- name: Install dependencies | ||
run: | | ||
pip install boto3 requests pyjwt cryptography | ||
echo "Installed requirements" | ||
- name: Run script | ||
working-directory: ./lambdas | ||
run: | | ||
python3 -m scripts.mns_subscription | ||
echo "Subscription complete" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from enum import StrEnum | ||
|
||
|
||
class MNSNotificationTypes(StrEnum): | ||
CHANGE_OF_GP = "pds-change-of-gp-1" | ||
DEATH_NOTIFICATION = "pds-death-notification-1" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
import json | ||
|
||
from enums.mns_notification_types import MNSNotificationTypes | ||
from models.mns_sqs_message import MNSSQSMessage | ||
from pydantic_core._pydantic_core import ValidationError | ||
from services.process_mns_message_service import MNSNotificationService | ||
from utils.audit_logging_setup import LoggingService | ||
from utils.decorators.ensure_env_var import ensure_environment_variables | ||
from utils.decorators.handle_lambda_exceptions import handle_lambda_exceptions | ||
from utils.decorators.override_error_check import override_error_check | ||
from utils.decorators.set_audit_arg import set_request_context_for_logging | ||
from utils.request_context import request_context | ||
|
||
logger = LoggingService(__name__) | ||
|
||
|
||
@set_request_context_for_logging | ||
@ensure_environment_variables( | ||
names=[ | ||
"APPCONFIG_CONFIGURATION", | ||
"APPCONFIG_ENVIRONMENT", | ||
"LLOYD_GEORGE_DYNAMODB_NAME", | ||
"MNS_NOTIFICATION_QUEUE_URL", | ||
] | ||
) | ||
@override_error_check | ||
@handle_lambda_exceptions | ||
def lambda_handler(event, context): | ||
logger.info(f"Received MNS notification event: {event}") | ||
notification_service = MNSNotificationService() | ||
sqs_messages = event.get("Records", []) | ||
|
||
for sqs_message in sqs_messages: | ||
try: | ||
sqs_message = json.loads(sqs_message["body"]) | ||
|
||
mns_message = MNSSQSMessage(**sqs_message) | ||
MNSSQSMessage.model_validate(mns_message) | ||
|
||
request_context.patient_nhs_no = mns_message.subject.nhs_number | ||
|
||
if mns_message.type in MNSNotificationTypes.__members__.values(): | ||
notification_service.handle_mns_notification(mns_message) | ||
|
||
except ValidationError as error: | ||
logger.error("Malformed MNS notification message") | ||
logger.error(error) | ||
except Exception as error: | ||
logger.error(f"Error processing SQS message: {error}.") | ||
logger.info("Continuing to next message.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
from pydantic import AliasGenerator, BaseModel, ConfigDict | ||
from pydantic.alias_generators import to_camel | ||
|
||
|
||
class MNSMessageSubject(BaseModel): | ||
model_config = ConfigDict( | ||
alias_generator=AliasGenerator( | ||
validation_alias=to_camel, serialization_alias=to_camel | ||
), | ||
) | ||
nhs_number: str | ||
|
||
|
||
class MNSSQSMessage(BaseModel): | ||
model_config = ConfigDict( | ||
alias_generator=AliasGenerator( | ||
validation_alias=to_camel, serialization_alias=to_camel | ||
), | ||
) | ||
id: str | ||
type: str | ||
subject: MNSMessageSubject | ||
data: dict |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
import os | ||
from datetime import datetime | ||
|
||
from botocore.exceptions import ClientError | ||
from enums.death_notification_status import DeathNotificationStatus | ||
from enums.metadata_field_names import DocumentReferenceMetadataFields | ||
from enums.mns_notification_types import MNSNotificationTypes | ||
from enums.patient_ods_inactive_status import PatientOdsInactiveStatus | ||
from models.mns_sqs_message import MNSSQSMessage | ||
from services.base.dynamo_service import DynamoDBService | ||
from services.base.sqs_service import SQSService | ||
from utils.audit_logging_setup import LoggingService | ||
from utils.exceptions import PdsErrorException | ||
from utils.utilities import get_pds_service | ||
|
||
logger = LoggingService(__name__) | ||
|
||
|
||
class MNSNotificationService: | ||
def __init__(self): | ||
self.dynamo_service = DynamoDBService() | ||
self.table = os.getenv("LLOYD_GEORGE_DYNAMODB_NAME") | ||
self.pds_service = get_pds_service() | ||
self.sqs_service = SQSService() | ||
self.queue = os.getenv("MNS_NOTIFICATION_QUEUE_URL") | ||
|
||
def handle_mns_notification(self, message: MNSSQSMessage): | ||
try: | ||
match message.type: | ||
case MNSNotificationTypes.CHANGE_OF_GP: | ||
logger.info("Handling GP change notification.") | ||
self.handle_gp_change_notification(message) | ||
case MNSNotificationTypes.DEATH_NOTIFICATION: | ||
logger.info("Handling death status notification.") | ||
self.handle_death_notification(message) | ||
|
||
except PdsErrorException: | ||
logger.info("An error occurred when calling PDS") | ||
self.send_message_back_to_queue(message) | ||
|
||
except ClientError as e: | ||
logger.info( | ||
f"Unable to process message: {message.id}, of type: {message.type}" | ||
) | ||
logger.info(f"{e}") | ||
|
||
def handle_gp_change_notification(self, message: MNSSQSMessage): | ||
patient_document_references = self.get_patient_documents( | ||
message.subject.nhs_number | ||
) | ||
|
||
if not self.patient_is_present_in_ndr(patient_document_references): | ||
return | ||
|
||
updated_ods_code = self.get_updated_gp_ods(message.subject.nhs_number) | ||
|
||
for reference in patient_document_references: | ||
if reference["CurrentGpOds"] is not updated_ods_code: | ||
self.dynamo_service.update_item( | ||
table_name=self.table, | ||
key=reference["ID"], | ||
updated_fields={ | ||
DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: updated_ods_code, | ||
DocumentReferenceMetadataFields.LAST_UPDATED.value: int( | ||
datetime.now().timestamp() | ||
), | ||
}, | ||
) | ||
|
||
logger.info("Update complete for change of GP") | ||
|
||
def handle_death_notification(self, message: MNSSQSMessage): | ||
death_notification_type = message.data["deathNotificationStatus"] | ||
match death_notification_type: | ||
case DeathNotificationStatus.INFORMAL: | ||
logger.info( | ||
"Patient is deceased - INFORMAL, moving on to the next message." | ||
) | ||
return | ||
|
||
case DeathNotificationStatus.REMOVED: | ||
patient_documents = self.get_patient_documents( | ||
message.subject.nhs_number | ||
) | ||
if not self.patient_is_present_in_ndr(patient_documents): | ||
return | ||
|
||
updated_ods_code = self.get_updated_gp_ods(message.subject.nhs_number) | ||
self.update_patient_ods_code(patient_documents, updated_ods_code) | ||
logger.info("Update complete for death notification change.") | ||
|
||
case DeathNotificationStatus.FORMAL: | ||
patient_documents = self.get_patient_documents( | ||
message.subject.nhs_number | ||
) | ||
if not self.patient_is_present_in_ndr(patient_documents): | ||
return | ||
|
||
self.update_patient_ods_code( | ||
patient_documents, PatientOdsInactiveStatus.DECEASED | ||
) | ||
logger.info( | ||
f"Update complete, patient marked {PatientOdsInactiveStatus.DECEASED}." | ||
) | ||
|
||
def get_patient_documents(self, nhs_number: str): | ||
logger.info("Getting patient document references...") | ||
response = self.dynamo_service.query_table_by_index( | ||
table_name=self.table, | ||
index_name="NhsNumberIndex", | ||
search_key="NhsNumber", | ||
search_condition=nhs_number, | ||
) | ||
return response["Items"] | ||
|
||
def update_patient_ods_code(self, patient_documents: list[dict], code: str) -> None: | ||
for document in patient_documents: | ||
logger.info("Updating patient document reference...") | ||
self.dynamo_service.update_item( | ||
table_name=self.table, | ||
key=document["ID"], | ||
updated_fields={ | ||
DocumentReferenceMetadataFields.CURRENT_GP_ODS.value: code, | ||
DocumentReferenceMetadataFields.LAST_UPDATED.value: int( | ||
datetime.now().timestamp() | ||
), | ||
}, | ||
) | ||
|
||
def get_updated_gp_ods(self, nhs_number: str) -> str: | ||
patient_details = self.pds_service.fetch_patient_details(nhs_number) | ||
return patient_details.general_practice_ods | ||
|
||
def send_message_back_to_queue(self, message: MNSSQSMessage): | ||
logger.info("Sending message back to queue...") | ||
self.sqs_service.send_message_standard( | ||
queue_url=self.queue, message_body=message.model_dump_json(by_alias=True) | ||
) | ||
|
||
def patient_is_present_in_ndr(self, dynamo_response): | ||
if len(dynamo_response) < 1: | ||
logger.info("Patient is not held in the National Document Repository.") | ||
logger.info("Moving on to the next message.") | ||
return False | ||
else: | ||
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.