Skip to content

Commit

Permalink
[PRMP-763] add logger to batchupdate
Browse files Browse the repository at this point in the history
  • Loading branch information
steph-torres-nhs committed Aug 20, 2024
1 parent d83f6a1 commit 1f1b794
Showing 1 changed file with 45 additions and 47 deletions.
92 changes: 45 additions & 47 deletions lambdas/services/batch_update_ods_code.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import importlib
import logging
import os.path
import sys
import time
from typing import Dict

Expand All @@ -11,9 +8,12 @@
from services.base.dynamo_service import DynamoDBService
from services.base.ssm_service import SSMService
from services.pds_api_service import PdsApiService
from utils.audit_logging_setup import LoggingService

Fields = DocumentReferenceMetadataFields

logger = LoggingService(__name__)


class ProgressForPatient(BaseModel):
nhs_number: str
Expand All @@ -35,28 +35,26 @@ def __init__(
self.dynamo_service = DynamoDBService()
self.progress: Dict[str, ProgressForPatient] = {}

self.logger = logging.getLogger("BatchUpdateOds")

def main(self):
self.logger.info("Starting batch update script")
self.logger.info(f"Table to be updated: {self.table_name}")
logger.info("Starting batch update script")
logger.info(f"Table to be updated: {self.table_name}")

if self.found_previous_progress():
self.logger.info("Resuming from previous job")
logger.info("Resuming from previous job")
self.resume_previous_progress()
else:
self.logger.info("Starting a new job")
logger.info("Starting a new job")
self.initialise_new_job()

try:
self.run_update()
except Exception as e:
self.logger.error(e)
logger.error(e)
raise e

def run_update(self):
if len(self.progress) == 0:
self.logger.info(
logger.info(
f'No patient found in local progress file. Please try removing the local progress file: "{self.progress}"'
)
exit()
Expand All @@ -67,7 +65,7 @@ def run_update(self):
if not status.update_completed
]
if not patients_to_be_updated:
self.logger.info(
logger.info(
"Already updated the ODS codes for all patients in previous run."
)
exit()
Expand All @@ -77,17 +75,17 @@ def run_update(self):
for [current_count, nhs_number] in enumerate(
patients_to_be_updated, start=count_of_completed + 1
):
self.logger.info(
logger.info(
f"Updating record for NHS number {nhs_number} ({current_count} of {total_count})"
)
self.update_patient_ods(nhs_number)

self.logger.info("Finished updating all patient's ODS codes")
logger.info("Finished updating all patient's ODS codes")

def update_patient_ods(self, nhs_number: str):
updated_gp_ods = self.get_updated_gp_ods(nhs_number)
if updated_gp_ods == self.progress[nhs_number].prev_ods_code:
self.logger.info(f"No change in GP for patient: {nhs_number}")
logger.info(f"No change in GP for patient: {nhs_number}")
else:
documents_to_update = self.progress[nhs_number].doc_ref_ids
updated_fields = {Fields.CURRENT_GP_ODS.value: updated_gp_ods}
Expand All @@ -99,15 +97,15 @@ def update_patient_ods(self, nhs_number: str):
updated_fields=updated_fields,
)

self.logger.info(f"Updated ODS code for patient: {nhs_number}")
logger.info(f"Updated ODS code for patient: {nhs_number}")

self.progress[nhs_number].new_ods_code = updated_gp_ods
self.progress[nhs_number].update_completed = True
self.save_progress()

def get_updated_gp_ods(self, nhs_number: str) -> str:
time.sleep(0.2) # buffer to avoid over stretching PDS API
self.logger.debug("Getting the latest ODS code from PDS...")
logger.debug("Getting the latest ODS code from PDS...")

pds_response = self.pds_service.pds_request(
nhs_number=nhs_number, retry_on_expired=True
Expand All @@ -131,7 +129,7 @@ def get_updated_gp_ods(self, nhs_number: str) -> str:
def initialise_new_job(self):
all_entries = self.list_all_entries()
if len(all_entries) == 0:
self.logger.info(
logger.info(
f"No records was found in table {self.table_name}. Please check the table name."
)
exit()
Expand All @@ -146,7 +144,7 @@ def resume_previous_progress(self):
Dict[str, ProgressForPatient]
).validate_json(json_str)
except FileNotFoundError:
self.logger.info("Cannot find a progress file. Will start a new job.")
logger.info("Cannot find a progress file. Will start a new job.")
self.initialise_new_job()

def found_previous_progress(self) -> bool:
Expand All @@ -160,7 +158,7 @@ def save_progress(self):
return f.write(json_str)

def list_all_entries(self) -> list[dict]:
self.logger.info("Fetching all records from dynamodb table...")
logger.info("Fetching all records from dynamodb table...")

table = DynamoDBService().get_table(self.table_name)
results = []
Expand All @@ -180,12 +178,12 @@ def list_all_entries(self) -> list[dict]:

results += response["Items"]

self.logger.info(f"Downloaded {len(results)} records from table")
logger.info(f"Downloaded {len(results)} records from table")

return results

def build_progress_dict(self, dynamodb_records: list[dict]) -> dict:
self.logger.info("Grouping the records according to NHS number...")
logger.info("Grouping the records according to NHS number...")

progress_dict = {}
for entry in dynamodb_records:
Expand All @@ -207,31 +205,31 @@ def build_progress_dict(self, dynamodb_records: list[dict]) -> dict:
"[multiple ods codes in records]"
)

self.logger.info(f"Totally {len(progress_dict)} patients found in record.")
logger.info(f"Totally {len(progress_dict)} patients found in record.")
return progress_dict


def setup_logging_for_local_script():
importlib.reload(logging)

logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s",
datefmt="%d/%b/%Y %H:%M:%S",
stream=sys.stdout,
)


if __name__ == "__main__":
import argparse

setup_logging_for_local_script()

parser = argparse.ArgumentParser(
prog="batch_update_ods_code.py",
description="A utility script to update the ODS Codes for all patients in a dynamoDB doc reference table",
)
parser.add_argument("table_name", type=str, help="The name of dynamodb table")
args = parser.parse_args()

BatchUpdate(table_name=args.table_name).main()
# def setup_logging_for_local_script():
# importlib.reload(logging)
#
# logging.basicConfig(
# level=logging.INFO,
# format="[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d] %(message)s",
# datefmt="%d/%b/%Y %H:%M:%S",
# stream=sys.stdout,
# )
#
#
# if __name__ == "__main__":
# import argparse
#
# setup_logging_for_local_script()
#
# parser = argparse.ArgumentParser(
# prog="batch_update_ods_code.py",
# description="A utility script to update the ODS Codes for all patients in a dynamoDB doc reference table",
# )
# parser.add_argument("table_name", type=str, help="The name of dynamodb table")
# args = parser.parse_args()
#
# BatchUpdate(table_name=args.table_name).main()

0 comments on commit 1f1b794

Please sign in to comment.