Skip to content

Commit

Permalink
[PRMP-1287] start to implement user validation
Browse files Browse the repository at this point in the history
  • Loading branch information
steph-torres-nhs committed Dec 6, 2024
1 parent ae321e1 commit 5639cda
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 35 deletions.
28 changes: 15 additions & 13 deletions lambdas/handlers/nrl_get_document_reference_handler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from services.nrl_get_document_reference_service import NRLGetDocumentReferenceService
from utils.audit_logging_setup import LoggingService
from utils.decorators.ensure_env_var import ensure_environment_variables
from utils.decorators.set_audit_arg import set_request_context_for_logging
from utils.exceptions import AuthorisationException
from utils.lambda_response import ApiGatewayResponse

logger = LoggingService(__name__)
Expand All @@ -24,19 +24,21 @@
)
def lambda_handler(event, context):

get_document_service = NRLGetDocumentReferenceService()
try:
# # document_id = event["pathParameters"]["id"]
# # bearer_token = event["headers"]["Authorization"]
#
# get_document_service = NRLGetDocumentReferenceService()

# document_id = event["pathParameters"]["id"]
# bearer_token = event["headers"]["Authorization"]
#
# def extract_document_details_from_event():
# pass
#
# def extract_document_details_from_event():
# pass

if not get_document_service.user_allowed_to_see_file():
return ApiGatewayResponse(
status_code=403, body="?"
).create_api_gateway_response()
placeholder = "cloudfront presigned url"

placeholder = "cloudfront presigned url"
return ApiGatewayResponse(status_code=200, body=placeholder, methods="GET")
# except NoAvailableDocument() as error:
# return ApiGatewayResponse()

return ApiGatewayResponse(status_code=200, body=placeholder, methods="GET")
except AuthorisationException() as error:
return ApiGatewayResponse(status_code=403, body=error.body, methods="GET")
95 changes: 73 additions & 22 deletions lambdas/services/nrl_get_document_reference_service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import os
from urllib.error import HTTPError

import requests
from enums.metadata_field_names import DocumentReferenceMetadataFields
from enums.patient_ods_inactive_status import PatientOdsInactiveStatus
from models.document_reference import DocumentReference
from services.base.dynamo_service import DynamoDBService
from services.base.ssm_service import SSMService
from utils.audit_logging_setup import LoggingService
from utils.lambda_exceptions import NRLGetDocumentReferenceException
from utils.request_context import request_context
from utils.utilities import get_pds_service

Expand All @@ -17,15 +23,28 @@ def __init__(self):
self.ssm_service = SSMService()
self.pds_service = get_pds_service()
# question about ssm_service as above function calls one...
# dynamo_service
# dynamo_table
self.dynamo_service = DynamoDBService()
self.table = os.getenv("LLOYD_GEORGE_DYNAMODB_NAME")
# document_service? not using NHS_numbers, using table ids
self.ssm_prefix = getattr(request_context, "auth_ssm_prefix", "")

def user_allowed_to_see_file(self, bearer_token):
def user_allowed_to_see_file(self, bearer_token, id):

user = self.fetch_user_info(bearer_token)
self.get_user_role_and_ods(user)
user_ods_codes_and_roles = self.get_user_roles_and_ods_codes(user)

document_reference = self.get_document_references(id)
patient_current_gp_ods_code = self.get_patient_current_gp_ods(
document_reference.nhs_number
)

if self.patient_is_inactive(patient_current_gp_ods_code):
return False

for ods_code, roles in user_ods_codes_and_roles.items():
if ods_code == patient_current_gp_ods_code:
for role in roles:
return role in self.get_ndr_accepted_role_codes()

# first check user has a role with correct ods code
# second check that the role id associated with this ods is in our accepted roles
Expand All @@ -49,22 +68,35 @@ def fetch_user_info(self, bearer_token) -> dict:
# Check status code, and raise?
pass

def get_ndr_accepted_role_codes(self):
pass
def get_ndr_accepted_role_codes(self) -> list[str]:
roles = []
ssm_parameters = self.ssm_service.get_ssm_parameters(
parameters_keys=[
"/auth/smartcard/role/gp_admin",
"/auth/smartcard/role/gp_clinical",
]
)

