From b49eec6e8089c57305a314bd9ebb62d5faefa785 Mon Sep 17 00:00:00 2001 From: chiaberry Date: Mon, 4 Dec 2023 16:27:08 -0600 Subject: [PATCH 1/4] add second task to incident dag --- dags/atd_traffic_incident_reports.py | 38 +++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/dags/atd_traffic_incident_reports.py b/dags/atd_traffic_incident_reports.py index 13cd71c..c73dd1b 100644 --- a/dags/atd_traffic_incident_reports.py +++ b/dags/atd_traffic_incident_reports.py @@ -52,23 +52,44 @@ "opitem": "atd-traffic-incident-reports", "opfield": "production.Postgrest Endpoint", }, + # Socrata + "SOCRATA_RESOURCE_ID": { + "opitem": "atd-traffic-incident-reports", + "opfield": "production.Socrata Resource ID" + }, + "SOCRATA_API_KEY_ID": { + "opitem": "Socrata Key ID, Secret, and Token", + "opfield": "socrata.apiKeyId", + }, + "SOCRATA_API_KEY_SECRET": { + "opitem": "Socrata Key ID, Secret, and Token", + "opfield": "socrata.apiKeySecret", + }, + "SOCRATA_APP_TOKEN": { + "opitem": "Socrata Key ID, Secret, and Token", + "opfield": "socrata.appToken", + }, + } with DAG( dag_id="atd_traffic_incident_reports", - description="wrapper etl for atd-traffic-incident-reports docker image connects to oracle db and updates postrgrest with incidents", + description="wrapper etl for atd-traffic-incident-reports docker image connects to oracle db and updates postrgrest and socrata with incidents", default_args=DEFAULT_ARGS, schedule_interval="*/5 * * * *" if DEPLOYMENT_ENVIRONMENT == "production" else None, - tags=["repo:atd-traffic-incident-reports", "postgrest"], + tags=["repo:atd-traffic-incident-reports", "postgrest", "socrata"], catchup=False, ) as dag: docker_image = "atddocker/atd-traffic-incident-reports:production" + date_filter_arg = get_date_filter_arg(should_replace_monthly=False) + env_vars = get_env_vars_task(REQUIRED_SECRETS) t1 = DockerOperator( task_id="traffic_incident_reports_to_postgres", + docker_conn_id="docker_default", image=docker_image, auto_remove=True, command=f"python main.py", @@ -78,4 +99,15 @@ mount_tmp_dir=False, ) - t1 + t2 = DockerOperator( + task_id="traffic_incident_reports_to_socrata", + docker_conn_id="docker_default", + image=docker_image, + auto_remove=True, + command=f"python records_to_socrata.py -d {date_filter_arg}", + environment=env_vars, + tty=True, + mount_tmp_dir=False, + ) + + date_filter_arg >> t1 >> t2 From 1a1eebce79a81eb6c864786e34a38a8b51218739 Mon Sep 17 00:00:00 2001 From: chiaberry Date: Mon, 4 Dec 2023 18:34:26 -0600 Subject: [PATCH 2/4] revert changes from other PR --- dags/atd_knack_banner.py | 3 +- dags/atd_knack_edp_school_beacons.py | 93 ---------------------------- 2 files changed, 2 insertions(+), 94 deletions(-) delete mode 100644 dags/atd_knack_edp_school_beacons.py diff --git a/dags/atd_knack_banner.py b/dags/atd_knack_banner.py index 1a7706c..c00855f 100644 --- a/dags/atd_knack_banner.py +++ b/dags/atd_knack_banner.py @@ -17,7 +17,7 @@ "email_on_failure": False, "email_on_retry": False, "retries": 0, - "execution_timeout": duration(minutes=10), + "execution_timeout": duration(minutes=5), "on_failure_callback": task_fail_slack_alert, } @@ -47,6 +47,7 @@ description="Update knack HR app based on records in Banner and CTM", default_args=DEFAULT_ARGS, schedule_interval="45 7 * * *" if DEPLOYMENT_ENVIRONMENT == "production" else None, + dagrun_timeout=duration(minutes=30) tags=["repo:atd-knack-banner", "knack"], catchup=False, ) as dag: diff --git a/dags/atd_knack_edp_school_beacons.py b/dags/atd_knack_edp_school_beacons.py deleted file mode 100644 index 6718fb0..0000000 --- a/dags/atd_knack_edp_school_beacons.py +++ /dev/null @@ -1,93 +0,0 @@ -import os - -from airflow.models import DAG -from airflow.operators.docker_operator import DockerOperator -from pendulum import datetime, duration - -from utils.onepassword import get_env_vars_task -from utils.knack import get_date_filter_arg -from utils.slack_operator import task_fail_slack_alert - -DEPLOYMENT_ENVIRONMENT = os.getenv("ENVIRONMENT", "development") - -DEFAULT_ARGS = { - "owner": "airflow", - "depends_on_past": False, - "start_date": datetime(2023, 1, 1, tz="America/Chicago"), - "email_on_failure": False, - "email_on_retry": False, - "retries": 0, - "execution_timeout": duration(minutes=5), - "on_failure_callback": task_fail_slack_alert, -} - -REQUIRED_SECRETS = { - "KNACK_APP_ID": { - "opitem": "Knack AMD Data Tracker", - "opfield": f"production.appId", - }, - "KNACK_API_KEY": { - "opitem": "Knack AMD Data Tracker", - "opfield": f"production.apiKey", - }, - "PGREST_ENDPOINT": { - "opitem": "atd-knack-services PostgREST", - "opfield": "production.endpoint", - }, - "PGREST_JWT": { - "opitem": "atd-knack-services PostgREST", - "opfield": "production.jwt", - }, - "SOCRATA_API_KEY_ID": { - "opitem": "Socrata Key ID, Secret, and Token", - "opfield": "socrata.apiKeyId", - }, - "SOCRATA_API_KEY_SECRET": { - "opitem": "Socrata Key ID, Secret, and Token", - "opfield": "socrata.apiKeySecret", - }, - "SOCRATA_APP_TOKEN": { - "opitem": "Socrata Key ID, Secret, and Token", - "opfield": "socrata.appToken", - }, -} - - -with DAG( - dag_id="atd_knack_edp_school_beacons", - description="Load school beacons (view_3898) records from Knack to Postgrest to Socrata", - default_args=DEFAULT_ARGS, - schedule_interval="28 5 * * *" if DEPLOYMENT_ENVIRONMENT == "production" else None, - tags=["repo:atd-knack-services", "knack", "socrata", "data-tracker"], - catchup=False, -) as dag: - docker_image = "atddocker/atd-knack-services:production" - app_name = "data-tracker" - container = "view_3898" - - date_filter_arg = get_date_filter_arg(should_replace_monthly=True) - - env_vars = get_env_vars_task(REQUIRED_SECRETS) - - t1 = DockerOperator( - task_id="atd_knack_edp_school_beacons_to_postgrest", - image=docker_image, - auto_remove=True, - command=f"./atd-knack-services/services/records_to_postgrest.py -a {app_name} -c {container} {date_filter_arg}", - environment=env_vars, - tty=True, - force_pull=True, - mount_tmp_dir=False, - ) - - t2 = DockerOperator( - task_id="atd_knack_edp_school_beacons_to_socrata", - image=docker_image, - auto_remove=True, - command=f"./atd-knack-services/services/records_to_socrata.py -a {app_name} -c {container} {date_filter_arg}", - environment=env_vars, - tty=True, - mount_tmp_dir=False, - ) - - date_filter_arg >> t1 >> t2 From 68ae8306849568578c4465f7523a0735d1f8ec7c Mon Sep 17 00:00:00 2001 From: chiaberry Date: Mon, 4 Dec 2023 18:35:52 -0600 Subject: [PATCH 3/4] forgot a comma --- dags/atd_knack_banner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/atd_knack_banner.py b/dags/atd_knack_banner.py index c00855f..d87887c 100644 --- a/dags/atd_knack_banner.py +++ b/dags/atd_knack_banner.py @@ -17,7 +17,7 @@ "email_on_failure": False, "email_on_retry": False, "retries": 0, - "execution_timeout": duration(minutes=5), + "retry_delay": duration(minutes=5), "on_failure_callback": task_fail_slack_alert, } @@ -47,7 +47,7 @@ description="Update knack HR app based on records in Banner and CTM", default_args=DEFAULT_ARGS, schedule_interval="45 7 * * *" if DEPLOYMENT_ENVIRONMENT == "production" else None, - dagrun_timeout=duration(minutes=30) + dagrun_timeout=duration(minutes=30), tags=["repo:atd-knack-banner", "knack"], catchup=False, ) as dag: From 7bea71b33fae680f31ac741bd1d88afbb85b339f Mon Sep 17 00:00:00 2001 From: chiaberry Date: Wed, 24 Jan 2024 10:33:20 -0600 Subject: [PATCH 4/4] rename main.py to records_to_postgrest --- dags/atd_traffic_incident_reports.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/atd_traffic_incident_reports.py b/dags/atd_traffic_incident_reports.py index c73dd1b..eee6453 100644 --- a/dags/atd_traffic_incident_reports.py +++ b/dags/atd_traffic_incident_reports.py @@ -92,7 +92,7 @@ docker_conn_id="docker_default", image=docker_image, auto_remove=True, - command=f"python main.py", + command=f"python records_to_postgrest.py", environment=env_vars, tty=True, force_pull=True,