From 1c8ea30aa8f290f5b2800de86c0f5447a52c45dc Mon Sep 17 00:00:00 2001 From: Chanaka De Silva Date: Fri, 14 Apr 2023 11:20:16 -0400 Subject: [PATCH] Adding prefect2 changes --- data_validation.py | 21 +++++++-------------- 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/data_validation.py b/data_validation.py index df9980d..4c95659 100644 --- a/data_validation.py +++ b/data_validation.py @@ -1,18 +1,13 @@ -from datetime import timedelta -import prefect -from prefect import task, Flow, Parameter +from prefect import task, flow, get_run_logger import time as ttime from tiled.client import from_profile +tiled_client = from_profile("nsls2", username=None) -@task(max_retries=2, retry_delay=timedelta(seconds=10)) +@task(retries=2, retry_delay_seconds=10) def read_all_streams(beamline_acronym, uid): - logger = prefect.context.get("logger") - c = from_profile("nsls2", username=None) - try: - run = c[beamline_acronym][uid] - except KeyError: - run = c[beamline_acronym]["raw"][uid] + logger = get_run_logger() + run = tiled_client[beamline_acronym]["raw"][uid] logger.info(f"Validating uid {run.start['uid']}") start_time = ttime.monotonic() for stream in run: @@ -25,8 +20,6 @@ def read_all_streams(beamline_acronym, uid): elapsed_time = ttime.monotonic() - start_time logger.info(f"{elapsed_time = }") - -with Flow("general-data-validation") as flow: - beamline_acronym = Parameter("beamline_acronym") - uid = Parameter("uid") +@flow +def general_data_validation(beamline_acronym, uid): read_all_streams(beamline_acronym, uid)