for key, value in ssm_parameters.items():
for role in value:
roles.append(role)
return roles

def get_user_roles_and_ods_codes(self, user_info) -> list[dict[str, str]]:

# ods_codes_and_roles = []
#
# nrbac_roles = user_info.get("nhsid_nrbac_roles", []);
#
# for role in nrbac_roles:
# ods_code = role["org_code"]
# role_code = self.process_role_code(role["role_code"])
ods_codes_and_roles = {}

# get all roles, loop through, get ods and role code,
# check these against doc ref ods and role codes stored in ssm of both clinical and admin
pass
nrbac_roles = user_info.get("nhsid_nrbac_roles", [])

for role in nrbac_roles:
ods_code = role["org_code"]
role_code = self.process_role_code(role["role_code"])
if ods_code in ods_codes_and_roles:
ods_codes_and_roles[ods_code].append(role_code)
else:
ods_codes_and_roles[ods_code] = [role_code]

return ods_codes_and_roles

def process_role_code(self, role_codes) -> str:
role_codes_split = role_codes.split(":")
Expand All @@ -75,14 +107,33 @@ def process_role_code(self, role_codes) -> str:
def get_dynamo_table_to_search_for_patient(self, snomed_code):
pass

def patient_is_active(self, current_gp_ods_code):
pass
def patient_is_inactive(self, current_gp_ods_code):
return current_gp_ods_code in PatientOdsInactiveStatus

def get_document_reference(self, table, id):
pass
def get_document_references(self, id: str) -> DocumentReference:

def get_patient_current_gp(self, nhs_number):
pass
table_item = self.dynamo_service.query_table_by_index(
table_name=self.table,
index_name=DocumentReferenceMetadataFields.ID,
search_key=DocumentReferenceMetadataFields.ID,
search_condition=id,
)
if len(table_item["Items"]) > 0:
return DocumentReference.model_validate(table_item["Items"][0])
else:
raise NRLGetDocumentReferenceException(
message="No document references found",
status_code=404,
)

# otherwise we don't have the patient, and want to return what status code? do that here?
# or raise/throw an error that the handler catches and returns the api gateway.

def get_patient_current_gp_ods(self, nhs_number):
patient_details = self.pds_service.fetch_patient_details(nhs_number)
return patient_details.general_practice_ods

# seeing as we're calling pds here, do we want to update our table at the same time?

def generate_cloud_front_url(self):
pass
35 changes: 35 additions & 0 deletions lambdas/tests/unit/helpers/data/dynamo_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,38 @@
"RetryAttempts": 0,
},
}

MOCK_SINGLE_DOCUMENT_RESPONSE = {
"Items": [
{
"ID": "3d8683b9-1665-40d2-8499-6e8302d507ff",
"ContentType": "type",
"Created": "2024-01-01T12:00:00.000Z",
"Deleted": "",
"FileLocation": "s3://test-s3-bucket/9000000009/test-key-123",
"FileName": "document.csv",
"NhsNumber": "9000000009",
"VirusScannerResult": "Clean",
"CurrentGpOds": "Y12345",
"Uploaded": "True",
"Uploading": "False",
"LastUpdated": 1704110400, # Timestamp: 2024-01-01T12:00:00
},
],
"Count": 1,
"ScannedCount": 1,
"ResponseMetadata": {
"RequestId": "JHJBP4GU007VMB2V8C9NEKUL8VVV4KQNSO5AEMVJF66Q9ASUAAJG",
"HTTPStatusCode": 200,
"HTTPHeaders": {
"server": "Server",
"date": "Tue, 29 Aug 2023 11:08:21 GMT",
"content-type": "application/x-amz-json-1.0",
"content-length": "510",
"connection": "keep-alive",
"x-amzn-requestid": "JHJBP4GU007VMB2V8C9NEKUL8VVV4KQNSO5AEMVJF66Q9ASUAAJG",
"x-amz-crc32": "820258331",
},
"RetryAttempts": 0,
},
}
103 changes: 103 additions & 0 deletions lambdas/tests/unit/services/test_nrl_get_document_reference_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import pytest
from models.document_reference import DocumentReference
from services.nrl_get_document_reference_service import NRLGetDocumentReferenceService
from tests.unit.conftest import TEST_CURRENT_GP_ODS, TEST_UUID
from tests.unit.helpers.data.dynamo_responses import MOCK_SINGLE_DOCUMENT_RESPONSE

