Skip to content

Commit

Permalink
First working sensor. NOTE: May trigger sufficiency pipeline more tha…
Browse files Browse the repository at this point in the history
…n once at the moment
  • Loading branch information
cyramic committed Apr 11, 2024
1 parent d115dc4 commit 062b6ff
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 40 deletions.
3 changes: 2 additions & 1 deletion liiatools_pipeline/ops/sufficiency903.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from dagster import op


@op
def ons_area():
create_dimONSArea()
create_dimONSArea()
7 changes: 4 additions & 3 deletions liiatools_pipeline/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from liiatools_pipeline.jobs.external_dataset import external_incoming
from liiatools_pipeline.jobs.sufficiency_903 import ssda903_sufficiency
from liiatools_pipeline.sensors.location_sensor import location_sensor
#from liiatools_pipeline.sensors.sufficiency_sensor import sufficiency_sensor
from liiatools_pipeline.sensors.sufficiency_sensor import sufficiency_sensor

register()

Expand All @@ -20,8 +20,9 @@ def sync():
"""
jobs = [ssda903_incoming, external_incoming, ssda903_sufficiency]
schedules = []
sensors = [location_sensor]
sensors = [location_sensor, sufficiency_sensor]

return jobs + schedules + sensors

#sufficiency_sensor

# sufficiency_sensor
2 changes: 1 addition & 1 deletion liiatools_pipeline/sensors/location_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def generate_run_key(folder_location, files):

@sensor(
job=ssda903_incoming,
minimum_interval_seconds=300,
minimum_interval_seconds=60,
description="Monitors Specified Location for 903 Files",
default_status=DefaultSensorStatus.RUNNING,
)
Expand Down
30 changes: 13 additions & 17 deletions liiatools_pipeline/sensors/sufficiency_sensor.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
from dagster import RunRequest, run_status_sensor, sensor, DagsterRunStatus, RunConfig, DefaultSensorStatus

from dagster import RunRequest, RunsFilter, DagsterRunStatus, sensor
from liiatools_pipeline.jobs.ssda903 import ssda903_incoming
from liiatools_pipeline.jobs.sufficiency_903 import ssda903_sufficiency

'''
@run_status_sensor(
run_status=DagsterRunStatus.SUCCESS,
description="Adds in ONS and census data once 903 pipeline completes",
request_job=ssda903_incoming,
)
@sensor(
job=ssda903_sufficiency,
minimum_interval_seconds=300,
default_status=DefaultSensorStatus.RUNNING,
)

@sensor(job=ssda903_sufficiency)
def sufficiency_sensor(context):
yield RunRequest(
run_key=None,
run_config=RunConfig(),
)'''
run_records = context.instance.get_run_records(
filters=RunsFilter(
job_name=ssda903_incoming.name,
statuses=[DagsterRunStatus.SUCCESS],
),
order_by="update_timestamp",
ascending=False,
)
for run_record in run_records:
yield RunRequest(run_key=run_record.dagster_run.run_id)
34 changes: 17 additions & 17 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ fs = "^2.4.16"
fs-s3fs = "^1.1.1"
backports-strenum = "^1.2.4"
jupyterlab = {version = "^4.0.6", optional = true}
dagster = "^1.6.11"
dagster = "1.6.14"
dagster-webserver = "^1.6.11"
dagster-docker = "^0.22.11"
dagster-postgres = "^0.22.11"
Expand Down

0 comments on commit 062b6ff

Please sign in to comment.