diff --git a/dags/atd_knack_banner.py b/dags/atd_knack_banner.py index 1a7706c..d87887c 100644 --- a/dags/atd_knack_banner.py +++ b/dags/atd_knack_banner.py @@ -17,7 +17,7 @@ "email_on_failure": False, "email_on_retry": False, "retries": 0, - "execution_timeout": duration(minutes=10), + "retry_delay": duration(minutes=5), "on_failure_callback": task_fail_slack_alert, } @@ -47,6 +47,7 @@ description="Update knack HR app based on records in Banner and CTM", default_args=DEFAULT_ARGS, schedule_interval="45 7 * * *" if DEPLOYMENT_ENVIRONMENT == "production" else None, + dagrun_timeout=duration(minutes=30), tags=["repo:atd-knack-banner", "knack"], catchup=False, ) as dag: diff --git a/dags/atd_traffic_incident_reports.py b/dags/atd_traffic_incident_reports.py index 13cd71c..eee6453 100644 --- a/dags/atd_traffic_incident_reports.py +++ b/dags/atd_traffic_incident_reports.py @@ -52,30 +52,62 @@ "opitem": "atd-traffic-incident-reports", "opfield": "production.Postgrest Endpoint", }, + # Socrata + "SOCRATA_RESOURCE_ID": { + "opitem": "atd-traffic-incident-reports", + "opfield": "production.Socrata Resource ID" + }, + "SOCRATA_API_KEY_ID": { + "opitem": "Socrata Key ID, Secret, and Token", + "opfield": "socrata.apiKeyId", + }, + "SOCRATA_API_KEY_SECRET": { + "opitem": "Socrata Key ID, Secret, and Token", + "opfield": "socrata.apiKeySecret", + }, + "SOCRATA_APP_TOKEN": { + "opitem": "Socrata Key ID, Secret, and Token", + "opfield": "socrata.appToken", + }, + } with DAG( dag_id="atd_traffic_incident_reports", - description="wrapper etl for atd-traffic-incident-reports docker image connects to oracle db and updates postrgrest with incidents", + description="wrapper etl for atd-traffic-incident-reports docker image connects to oracle db and updates postrgrest and socrata with incidents", default_args=DEFAULT_ARGS, schedule_interval="*/5 * * * *" if DEPLOYMENT_ENVIRONMENT == "production" else None, - tags=["repo:atd-traffic-incident-reports", "postgrest"], + tags=["repo:atd-traffic-incident-reports", "postgrest", "socrata"], catchup=False, ) as dag: docker_image = "atddocker/atd-traffic-incident-reports:production" + date_filter_arg = get_date_filter_arg(should_replace_monthly=False) + env_vars = get_env_vars_task(REQUIRED_SECRETS) t1 = DockerOperator( task_id="traffic_incident_reports_to_postgres", + docker_conn_id="docker_default", image=docker_image, auto_remove=True, - command=f"python main.py", + command=f"python records_to_postgrest.py", environment=env_vars, tty=True, force_pull=True, mount_tmp_dir=False, ) - t1 + t2 = DockerOperator( + task_id="traffic_incident_reports_to_socrata", + docker_conn_id="docker_default", + image=docker_image, + auto_remove=True, + command=f"python records_to_socrata.py -d {date_filter_arg}", + environment=env_vars, + tty=True, + mount_tmp_dir=False, + ) + + date_filter_arg >> t1 >> t2