Skip to content

Commit

Permalink
PRMDR 620 - processing NEMS messages lambda
Browse files Browse the repository at this point in the history
Co-authored-by: NogaNHS <noga.sasson1@nhs.net>
Co-authored-by: RachelHowellNHS <rachel.howell6@nhs.net>
  • Loading branch information
3 people authored Feb 7, 2024
1 parent 4ef67d4 commit 6238360
Show file tree
Hide file tree
Showing 17 changed files with 1,505 additions and 1 deletion.
13 changes: 13 additions & 0 deletions .github/workflows/base-lambdas-reusable-deploy-all.yml
Original file line number Diff line number Diff line change
Expand Up @@ -234,3 +234,16 @@ jobs:
lambda_aws_name: SendFeedbackLambda
secrets:
AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}

deploy_nems_message_lambda:
name: Deploy nems_message_lambda
uses: ./.github/workflows/base-lambdas-reusable-deploy.yml
with:
environment: ${{ inputs.environment}}
python_version: ${{ inputs.python_version }}
build_branch: ${{ inputs.build_branch}}
sandbox: ${{ inputs.sandbox }}
lambda_handler_name: nems_message_handler
lambda_aws_name: NemsMessageLambda
secrets:
AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
5 changes: 5 additions & 0 deletions lambdas/enums/fhir_resource_message_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from enum import Enum


class FHIR_RESOURCE_MESSAGE_TYPES(str, Enum):
ChangeOfGP = "pds-change-of-gp-1"
7 changes: 7 additions & 0 deletions lambdas/enums/fhir_resource_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from enum import Enum


class FHIR_RESOURCE_TYPES(str, Enum):
MessageHeader = "MessageHeader"
Patient = "Patient"
Organisation = "Organization"
7 changes: 7 additions & 0 deletions lambdas/enums/nems_error_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from enum import Enum


class NEMS_ERROR_TYPES(str, Enum):
Transient = "transient"
Data = "data"
Validation = "validation"
32 changes: 32 additions & 0 deletions lambdas/handlers/nems_message_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from services.process_nems_message_service import ProcessNemsMessageService
from utils.audit_logging_setup import LoggingService
from utils.decorators.ensure_env_var import ensure_environment_variables
from utils.decorators.override_error_check import override_error_check
from utils.decorators.set_audit_arg import set_request_context_for_logging
from utils.lambda_response import ApiGatewayResponse

logger = LoggingService(__name__)


