Skip to content

Commit

Permalink
Add a high number of retries and a short delay to retry on read tasks (
Browse files Browse the repository at this point in the history
…#766)

* add columns for wiriting to displayed acteur

* add retry
  • Loading branch information
Hazelmat committed Aug 12, 2024
1 parent f7be15f commit 67d80e0
Showing 1 changed file with 26 additions and 0 deletions.
26 changes: 26 additions & 0 deletions dags/create_final_actors.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,12 @@ def _merge_acteurs_many2many_relationship(
"retry_delay": timedelta(minutes=5),
}


# Retry settings for reading tasks
read_retry_count = 5
read_retry_interval = timedelta(minutes=2)


dag = DAG(
utils.get_dag_name(__file__, "apply_adresse_corrections"),
default_args=default_args,
Expand All @@ -349,69 +355,89 @@ def _merge_acteurs_many2many_relationship(
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_acteur"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

read_ps = PythonOperator(
task_id="load_propositionservice",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_propositionservice"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

read_revision_actor = PythonOperator(
task_id="load_revision_actors",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_revisionacteur"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

read_revision_ps = PythonOperator(
task_id="load_revision_propositionservice",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_revisionpropositionservice"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

read_revision_sc = PythonOperator(
task_id="load_revision_ps_sous_categories",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_revisionpropositionservice_sous_categories"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

read_sc = PythonOperator(
task_id="load_ps_sous_categories",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_propositionservice_sous_categories"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

read_acteur_labels = PythonOperator(
task_id="read_acteur_labels",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_acteur_labels"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

read_acteur_acteur_services = PythonOperator(
task_id="read_acteur_acteur_services",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_acteur_acteur_services"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

read_revisionacteur_labels = PythonOperator(
task_id="read_revisionacteur_labels",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_revisionacteur_labels"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)

read_revisionacteur_acteur_services = PythonOperator(
task_id="read_revisionacteur_acteur_services",
python_callable=read_data_from_postgres,
op_kwargs={"table_name": "qfdmo_revisionacteur_acteur_services"},
dag=dag,
retries=read_retry_count,
retry_delay=read_retry_interval,
)
# qfdmo_revisionacteur_acteur_services

Expand Down

0 comments on commit 67d80e0

Please sign in to comment.