Skip to content

Commit

Permalink
Merge pull request #186 from cityofaustin/11997-traffic-incidents
Browse files Browse the repository at this point in the history
Traffic Incidents -- add task to push to socrata
  • Loading branch information
chiaberry authored Jan 24, 2024
2 parents bc4bfa2 + 7bea71b commit 6fa873c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 5 deletions.
3 changes: 2 additions & 1 deletion dags/atd_knack_banner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
"execution_timeout": duration(minutes=10),
"retry_delay": duration(minutes=5),
"on_failure_callback": task_fail_slack_alert,
}

Expand Down Expand Up @@ -47,6 +47,7 @@
description="Update knack HR app based on records in Banner and CTM",
default_args=DEFAULT_ARGS,
schedule_interval="45 7 * * *" if DEPLOYMENT_ENVIRONMENT == "production" else None,
dagrun_timeout=duration(minutes=30),
tags=["repo:atd-knack-banner", "knack"],
catchup=False,
) as dag:
Expand Down
40 changes: 36 additions & 4 deletions dags/atd_traffic_incident_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,30 +52,62 @@
"opitem": "atd-traffic-incident-reports",
"opfield": "production.Postgrest Endpoint",
},
# Socrata
"SOCRATA_RESOURCE_ID": {
"opitem": "atd-traffic-incident-reports",
"opfield": "production.Socrata Resource ID"
},
"SOCRATA_API_KEY_ID": {
"opitem": "Socrata Key ID, Secret, and Token",
"opfield": "socrata.apiKeyId",
},
"SOCRATA_API_KEY_SECRET": {
"opitem": "Socrata Key ID, Secret, and Token",
"opfield": "socrata.apiKeySecret",
},
"SOCRATA_APP_TOKEN": {
"opitem": "Socrata Key ID, Secret, and Token",
"opfield": "socrata.appToken",
},

}


with DAG(
dag_id="atd_traffic_incident_reports",
description="wrapper etl for atd-traffic-incident-reports docker image connects to oracle db and updates postrgrest with incidents",
description="wrapper etl for atd-traffic-incident-reports docker image connects to oracle db and updates postrgrest and socrata with incidents",
default_args=DEFAULT_ARGS,
schedule_interval="*/5 * * * *" if DEPLOYMENT_ENVIRONMENT == "production" else None,
tags=["repo:atd-traffic-incident-reports", "postgrest"],
tags=["repo:atd-traffic-incident-reports", "postgrest", "socrata"],
catchup=False,
) as dag:
docker_image = "atddocker/atd-traffic-incident-reports:production"

date_filter_arg = get_date_filter_arg(should_replace_monthly=False)

env_vars = get_env_vars_task(REQUIRED_SECRETS)

t1 = DockerOperator(
task_id="traffic_incident_reports_to_postgres",
docker_conn_id="docker_default",
image=docker_image,
auto_remove=True,
command=f"python main.py",
command=f"python records_to_postgrest.py",
environment=env_vars,
tty=True,
force_pull=True,
mount_tmp_dir=False,
)

t1
t2 = DockerOperator(
task_id="traffic_incident_reports_to_socrata",
docker_conn_id="docker_default",
image=docker_image,
auto_remove=True,
command=f"python records_to_socrata.py -d {date_filter_arg}",
environment=env_vars,
tty=True,
mount_tmp_dir=False,
)

date_filter_arg >> t1 >> t2

0 comments on commit 6fa873c

Please sign in to comment.