Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/sufficiency sensor #5

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions liiatools_pipeline/assets/external_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,12 @@
def external_data_folder():
external_data_location = env_config("EXTERNAL_DATA_LOCATION", cast=str)
return open_fs(external_data_location)


@asset
def ons_data_location():
return env_config("ONS_DATA_URL", cast=str)

@asset
def ofsted_data_location():
return env_config("OFSTED_DATA_URL", cast=str)
7 changes: 4 additions & 3 deletions liiatools_pipeline/jobs/external_dataset.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from dagster import job

from decouple import config as env_config
from liiatools_pipeline.ops import external_dataset as ed

from liiatools_pipeline.assets import external_dataset as ed_assets

@job
def external_incoming():
ed.request_dataset()
ed.request_dataset(ed_assets.ons_data_location())
ed.request_dataset(ed_assets.ofsted_data_location())
7 changes: 7 additions & 0 deletions liiatools_pipeline/jobs/sufficiency_903.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dagster import job
from liiatools_pipeline.ops.sufficiency903 import ons_area


@job
def ssda903_sufficiency():
ons_area()
6 changes: 3 additions & 3 deletions liiatools_pipeline/ops/external_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
from liiatools_pipeline.assets.external_dataset import external_data_folder


@op()
def request_dataset():
retrieve_dataset("http://localhost:8080/sample.csv", external_data_folder)
@op
def request_dataset(url):
retrieve_dataset(url, external_data_folder())
16 changes: 16 additions & 0 deletions liiatools_pipeline/ops/sufficiency903.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from sufficiency_data_transform.all_dim_and_fact import (
create_dimONSArea,
create_dimLookedAfterChild,
create_dimOfstedProvider,
create_dimPostcode,
create_factEpisode,
create_factOfstedInspection,
create_dim_tables,
)

from dagster import op


@op
def ons_area():
create_dimONSArea()
9 changes: 7 additions & 2 deletions liiatools_pipeline/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

from liiatools_pipeline.jobs.ssda903 import ssda903_incoming
from liiatools_pipeline.jobs.external_dataset import external_incoming
from liiatools_pipeline.jobs.sufficiency_903 import ssda903_sufficiency
from liiatools_pipeline.sensors.location_sensor import location_sensor
from liiatools_pipeline.sensors.sufficiency_sensor import sufficiency_sensor

register()

Expand All @@ -16,8 +18,11 @@ def sync():
For hints on building your Dagster repository, see our documentation overview on Repositories:
https://docs.dagster.io/overview/repositories-workspaces/repositories
"""
jobs = [ssda903_incoming, external_incoming]
jobs = [ssda903_incoming, external_incoming, ssda903_sufficiency]
schedules = []
sensors = [location_sensor]
sensors = [location_sensor, sufficiency_sensor]

return jobs + schedules + sensors


# sufficiency_sensor
2 changes: 1 addition & 1 deletion liiatools_pipeline/sensors/location_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def generate_run_key(folder_location, files):

@sensor(
job=ssda903_incoming,
minimum_interval_seconds=300,
minimum_interval_seconds=60,
description="Monitors Specified Location for 903 Files",
default_status=DefaultSensorStatus.RUNNING,
)
Expand Down
17 changes: 17 additions & 0 deletions liiatools_pipeline/sensors/sufficiency_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from dagster import RunRequest, RunsFilter, DagsterRunStatus, sensor
from liiatools_pipeline.jobs.ssda903 import ssda903_incoming
from liiatools_pipeline.jobs.sufficiency_903 import ssda903_sufficiency


@sensor(job=ssda903_sufficiency)
def sufficiency_sensor(context):
run_records = context.instance.get_run_records(
filters=RunsFilter(
job_name=ssda903_incoming.name,
statuses=[DagsterRunStatus.SUCCESS],
),
order_by="update_timestamp",
ascending=False,
)
for run_record in run_records:
yield RunRequest(run_key=run_record.dagster_run.run_id)
Loading
Loading