Skip to content

Commit

Permalink
Adding prefect2 changes
Browse files Browse the repository at this point in the history
  • Loading branch information
lcdesilva committed Apr 14, 2023
1 parent 6fcb8d7 commit 1c8ea30
Showing 1 changed file with 7 additions and 14 deletions.
21 changes: 7 additions & 14 deletions data_validation.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
from datetime import timedelta
import prefect
from prefect import task, Flow, Parameter
from prefect import task, flow, get_run_logger
import time as ttime
from tiled.client import from_profile

tiled_client = from_profile("nsls2", username=None)

@task(max_retries=2, retry_delay=timedelta(seconds=10))
@task(retries=2, retry_delay_seconds=10)
def read_all_streams(beamline_acronym, uid):
logger = prefect.context.get("logger")
c = from_profile("nsls2", username=None)
try:
run = c[beamline_acronym][uid]
except KeyError:
run = c[beamline_acronym]["raw"][uid]
logger = get_run_logger()
run = tiled_client[beamline_acronym]["raw"][uid]
logger.info(f"Validating uid {run.start['uid']}")
start_time = ttime.monotonic()
for stream in run:
Expand All @@ -25,8 +20,6 @@ def read_all_streams(beamline_acronym, uid):
elapsed_time = ttime.monotonic() - start_time
logger.info(f"{elapsed_time = }")


with Flow("general-data-validation") as flow:
beamline_acronym = Parameter("beamline_acronym")
uid = Parameter("uid")
@flow
def general_data_validation(beamline_acronym, uid):
read_all_streams(beamline_acronym, uid)

0 comments on commit 1c8ea30

Please sign in to comment.