diff --git a/liiatools_pipeline/ops/sufficiency903.py b/liiatools_pipeline/ops/sufficiency903.py index 0024e098..0b87bbe7 100644 --- a/liiatools_pipeline/ops/sufficiency903.py +++ b/liiatools_pipeline/ops/sufficiency903.py @@ -10,6 +10,7 @@ from dagster import op + @op def ons_area(): - create_dimONSArea() \ No newline at end of file + create_dimONSArea() diff --git a/liiatools_pipeline/repository.py b/liiatools_pipeline/repository.py index 53d3488c..aff00cea 100644 --- a/liiatools_pipeline/repository.py +++ b/liiatools_pipeline/repository.py @@ -5,7 +5,7 @@ from liiatools_pipeline.jobs.external_dataset import external_incoming from liiatools_pipeline.jobs.sufficiency_903 import ssda903_sufficiency from liiatools_pipeline.sensors.location_sensor import location_sensor -#from liiatools_pipeline.sensors.sufficiency_sensor import sufficiency_sensor +from liiatools_pipeline.sensors.sufficiency_sensor import sufficiency_sensor register() @@ -20,8 +20,9 @@ def sync(): """ jobs = [ssda903_incoming, external_incoming, ssda903_sufficiency] schedules = [] - sensors = [location_sensor] + sensors = [location_sensor, sufficiency_sensor] return jobs + schedules + sensors -#sufficiency_sensor \ No newline at end of file + +# sufficiency_sensor diff --git a/liiatools_pipeline/sensors/location_sensor.py b/liiatools_pipeline/sensors/location_sensor.py index 02aa73c3..149a081c 100644 --- a/liiatools_pipeline/sensors/location_sensor.py +++ b/liiatools_pipeline/sensors/location_sensor.py @@ -47,7 +47,7 @@ def generate_run_key(folder_location, files): @sensor( job=ssda903_incoming, - minimum_interval_seconds=300, + minimum_interval_seconds=60, description="Monitors Specified Location for 903 Files", default_status=DefaultSensorStatus.RUNNING, ) diff --git a/liiatools_pipeline/sensors/sufficiency_sensor.py b/liiatools_pipeline/sensors/sufficiency_sensor.py index 83afd597..5cc0eeb4 100644 --- a/liiatools_pipeline/sensors/sufficiency_sensor.py +++ b/liiatools_pipeline/sensors/sufficiency_sensor.py @@ -1,21 +1,17 @@ -from dagster import RunRequest, run_status_sensor, sensor, DagsterRunStatus, RunConfig, DefaultSensorStatus - +from dagster import RunRequest, RunsFilter, DagsterRunStatus, sensor from liiatools_pipeline.jobs.ssda903 import ssda903_incoming from liiatools_pipeline.jobs.sufficiency_903 import ssda903_sufficiency -''' -@run_status_sensor( - run_status=DagsterRunStatus.SUCCESS, - description="Adds in ONS and census data once 903 pipeline completes", - request_job=ssda903_incoming, -) -@sensor( - job=ssda903_sufficiency, - minimum_interval_seconds=300, - default_status=DefaultSensorStatus.RUNNING, -) + +@sensor(job=ssda903_sufficiency) def sufficiency_sensor(context): - yield RunRequest( - run_key=None, - run_config=RunConfig(), - )''' \ No newline at end of file + run_records = context.instance.get_run_records( + filters=RunsFilter( + job_name=ssda903_incoming.name, + statuses=[DagsterRunStatus.SUCCESS], + ), + order_by="update_timestamp", + ascending=False, + ) + for run_record in run_records: + yield RunRequest(run_key=run_record.dagster_run.run_id) diff --git a/poetry.lock b/poetry.lock index ca0e6df4..75e56154 100644 --- a/poetry.lock +++ b/poetry.lock @@ -344,18 +344,18 @@ css = ["tinycss2 (>=1.1.0,<1.3)"] [[package]] name = "boto3" -version = "1.34.80" +version = "1.34.82" description = "The AWS SDK for Python" category = "main" optional = false python-versions = ">=3.8" files = [ - {file = "boto3-1.34.80-py3-none-any.whl", hash = "sha256:bb8f433c04dcdffbd4a802df56c1c30f2be23b1161fd8fb45e4b76c1487ec122"}, - {file = "boto3-1.34.80.tar.gz", hash = "sha256:5627f6ecadb46fc7c9f8c368baf948f1b00a3fd2f8eb1275c254469853ad8fdb"}, + {file = "boto3-1.34.82-py3-none-any.whl", hash = "sha256:6e0ee12e87b37fa81133e9308d0957fce4200c1ff37c96346538dba5e857da18"}, + {file = "boto3-1.34.82.tar.gz", hash = "sha256:fcdb84936b04d5f78c8c8667b65bf5b9803cf39fd25bb7fe57ba237074e36171"}, ] [package.dependencies] -botocore = ">=1.34.80,<1.35.0" +botocore = ">=1.34.82,<1.35.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.10.0,<0.11.0" @@ -364,14 +364,14 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.34.80" +version = "1.34.82" description = "Low-level, data-driven core of boto 3." category = "main" optional = false python-versions = ">=3.8" files = [ - {file = "botocore-1.34.80-py3-none-any.whl", hash = "sha256:354a00f03faba52acc6f1a84fa4f035d48541633be98ccc24b59dc544f679f8b"}, - {file = "botocore-1.34.80.tar.gz", hash = "sha256:8402262e819f3d46df504bbd781e770858c0130b90f660699f75ef3a63abca5a"}, + {file = "botocore-1.34.82-py3-none-any.whl", hash = "sha256:8f839e9a88e7ac7185e406be4cf9926673374e8a6ecc295302f56f7e3c618692"}, + {file = "botocore-1.34.82.tar.gz", hash = "sha256:2fd14676152f9d64541099090cc64973fdf8232744256454de443583e35e497d"}, ] [package.dependencies] @@ -1643,14 +1643,14 @@ pyreadline3 = {version = "*", markers = "sys_platform == \"win32\" and python_ve [[package]] name = "idna" -version = "3.6" +version = "3.7" description = "Internationalized Domain Names in Applications (IDNA)" category = "main" optional = false python-versions = ">=3.5" files = [ - {file = "idna-3.6-py3-none-any.whl", hash = "sha256:c05567e9c24a6b9faaa835c4821bad0590fbb9d5779e7caa6e1cc4978e7eb24f"}, - {file = "idna-3.6.tar.gz", hash = "sha256:9ecdbbd083b06798ae1e86adcbfe8ab1479cf864e4ee30fe4e46a003d12491ca"}, + {file = "idna-3.7-py3-none-any.whl", hash = "sha256:82fee1fc78add43492d3a1898bfa6d8a904cc97d8427f683ed8e798d07761aa0"}, + {file = "idna-3.7.tar.gz", hash = "sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc"}, ] [[package]] @@ -1976,14 +1976,14 @@ test = ["click", "pre-commit", "pytest (>=7.0)", "pytest-asyncio (>=0.19.0)", "p [[package]] name = "jupyter-lsp" -version = "2.2.4" +version = "2.2.5" description = "Multi-Language Server WebSocket proxy for Jupyter Notebook/Lab server" category = "main" optional = true python-versions = ">=3.8" files = [ - {file = "jupyter-lsp-2.2.4.tar.gz", hash = "sha256:5e50033149344065348e688608f3c6d654ef06d9856b67655bd7b6bac9ee2d59"}, - {file = "jupyter_lsp-2.2.4-py3-none-any.whl", hash = "sha256:da61cb63a16b6dff5eac55c2699cc36eac975645adee02c41bdfc03bf4802e77"}, + {file = "jupyter-lsp-2.2.5.tar.gz", hash = "sha256:793147a05ad446f809fd53ef1cd19a9f5256fd0a2d6b7ce943a982cb4f545001"}, + {file = "jupyter_lsp-2.2.5-py3-none-any.whl", hash = "sha256:45fbddbd505f3fbfb0b6cb2f1bc5e15e83ab7c79cd6e89416b248cb3c00c11da"}, ] [package.dependencies] @@ -2229,14 +2229,14 @@ source = ["Cython (==0.29.37)"] [[package]] name = "mako" -version = "1.3.2" +version = "1.3.3" description = "A super-fast templating language that borrows the best ideas from the existing templating languages." category = "main" optional = false python-versions = ">=3.8" files = [ - {file = "Mako-1.3.2-py3-none-any.whl", hash = "sha256:32a99d70754dfce237019d17ffe4a282d2d3351b9c476e90d8a60e63f133b80c"}, - {file = "Mako-1.3.2.tar.gz", hash = "sha256:2a0c8ad7f6274271b3bb7467dd37cf9cc6dab4bc19cb69a4ef10669402de698e"}, + {file = "Mako-1.3.3-py3-none-any.whl", hash = "sha256:5324b88089a8978bf76d1629774fcc2f1c07b82acdf00f4c5dd8ceadfffc4b40"}, + {file = "Mako-1.3.3.tar.gz", hash = "sha256:e16c01d9ab9c11f7290eef1cfefc093fb5a45ee4a3da09e2fec2e4d1bae54e73"}, ] [package.dependencies] @@ -5261,4 +5261,4 @@ s3 = ["fs-s3fs"] [metadata] lock-version = "2.0" python-versions = ">=3.10, <3.11" -content-hash = "febd86f6588a1c86c74deead538cd4ad93a9ee86b2f9eba4afc72ec383fcb3cf" +content-hash = "0651766fbbcad4aad627b328e5ca618ca2e9651fed990f14fc044eb3c6da6185" diff --git a/pyproject.toml b/pyproject.toml index a8e5c79c..3715dbee 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -36,7 +36,7 @@ fs = "^2.4.16" fs-s3fs = "^1.1.1" backports-strenum = "^1.2.4" jupyterlab = {version = "^4.0.6", optional = true} -dagster = "^1.6.11" +dagster = "1.6.14" dagster-webserver = "^1.6.11" dagster-docker = "^0.22.11" dagster-postgres = "^0.22.11"