MOCK_USER_INFO = {
"nhsid_useruid": TEST_UUID,
"name": "TestUserOne Caius Mr",
"nhsid_nrbac_roles": [
{
"person_orgid": "500000000000",
"person_roleid": TEST_UUID,
"org_code": "B9A5A",
"role_name": '"Support":"Systems Support":"Systems Support Access Role"',
"role_code": "S8001:G8005:R8000",
},
{
"person_orgid": "500000000000",
"person_roleid": "500000000000",
"org_code": "B9A5A",
"role_name": '"Primary Care Support England":"Systems Support Access Role"',
"role_code": "S8001:G8005:R8015",
},
{
"person_orgid": "500000000000",
"person_roleid": "500000000000",
"org_code": TEST_CURRENT_GP_ODS,
"role_name": '"Primary Care Support England":"Systems Support Access Role"',
"role_code": "S8001:G8005:R8008",
},
],
"given_name": "Caius",
"family_name": "TestUserOne",
"uid": "500000000000",
"nhsid_user_orgs": [
{"org_name": "NHSID DEV", "org_code": "A9A5A"},
{"org_name": "Primary Care Support England", "org_code": "B9A5A"},
],
"sub": "500000000000",
}


@pytest.fixture
def mock_service(mocker, set_env, context):
service = NRLGetDocumentReferenceService()
mocker.patch.object(service, "ssm_service")
mocker.patch.object(service, "pds_service")
mocker.patch.object(service, "dynamo_service")
mocker.patch.object(service, "get_ndr_accepted_role_codes")
yield service


@pytest.fixture
def mock_fetch_user_info(mock_service, mocker):
service = mock_service
mocker.patch.object(service, "fetch_user_info")
yield service


@pytest.mark.parametrize(
"input, expected",
[
("S8001:G8005:R8000", "R8000"),
("S8001:G8005:R8015", "R8015"),
("S8001:G8005:R8008", "R8008"),
],
)
def test_process_role_code_returns_correct_role(mock_service, input, expected):
assert mock_service.process_role_code(input) == expected


def test_get_user_roles_and_ods_codes(mock_service):
expected = {"B9A5A": ["R8000", "R8015"], TEST_CURRENT_GP_ODS: ["R8008"]}

assert mock_service.get_user_roles_and_ods_codes(MOCK_USER_INFO) == expected


def test_get_document_reference_service(mock_service):
response = mock_service.dynamo_service.query_table_by_index.return_value = (
MOCK_SINGLE_DOCUMENT_RESPONSE
)
expected = DocumentReference.model_validate(response["Items"][0])

actual = mock_service.get_document_references(
"3d8683b9-1665-40d2-8499-6e8302d507ff"
)
assert actual == expected


def test_user_allowed_to_see_file_happy_path(mock_service, mock_fetch_user_info):
mock_fetch_user_info.return_value = MOCK_USER_INFO
mock_service.dynamo_service.query_table_by_index.return_value = (
MOCK_SINGLE_DOCUMENT_RESPONSE
)
mock_service.get_ndr_accepted_role_codes.return_value = ["R8000, R8008"]

assert (
mock_service.user_allowed_to_see_file(
TEST_UUID, "3d8683b9-1665-40d2-8499-6e8302d507ff"
)
is True
)
4 changes: 4 additions & 0 deletions lambdas/utils/lambda_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,7 @@ class GenerateManifestZipException(LambdaException):

class CloudFrontEdgeException(LambdaException):
pass


class NRLGetDocumentReferenceException(LambdaException):
pass

0 comments on commit 5639cda

Please sign in to comment.