Skip to content

Commit

Permalink
prmp-1287 tidy up code
Browse files Browse the repository at this point in the history
  • Loading branch information
NogaNHS committed Dec 6, 2024
1 parent e77cf79 commit a4a4c37
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 40 deletions.
3 changes: 1 addition & 2 deletions lambdas/handlers/nrl_get_document_reference_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@
"APPCONFIG_APPLICATION",
"APPCONFIG_CONFIGURATION",
"APPCONFIG_ENVIRONMENT",
"NRL_API_ENDPOINT",
# "NRL_END_USER_ODS_CODE",
"LLOYD_GEORGE_DYNAMODB_NAME",
# "LLOYD_GEORGE_BUCKET_NAME",
"LLOYD_GEORGE_BUCKET_NAME",
"PRESIGNED_ASSUME_ROLE",
"CLOUDFRONT_URL",
"OIDC_USER_INFO_URL",
Expand Down
58 changes: 23 additions & 35 deletions lambdas/services/nrl_get_document_reference_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from services.base.dynamo_service import DynamoDBService
from services.base.ssm_service import SSMService
from utils.audit_logging_setup import LoggingService
from utils.constants.ssm import GP_ADMIN_USER_ROLE_CODES, GP_CLINICAL_USER_ROLE_CODE
from utils.lambda_exceptions import NRLGetDocumentReferenceException
from utils.request_context import request_context
from utils.utilities import get_pds_service
Expand All @@ -16,35 +17,32 @@


class NRLGetDocumentReferenceService:

# Needs PDS now, so oauth and ssm, may need declaring the handler

def __init__(self):
self.ssm_service = SSMService()
self.pds_service = get_pds_service()
# question about ssm_service as above function calls one...
self.dynamo_service = DynamoDBService()
self.table = os.getenv("LLOYD_GEORGE_DYNAMODB_NAME")
# document_service? not using NHS_numbers, using table ids
self.ssm_prefix = getattr(request_context, "auth_ssm_prefix", "")

def user_allowed_to_see_file(self, bearer_token, id):
def user_allowed_to_see_file(self, bearer_token, document_id):
user_details = self.fetch_user_info(bearer_token)
user_ods_codes_and_roles = self.get_user_roles_and_ods_codes(user_details)

user = self.fetch_user_info(bearer_token)
user_ods_codes_and_roles = self.get_user_roles_and_ods_codes(user)
document_reference = self.get_document_references(document_id)

document_reference = self.get_document_references(id)
patient_current_gp_ods_code = self.get_patient_current_gp_ods(
document_reference.nhs_number
)

if self.patient_is_inactive(patient_current_gp_ods_code):
return False

for ods_code, roles in user_ods_codes_and_roles.items():
if ods_code == patient_current_gp_ods_code:
for role in roles:
return role in self.get_ndr_accepted_role_codes()
if patient_current_gp_ods_code in user_ods_codes_and_roles:
accepted_roles = self.get_ndr_accepted_role_codes()
return any(
role in accepted_roles
for role in user_ods_codes_and_roles[patient_current_gp_ods_code]
)

# first check user has a role with correct ods code
# second check that the role id associated with this ods is in our accepted roles
Expand All @@ -69,46 +67,37 @@ def fetch_user_info(self, bearer_token) -> dict:
pass

def get_ndr_accepted_role_codes(self) -> list[str]:
roles = []
ssm_parameters = self.ssm_service.get_ssm_parameters(
parameters_keys=[
"/auth/smartcard/role/gp_admin",
"/auth/smartcard/role/gp_clinical",
GP_ADMIN_USER_ROLE_CODES,
GP_CLINICAL_USER_ROLE_CODE,
]
)
return [roles.split(",") for roles in ssm_parameters.values()]

for key, value in ssm_parameters.items():
for role in value:
roles.append(role)
return roles

def get_user_roles_and_ods_codes(self, user_info) -> list[dict[str, str]]:

def get_user_roles_and_ods_codes(self, user_info) -> dict[str, list[str]]:
ods_codes_and_roles = {}

nrbac_roles = user_info.get("nhsid_nrbac_roles", [])

for role in nrbac_roles:
ods_code = role["org_code"]
ods_code: str = role["org_code"]
role_code = self.process_role_code(role["role_code"])
if ods_code in ods_codes_and_roles:
ods_codes_and_roles[ods_code].append(role_code)
else:
ods_codes_and_roles[ods_code] = [role_code]

ods_codes_and_roles.setdefault(ods_code, []).append(role_code)
return ods_codes_and_roles

def process_role_code(self, role_codes) -> str:
role_codes_split = role_codes.split(":")
for role_code in role_codes_split:
if role_code[0].upper() == "R":
for role_code in role_codes.split(":"):
if role_code.startswith("R"):
return role_code

def get_dynamo_table_to_search_for_patient(self, snomed_code):
pass

def patient_is_inactive(self, current_gp_ods_code):
return current_gp_ods_code in PatientOdsInactiveStatus
try:
return current_gp_ods_code in PatientOdsInactiveStatus
except TypeError:
return False

def get_document_references(self, id: str) -> DocumentReference:

Expand All @@ -122,7 +111,6 @@ def get_document_references(self, id: str) -> DocumentReference:
return DocumentReference.model_validate(table_item["Items"][0])
else:
raise NRLGetDocumentReferenceException(
message="No document references found",
status_code=404,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def mock_service(mocker, set_env, context):
def mock_fetch_user_info(mock_service, mocker):
service = mock_service
mocker.patch.object(service, "fetch_user_info")
mocker.patch.object(service, "get_patient_current_gp_ods")
yield service


Expand Down Expand Up @@ -89,12 +90,12 @@ def test_get_document_reference_service(mock_service):


def test_user_allowed_to_see_file_happy_path(mock_service, mock_fetch_user_info):
mock_fetch_user_info.return_value = MOCK_USER_INFO
mock_fetch_user_info.fetch_user_info.return_value = MOCK_USER_INFO
mock_service.dynamo_service.query_table_by_index.return_value = (
MOCK_SINGLE_DOCUMENT_RESPONSE
)
mock_service.get_ndr_accepted_role_codes.return_value = ["R8000, R8008"]

mock_service.get_ndr_accepted_role_codes.return_value = ["R8000", "R8008"]
mock_service.get_patient_current_gp_ods.return_value = TEST_CURRENT_GP_ODS
assert (
mock_service.user_allowed_to_see_file(
TEST_UUID, "3d8683b9-1665-40d2-8499-6e8302d507ff"
Expand Down

0 comments on commit a4a4c37

Please sign in to comment.