From e7c323c2122ba6e7b324a539ffdcb96568538320 Mon Sep 17 00:00:00 2001
From: patrick-troy <58770937+patrick-troy@users.noreply.github.com>
Date: Mon, 9 Sep 2024 13:11:19 +0300
Subject: [PATCH] update sensors to run through all necessary datasets (#35)

* update sensors to run through all datasets

* add error catching in tags
---
 .../sensors/job_success_sensor.py | 107 +++++++++++-------
 .../sensors/location_schedule.py  |   2 +
 2 files changed, 67 insertions(+), 42 deletions(-)

diff --git a/liiatools_pipeline/sensors/job_success_sensor.py b/liiatools_pipeline/sensors/job_success_sensor.py
index eb89e19..22dd554 100644
--- a/liiatools_pipeline/sensors/job_success_sensor.py
+++ b/liiatools_pipeline/sensors/job_success_sensor.py
@@ -19,14 +19,21 @@
 from liiatools_pipeline.ops.common_config import CleanConfig
 
 
-def find_previous_matching_ssda903_run(run_records):
-    previous_run_id = [
-        run.dagster_run.run_id
-        for run in run_records
-        if "ssda903" == run.dagster_run.tags["dataset"]
-    ]
-
-    previous_run_id = previous_run_id[0]
+def find_previous_matching_dataset_run(run_records, dataset, context):
+    try:
+        previous_run_id = [
+            run.dagster_run.run_id
+            for run in run_records
+            if dataset == run.dagster_run.tags["dataset"]
+        ]
+
+    except KeyError:
+        context.log.error(
+            f"'dataset' not found in tags. No previous run id found"
+        )
+        previous_run_id = None
+
+    previous_run_id = previous_run_id[0] if previous_run_id else None
 
     return previous_run_id
 
@@ -46,14 +53,17 @@ def move_current_sensor(context):
     )
 
     if run_records:  # Ensure there is at least one run record
-        latest_run_record = run_records[0]  # Get the most recent run record
-        dataset = latest_run_record.dagster_run.run_config["ops"]["process_files"][
-            "config"
-        ]["dataset"]
-        yield RunRequest(
-            run_key=latest_run_record.dagster_run.run_id,
-            tags={"dataset": dataset},
-        )
+        allowed_datasets = env_config("ALLOWED_DATASETS").split(",")
+        for dataset in allowed_datasets:
+            latest_run_id = find_previous_matching_dataset_run(
+                run_records,
+                dataset,
+                context,
+            )  # Get the most recent dataset run id
+            yield RunRequest(
+                run_key=latest_run_id,
+                tags={"dataset": dataset},
+            )
 
 
 @sensor(
@@ -72,21 +82,26 @@ def concatenate_sensor(context):
     )
 
     if run_records:  # Ensure there is at least one run record
-        latest_run_record = run_records[0]  # Get the most recent run record
-        dataset = latest_run_record.dagster_run.tags["dataset"]
-        concat_config = CleanConfig(
-            dataset=dataset,
-        )
-        yield RunRequest(
-            run_key=latest_run_record.dagster_run.run_id,
-            run_config=RunConfig(
-                ops={
-                    "open_current": concat_config,
-                    "create_concatenated_view": concat_config,
-                }
-            ),
-            tags={"dataset": dataset},
-        )
+        allowed_datasets = env_config("ALLOWED_DATASETS").split(",")
+        for dataset in allowed_datasets:
+            latest_run_id = find_previous_matching_dataset_run(
+                run_records,
+                dataset,
+                context,
+            )  # Get the most recent dataset run id
+            concat_config = CleanConfig(
+                dataset=dataset,
+            )
+            yield RunRequest(
+                run_key=latest_run_id,
+                run_config=RunConfig(
+                    ops={
+                        "open_current": concat_config,
+                        "create_concatenated_view": concat_config,
+                    }
+                ),
+                tags={"dataset": dataset},
+            )
 
 
 @sensor(
@@ -104,10 +119,12 @@ def ssda903_fix_episodes_sensor(context):
         ascending=False,
     )
 
-    if run_records:  # Ensure there is at least one run record
-        latest_run_id = find_previous_matching_ssda903_run(
-            run_records
-        )  # Get the most recent ssda903 run record
+    latest_run_id = find_previous_matching_dataset_run(
+        run_records,
+        "ssda903",
+        context,
+    )  # Get the most recent ssda903 run id
+    if latest_run_id:  # Ensure there is at least one ssda903 run record
         yield RunRequest(run_key=latest_run_id, run_config=RunConfig())
 
 
@@ -149,11 +166,13 @@ def move_current_and_concat_sensor(context):
     )
 
     if run_records:  # Ensure there is at least one run record
-        latest_run_record = run_records[0]  # Get the most recent run record
-
         allowed_datasets = env_config("ALLOWED_DATASETS").split(",")
-        context.log.info(f"Allowed datasets: {allowed_datasets}")
         for dataset in allowed_datasets:
+            latest_run_id = find_previous_matching_dataset_run(
+                run_records,
+                dataset,
+                context,
+            )  # Get the most recent dataset run id
             clean_config = CleanConfig(
                 dataset_folder=None,
                 la_folder=None,
@@ -161,7 +180,7 @@ def move_current_and_concat_sensor(context):
                 dataset=dataset,
             )
             yield RunRequest(
-                run_key=latest_run_record.dagster_run.run_id,
+                run_key=latest_run_id,
                 run_config=RunConfig(
                     ops={
                         "move_current_and_concat_view": clean_config,
@@ -185,8 +204,12 @@ def sufficiency_sensor(context):
         ascending=False,
     )
 
-    if run_records:  # Ensure there is at least one run record
-        latest_run_record = run_records[0]  # Get the most recent run record
+    latest_run_id = find_previous_matching_dataset_run(
+        run_records,
+        "ssda903",
+        context,
+    )  # Get the most recent ssda903 run id
+    if latest_run_id:  # Ensure there is at least one ssda903 run record
         yield RunRequest(
-            run_key=latest_run_record.dagster_run.run_id,
+            run_key=latest_run_id,
         )
diff --git a/liiatools_pipeline/sensors/location_schedule.py b/liiatools_pipeline/sensors/location_schedule.py
index 8e00d03..c627197 100644
--- a/liiatools_pipeline/sensors/location_schedule.py
+++ b/liiatools_pipeline/sensors/location_schedule.py
@@ -160,6 +160,7 @@ def clean_schedule(context):
             context.log.info("Differences found, executing run")
             yield RunRequest(
                 run_key=run_key,
+                tags={"dataset": dataset},
                 run_config=RunConfig(
                     ops={
                         "create_session_folder": clean_config,
@@ -221,6 +222,7 @@ def reports_schedule(context):
             context.log.info("Differences found, executing run")
             yield RunRequest(
                 run_key=run_key,
+                tags={"dataset": dataset},
                 run_config=RunConfig(
                     ops={
                         "create_org_session_folder": clean_config,
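
Reviewer note: below is a minimal, self-contained sketch of the per-dataset
lookup this patch introduces in job_success_sensor.py. FakeRun, FakeRunRecord,
and FakeContext are hypothetical stand-ins for Dagster's RunRecord and sensor
context objects; the real helper receives the records returned by
context.instance.get_run_records(..., ascending=False), so the list arrives
newest-first.

    import logging
    from dataclasses import dataclass, field
    from typing import Dict, List, Optional


    @dataclass
    class FakeRun:
        # Stand-in for dagster_run: only the fields the helper touches.
        run_id: str
        tags: Dict[str, str] = field(default_factory=dict)


    @dataclass
    class FakeRunRecord:
        dagster_run: FakeRun


    @dataclass
    class FakeContext:
        # Stand-in for the sensor context: only .log is used.
        log: logging.Logger = field(
            default_factory=lambda: logging.getLogger("sensor")
        )


    def find_previous_matching_dataset_run(
        run_records: List[FakeRunRecord], dataset: str, context: FakeContext
    ) -> Optional[str]:
        try:
            previous_run_id = [
                run.dagster_run.run_id
                for run in run_records
                if dataset == run.dagster_run.tags["dataset"]
            ]
        except KeyError:
            # A record missing the "dataset" tag aborts the whole
            # comprehension, so the lookup falls back to None.
            context.log.error("'dataset' not found in tags. No previous run id found")
            previous_run_id = None

        # Records are ordered newest-first, so the head of the list is the
        # most recent matching run; None when nothing matched.
        return previous_run_id[0] if previous_run_id else None


    if __name__ == "__main__":
        records = [
            FakeRunRecord(FakeRun("run-3", {"dataset": "annex_a"})),
            FakeRunRecord(FakeRun("run-2", {"dataset": "ssda903"})),
            FakeRunRecord(FakeRun("run-1", {"dataset": "ssda903"})),
        ]
        ctx = FakeContext()
        print(find_previous_matching_dataset_run(records, "ssda903", ctx))  # run-2
        print(find_previous_matching_dataset_run(records, "unknown", ctx))  # None

This is also why the location_schedule.py hunks matter: tagging each scheduled
run with {"dataset": dataset} gives the downstream sensors a tag to match on,
and the KeyError guard covers historical runs created before that tag existed.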