@set_request_context_for_logging
@override_error_check
@ensure_environment_variables(["LLOYD_GEORGE_DYNAMODB_NAME"])
def lambda_handler(event, context):
logger.info(f"Received event: {event}")
if "Records" not in event or len(event["Records"]) < 1:
http_status_code = 400
response_body = (
f"No sqs messages found in event: {event}. Will ignore this event"
)
logger.error(response_body, {"Result": "Process nems message failed"})
return ApiGatewayResponse(
status_code=http_status_code, body=response_body, methods="GET"
).create_api_gateway_response()
else:
sqs_batch_response = {}
process_nems_message_service = ProcessNemsMessageService()
batch_item_failures = process_nems_message_service.process_messages_from_event(
event["Records"]
)
sqs_batch_response["batchItemFailures"] = batch_item_failures
return sqs_batch_response
3 changes: 2 additions & 1 deletion lambdas/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ botocore==1.33.11
certifi==2023.7.22
cffi==1.15.1
charset-normalizer==3.2.0
cryptography==41.0.7
cryptography==42.0.2
email-validator==2.1.0.post1
fhir.resources[xml]==7.1.0
idna==3.4
inflection==0.5.1
jmespath==1.0.1
Expand Down
11 changes: 11 additions & 0 deletions lambdas/services/document_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,14 @@ def delete_documents(
self.dynamo_service.update_item(
table_name, reference.id, updated_fields=update_fields
)

def update_documents(
self,
table_name: str,
document_references: list[DocumentReference],
update_fields: dict,
):
for reference in document_references:
self.dynamo_service.update_item(
table_name, reference.id, updated_fields=update_fields
)
178 changes: 178 additions & 0 deletions lambdas/services/process_nems_message_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import os
from xml.etree import ElementTree
from xml.etree.ElementTree import ParseError

from botocore.exceptions import ClientError
from enums.fhir_resource_message_types import FHIR_RESOURCE_MESSAGE_TYPES
from enums.fhir_resource_types import FHIR_RESOURCE_TYPES
from enums.nems_error_types import NEMS_ERROR_TYPES
from fhir.resources.STU3.bundle import Bundle
from pydantic import ValidationError as PydanticValidationError
from pydantic.v1.error_wrappers import ValidationError as PydanticValidationErrorWrapper
from services.document_service import DocumentService
from services.ods_api_service import OdsApiService
from utils.audit_logging_setup import LoggingService
from utils.exceptions import (
FhirResourceNotFound,
InvalidResourceIdException,
OdsErrorException,
OrganisationNotFoundException,
)
from utils.fhir_bundle_parser import map_bundle_entries_to_dict
from utils.request_context import request_context
from utils.utilities import validate_id

logger = LoggingService(__name__)


class ProcessNemsMessageService:
def __init__(self):
self.ods_api_service = OdsApiService()
self.document_service = DocumentService()
self.table = os.environ["LLOYD_GEORGE_DYNAMODB_NAME"]

def process_messages_from_event(self, records: list):
batch_item_failures = []

for message in records:
response = self.handle_message(message)
logger.info(response)
if response is not None:
batch_item_failures.append({"itemIdentifier": message["messageId"]})

return batch_item_failures

def handle_message(self, message: dict):
try:
mesh_message_id = "No mesh id identified"
mesh_attributes = message["messageAttributes"]
mesh_message_id = mesh_attributes.get("meshMessageId")
nems_message = message["body"]

ElementTree.fromstring(nems_message)
bundle = Bundle.parse_raw(nems_message, content_type="text/xml")
mapped_bundle = map_bundle_entries_to_dict(bundle)
message_header = mapped_bundle.get(
FHIR_RESOURCE_TYPES.MessageHeader.value, None
)

if message_header is None:
raise FhirResourceNotFound(
{"resourceType": FHIR_RESOURCE_TYPES.MessageHeader, "details": ""}
)

# We do not process messages that are not of an expected type
if (
message_header[0].resource.event.code
== FHIR_RESOURCE_MESSAGE_TYPES.ChangeOfGP
):
return self.handle_change_of_gp_message(mapped_bundle)
else:
logger.info(
f"The NEMs message is not of a type that we support: {mesh_message_id}"
)

except (PydanticValidationError, PydanticValidationErrorWrapper):
logger.error(
f"Validation error - Invalid NEMS message, message id: {mesh_message_id}, ignoring message"
)
return NEMS_ERROR_TYPES.Validation
except SyntaxError as e:
logger.error(
f"Syntax error: {e}, message id: {mesh_message_id}, ignoring message"
)
return NEMS_ERROR_TYPES.Validation
except KeyError:
logger.error(
f"Message body is not a Bundle, message id: {mesh_message_id}, ignoring message"
)
return NEMS_ERROR_TYPES.Validation
except ParseError:
logger.error(
f"Parse error - Invalid NEMS message, message id: {mesh_message_id}, ignoring message"
)
return NEMS_ERROR_TYPES.Validation
except InvalidResourceIdException:
logger.error(
f"Invalid nhs number, message id: {mesh_message_id}, ignoring message"
)
return NEMS_ERROR_TYPES.Data
except OdsErrorException:
return NEMS_ERROR_TYPES.Data
except OrganisationNotFoundException:
logger.error(
f"ODS code for new GPP is invalid, message id: {mesh_message_id}"
)
return NEMS_ERROR_TYPES.Data
except FhirResourceNotFound as e:
args = e.args[0]
resource_type = args["resourceType"]
logger.error(
f"Expected FHIR entry is missing of type: {resource_type.value}"
)
return NEMS_ERROR_TYPES.Data
except ClientError as e:
logger.error(f"Error with one of our services, {e}")
logger.error(
f"Returning the message back to the queue, message id: {mesh_message_id}"
)
return NEMS_ERROR_TYPES.Transient

def handle_change_of_gp_message(self, mapped_bundle: dict):
patient_entries = mapped_bundle.get(FHIR_RESOURCE_TYPES.Patient.value)

if patient_entries is None:
raise FhirResourceNotFound(
{"resourceType": FHIR_RESOURCE_TYPES.Patient, "details": ""}
)

patient = patient_entries[0].resource
patient_active_gp = patient.generalPractitioner[0].reference
patient_nhs_number = patient.identifier[0].value
validate_id(patient_nhs_number)

organisation_entries = mapped_bundle.get(FHIR_RESOURCE_TYPES.Organisation.value)
if organisation_entries is None:
raise FhirResourceNotFound(
{"resourceType": FHIR_RESOURCE_TYPES.Organisation, "details": ""}
)

active_gp_resource = None
for organisation in organisation_entries:
if organisation.fullUrl == patient_active_gp:
active_gp_resource = organisation.resource

if not active_gp_resource:
raise FhirResourceNotFound(
{
"resourceType": FHIR_RESOURCE_TYPES.Organisation,
"details": "Active GP could not be identified",
}
)

# We use NHS number to look up where the ods needs to be updated,
# therefore we're not worried about getting the previous gp resource
active_gp_ods = active_gp_resource.identifier[0].value

request_context.patient_nhs_no = patient_nhs_number
self.validate_nems_details(active_gp_ods)
self.update_LG_table_with_current_GP(patient_nhs_number, active_gp_ods)
return

def validate_nems_details(self, new_ods_code: str):
self.ods_api_service.fetch_organisation_data(new_ods_code)

def update_LG_table_with_current_GP(self, nhs_number: str, new_ods_code: str):
logger.info("getting record from DB")
documents = self.document_service.fetch_documents_from_table(
nhs_number, self.table
)
if documents:
logger.info(f"doc: {documents}")
self.document_service.update_documents(
self.table, documents, {"CurrentGpOds": new_ods_code}
)
else:
logger.info(
f"no records were found for the following nhs number: {nhs_number}, ignoring message"
)
Loading

0 comments on commit 6238360

Please sign in to comment.