diff --git a/.prefectignore b/.prefectignore new file mode 100644 index 0000000..7ce5ed8 --- /dev/null +++ b/.prefectignore @@ -0,0 +1,41 @@ +# prefect artifacts +.prefectignore + +# python artifacts +__pycache__/ +*.py[cod] +*$py.class +*.egg-info/ +*.egg + +# Type checking artifacts +.mypy_cache/ +.dmypy.json +dmypy.json +.pyre/ + +# IPython +profile_default/ +ipython_config.py +*.ipynb_checkpoints/* + +# Environments +.python-version +.env +.venv +env/ +venv/ + +# MacOS +.DS_Store + +# Dask +dask-worker-space/ + +# Editors +.idea/ +.vscode/ + +# VCS +.git/ +.hg/ diff --git a/data_validation.py b/data_validation.py new file mode 100644 index 0000000..4c95659 --- /dev/null +++ b/data_validation.py @@ -0,0 +1,25 @@ +from prefect import task, flow, get_run_logger +import time as ttime +from tiled.client import from_profile + +tiled_client = from_profile("nsls2", username=None) + +@task(retries=2, retry_delay_seconds=10) +def read_all_streams(beamline_acronym, uid): + logger = get_run_logger() + run = tiled_client[beamline_acronym]["raw"][uid] + logger.info(f"Validating uid {run.start['uid']}") + start_time = ttime.monotonic() + for stream in run: + logger.info(f"{stream}:") + stream_start_time = ttime.monotonic() + stream_data = run[stream].read() + stream_elapsed_time = ttime.monotonic() - stream_start_time + logger.info(f"{stream} elapsed_time = {stream_elapsed_time}") + logger.info(f"{stream} nbytes = {stream_data.nbytes:_}") + elapsed_time = ttime.monotonic() - start_time + logger.info(f"{elapsed_time = }") + +@flow +def general_data_validation(beamline_acronym, uid): + read_all_streams(beamline_acronym, uid)