Skip to content

Commit

Permalink
Initial pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
cyramic committed Apr 10, 2024
1 parent 14201c9 commit cbaad3c
Show file tree
Hide file tree
Showing 8 changed files with 349 additions and 277 deletions.
9 changes: 9 additions & 0 deletions liiatools_pipeline/assets/external_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,12 @@
def external_data_folder():
external_data_location = env_config("EXTERNAL_DATA_LOCATION", cast=str)
return open_fs(external_data_location)


@asset
def ons_data_location():
return env_config("ONS_DATA_URL", cast=str)

@asset
def ofsted_data_location():
return env_config("OFSTED_DATA_URL", cast=str)
7 changes: 4 additions & 3 deletions liiatools_pipeline/jobs/external_dataset.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from dagster import job

from decouple import config as env_config
from liiatools_pipeline.ops import external_dataset as ed

from liiatools_pipeline.assets import external_dataset as ed_assets

@job
def external_incoming():
ed.request_dataset()
ed.request_dataset(ed_assets.ons_data_location())
ed.request_dataset(ed_assets.ofsted_data_location())
9 changes: 9 additions & 0 deletions liiatools_pipeline/jobs/sufficiency_903.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from sufficiency_data_transform.all_dim_and_fact import create_sufficiency_data
from dagster import job
from liiatools_pipeline.ops import ssda903


@job
def ssda903_sufficiency():
print("Hi")
create_sufficiency_data()
6 changes: 3 additions & 3 deletions liiatools_pipeline/ops/external_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@
from liiatools_pipeline.assets.external_dataset import external_data_folder


@op()
def request_dataset():
retrieve_dataset("http://localhost:8080/sample.csv", external_data_folder)
@op
def request_dataset(url):
retrieve_dataset(url, external_data_folder())
6 changes: 5 additions & 1 deletion liiatools_pipeline/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

from liiatools_pipeline.jobs.ssda903 import ssda903_incoming
from liiatools_pipeline.jobs.external_dataset import external_incoming
from liiatools_pipeline.jobs.sufficiency_903 import ssda903_sufficiency
from liiatools_pipeline.sensors.location_sensor import location_sensor
#from liiatools_pipeline.sensors.sufficiency_sensor import sufficiency_sensor

register()

Expand All @@ -16,8 +18,10 @@ def sync():
For hints on building your Dagster repository, see our documentation overview on Repositories:
https://docs.dagster.io/overview/repositories-workspaces/repositories
"""
jobs = [ssda903_incoming, external_incoming]
jobs = [ssda903_incoming, external_incoming, ssda903_sufficiency]
schedules = []
sensors = [location_sensor]

return jobs + schedules + sensors

#sufficiency_sensor
21 changes: 21 additions & 0 deletions liiatools_pipeline/sensors/sufficiency_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from dagster import RunRequest, run_status_sensor, sensor, DagsterRunStatus, RunConfig, DefaultSensorStatus

from liiatools_pipeline.jobs.ssda903 import ssda903_incoming
from liiatools_pipeline.jobs.sufficiency_903 import ssda903_sufficiency

'''
@run_status_sensor(
run_status=DagsterRunStatus.SUCCESS,
description="Adds in ONS and census data once 903 pipeline completes",
request_job=ssda903_incoming,
)
@sensor(
job=ssda903_sufficiency,
minimum_interval_seconds=300,
default_status=DefaultSensorStatus.RUNNING,
)
def sufficiency_sensor(context):
yield RunRequest(
run_key=None,
run_config=RunConfig(),
)'''
Loading

0 comments on commit cbaad3c

Please sign in to comment.