update sensors to run through all necessary datasets (#35)
* update sensors to run through all datasets

* add error catching in tags
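
Across the success sensors, the change follows one pattern: instead of taking run_records[0] and reading its config, each sensor now walks the datasets listed in the ALLOWED_DATASETS environment variable and looks up the most recent run tagged with each one. A minimal sketch of that loop as it appears inside the sensor bodies below (the variable's value is illustrative, e.g. ALLOWED_DATASETS=ssda903,annex_a):

    allowed_datasets = env_config("ALLOWED_DATASETS").split(",")
    for dataset in allowed_datasets:
        # Most recent run id carrying this dataset tag, or None if none exists
        latest_run_id = find_previous_matching_dataset_run(run_records, dataset, context)
        yield RunRequest(run_key=latest_run_id, tags={"dataset": dataset})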
patrick-troy authored Sep 9, 2024
1 parent 3d63f85 commit e7c323c
Showing 2 changed files with 67 additions and 42 deletions.
liiatools_pipeline/sensors/job_success_sensor.py (107 changes: 65 additions & 42 deletions)
@@ -19,14 +19,21 @@
 from liiatools_pipeline.ops.common_config import CleanConfig
 
 
-def find_previous_matching_ssda903_run(run_records):
-    previous_run_id = [
-        run.dagster_run.run_id
-        for run in run_records
-        if "ssda903" == run.dagster_run.tags["dataset"]
-    ]
-
-    previous_run_id = previous_run_id[0]
+def find_previous_matching_dataset_run(run_records, dataset, context):
+    try:
+        previous_run_id = [
+            run.dagster_run.run_id
+            for run in run_records
+            if dataset == run.dagster_run.tags["dataset"]
+        ]
+
+    except KeyError:
+        context.log.error(
+            f"'dataset' not found in tags. No previous run id found"
+        )
+        previous_run_id = None
+
+    previous_run_id = previous_run_id[0] if previous_run_id else None
     return previous_run_id
 
 
@@ -46,14 +53,17 @@ def move_current_sensor(context):
     )
 
     if run_records:  # Ensure there is at least one run record
-        latest_run_record = run_records[0]  # Get the most recent run record
-        dataset = latest_run_record.dagster_run.run_config["ops"]["process_files"][
-            "config"
-        ]["dataset"]
-        yield RunRequest(
-            run_key=latest_run_record.dagster_run.run_id,
-            tags={"dataset": dataset},
-        )
+        allowed_datasets = env_config("ALLOWED_DATASETS").split(",")
+        for dataset in allowed_datasets:
+            latest_run_id = find_previous_matching_dataset_run(
+                run_records,
+                dataset,
+                context,
+            )  # Get the most recent dataset run id
+            yield RunRequest(
+                run_key=latest_run_id,
+                tags={"dataset": dataset},
+            )
 
 
 @sensor(
@@ -72,21 +82,26 @@ def concatenate_sensor(context):
     )
 
     if run_records:  # Ensure there is at least one run record
-        latest_run_record = run_records[0]  # Get the most recent run record
-        dataset = latest_run_record.dagster_run.tags["dataset"]
-        concat_config = CleanConfig(
-            dataset=dataset,
-        )
-        yield RunRequest(
-            run_key=latest_run_record.dagster_run.run_id,
-            run_config=RunConfig(
-                ops={
-                    "open_current": concat_config,
-                    "create_concatenated_view": concat_config,
-                }
-            ),
-            tags={"dataset": dataset},
-        )
+        allowed_datasets = env_config("ALLOWED_DATASETS").split(",")
+        for dataset in allowed_datasets:
+            latest_run_id = find_previous_matching_dataset_run(
+                run_records,
+                dataset,
+                context,
+            )  # Get the most recent dataset run id
+            concat_config = CleanConfig(
+                dataset=dataset,
+            )
+            yield RunRequest(
+                run_key=latest_run_id,
+                run_config=RunConfig(
+                    ops={
+                        "open_current": concat_config,
+                        "create_concatenated_view": concat_config,
+                    }
+                ),
+                tags={"dataset": dataset},
+            )
 
 
 @sensor(
@@ -104,10 +119,12 @@ def ssda903_fix_episodes_sensor(context):
         ascending=False,
     )
 
-    if run_records:  # Ensure there is at least one run record
-        latest_run_id = find_previous_matching_ssda903_run(
-            run_records
-        )  # Get the most recent ssda903 run record
+    latest_run_id = find_previous_matching_dataset_run(
+        run_records,
+        "ssda903",
+        context,
+    )  # Get the most recent ssda903 run id
+    if latest_run_id:  # Ensure there is at least one ssda903 run record
         yield RunRequest(run_key=latest_run_id, run_config=RunConfig())
 
 
@@ -149,19 +166,21 @@ def move_current_and_concat_sensor(context):
     )
 
     if run_records:  # Ensure there is at least one run record
-        latest_run_record = run_records[0]  # Get the most recent run record
-
         allowed_datasets = env_config("ALLOWED_DATASETS").split(",")
-        context.log.info(f"Allowed datasets: {allowed_datasets}")
         for dataset in allowed_datasets:
+            latest_run_id = find_previous_matching_dataset_run(
+                run_records,
+                dataset,
+                context,
+            )  # Get the most recent dataset run id
             clean_config = CleanConfig(
                 dataset_folder=None,
                 la_folder=None,
                 input_la_code=None,
                 dataset=dataset,
             )
             yield RunRequest(
-                run_key=latest_run_record.dagster_run.run_id,
+                run_key=latest_run_id,
                 run_config=RunConfig(
                     ops={
                         "move_current_and_concat_view": clean_config,
@@ -185,8 +204,12 @@ def sufficiency_sensor(context):
         ascending=False,
     )
 
-    if run_records:  # Ensure there is at least one run record
-        latest_run_record = run_records[0]  # Get the most recent run record
+    latest_run_id = find_previous_matching_dataset_run(
+        run_records,
+        "ssda903",
+        context,
+    )  # Get the most recent ssda903 run id
+    if latest_run_id:  # Ensure there is at least one ssda903 run record
         yield RunRequest(
-            run_key=latest_run_record.dagster_run.run_id,
+            run_key=latest_run_id,
         )
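
One caveat in the new helper: tags["dataset"] raises KeyError on the first run record that lacks the tag, and the except block then discards matches from every other record as well. A more tolerant variant (an illustrative alternative, not what this commit ships) skips untagged runs with dict.get instead:

    matching_run_ids = [
        run.dagster_run.run_id
        for run in run_records
        if run.dagster_run.tags.get("dataset") == dataset
    ]
    previous_run_id = matching_run_ids[0] if matching_run_ids else None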
liiatools_pipeline/sensors/location_schedule.py (2 changes: 2 additions & 0 deletions)
@@ -160,6 +160,7 @@ def clean_schedule(context):
         context.log.info("Differences found, executing run")
         yield RunRequest(
             run_key=run_key,
+            tags={"dataset": dataset},
             run_config=RunConfig(
                 ops={
                     "create_session_folder": clean_config,
@@ -221,6 +222,7 @@ def reports_schedule(context):
         context.log.info("Differences found, executing run")
         yield RunRequest(
             run_key=run_key,
+            tags={"dataset": dataset},
             run_config=RunConfig(
                 ops={
                     "create_org_session_folder": clean_config,
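
The two tags={"dataset": dataset} lines added above are what the tag lookups in job_success_sensor.py depend on: every run launched by these schedules now carries its dataset name. Once runs are tagged, Dagster can also filter on the tag at query time; a sketch using the standard RunsFilter API (shown for comparison, not the approach this commit takes):

    from dagster import RunsFilter

    # Fetch only runs already tagged with this dataset, newest first
    run_records = context.instance.get_run_records(
        filters=RunsFilter(tags={"dataset": dataset}),
        limit=1,
        ascending=False,
    )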
