Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/sufficiency pipeline #7

Merged
merged 9 commits into from
Apr 17, 2024
10 changes: 10 additions & 0 deletions liiatools_pipeline/assets/external_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,13 @@
def external_data_folder():
external_data_location = env_config("EXTERNAL_DATA_LOCATION", cast=str)
return open_fs(external_data_location)


@asset
def ons_data_location():
return env_config("ONS_DATA_URL", cast=str)


@asset
def ofsted_data_location():
return env_config("OFSTED_DATA_URL", cast=str)
6 changes: 4 additions & 2 deletions liiatools_pipeline/jobs/external_dataset.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from dagster import job

from decouple import config as env_config
from liiatools_pipeline.ops import external_dataset as ed
from liiatools_pipeline.assets import external_dataset as ed_assets


@job
def external_incoming():
ed.request_dataset()
ed.request_dataset(ed_assets.ons_data_location())
ed.request_dataset(ed_assets.ofsted_data_location())
21 changes: 21 additions & 0 deletions liiatools_pipeline/jobs/sufficiency_903.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from dagster import job
from liiatools_pipeline.ops.sufficiency903 import (
dim_tables,
ons_area,
looked_after_child,
ofsted_provider,
postcode,
episode,
ofsted_inspection,
)


@job()
def ssda903_sufficiency():
dim = dim_tables()
area = ons_area()
lac = looked_after_child(area)
prov = ofsted_provider(area)
pc = postcode()
episode(area, lac, pc, prov, dim)
ofsted_inspection(dim, prov)
6 changes: 3 additions & 3 deletions liiatools_pipeline/ops/external_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
from liiatools_pipeline.assets.external_dataset import external_data_folder


@op()
def request_dataset():
retrieve_dataset("http://localhost:8080/sample.csv", external_data_folder)
@op
def request_dataset(url):
retrieve_dataset(url, external_data_folder())
52 changes: 52 additions & 0 deletions liiatools_pipeline/ops/sufficiency903.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from sufficiency_data_transform.all_dim_and_fact import (
create_dim_tables,
create_dimONSArea,
create_dimLookedAfterChild,
create_dimOfstedProvider,
create_dimPostcode,
create_factEpisode,
create_factOfstedInspection,
)

from dagster import op


@op
def dim_tables():
create_dim_tables()
return True


@op
def ons_area():
create_dimONSArea()
return True



@op
def looked_after_child(area):
create_dimLookedAfterChild()
return True


@op
def ofsted_provider(area):
create_dimOfstedProvider()
return True


@op
def postcode():
create_dimPostcode()
return True


@op
def episode(area, lac, pc, prov, dim):
create_factEpisode()


@op
def ofsted_inspection(dim, prov):
create_factOfstedInspection()
9 changes: 7 additions & 2 deletions liiatools_pipeline/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

from liiatools_pipeline.jobs.ssda903 import ssda903_incoming
from liiatools_pipeline.jobs.external_dataset import external_incoming
from liiatools_pipeline.jobs.sufficiency_903 import ssda903_sufficiency
from liiatools_pipeline.sensors.location_sensor import location_sensor
from liiatools_pipeline.sensors.sufficiency_sensor import sufficiency_sensor

register()

Expand All @@ -16,8 +18,11 @@ def sync():
For hints on building your Dagster repository, see our documentation overview on Repositories:
https://docs.dagster.io/overview/repositories-workspaces/repositories
"""
jobs = [ssda903_incoming, external_incoming]
jobs = [ssda903_incoming, external_incoming, ssda903_sufficiency]
schedules = []
sensors = [location_sensor]
sensors = [location_sensor, sufficiency_sensor]

return jobs + schedules + sensors


# sufficiency_sensor
2 changes: 1 addition & 1 deletion liiatools_pipeline/sensors/location_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def generate_run_key(folder_location, files):

@sensor(
job=ssda903_incoming,
minimum_interval_seconds=300,
minimum_interval_seconds=60,
description="Monitors Specified Location for 903 Files",
default_status=DefaultSensorStatus.RUNNING,
)
Expand Down
17 changes: 17 additions & 0 deletions liiatools_pipeline/sensors/sufficiency_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from dagster import RunRequest, RunsFilter, DagsterRunStatus, sensor
from liiatools_pipeline.jobs.ssda903 import ssda903_incoming
from liiatools_pipeline.jobs.sufficiency_903 import ssda903_sufficiency


@sensor(job=ssda903_sufficiency)
def sufficiency_sensor(context):
run_records = context.instance.get_run_records(
filters=RunsFilter(
job_name=ssda903_incoming.name,
statuses=[DagsterRunStatus.SUCCESS],
),
order_by="update_timestamp",
ascending=False,
)
for run_record in run_records:
yield RunRequest(run_key=run_record.dagster_run.run_id)
Loading
Loading