From fea0ec3dc3f3d5958f9c8e002e95e43ef45f8597 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Fri, 18 Oct 2024 19:05:20 -0700 Subject: [PATCH] Add clearing behavior for backfill User can elect to clear failed runs or completed runs in the backfill range. Also, we handle the possibility of duplicate dag runs (i.e. multiple for the same execution date). --- airflow/models/backfill.py | 160 ++++++++++++++++++++++------------ tests/models/test_backfill.py | 146 ++++++++++++++++++++++++++++++- 2 files changed, 249 insertions(+), 57 deletions(-) diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py index 37a953311306..49a0f44ed1a5 100644 --- a/airflow/models/backfill.py +++ b/airflow/models/backfill.py @@ -33,11 +33,12 @@ from airflow.api_connexion.exceptions import Conflict, NotFound from airflow.exceptions import AirflowException +from airflow.models import clear_task_instances from airflow.models.base import Base, StringID from airflow.settings import json from airflow.utils import timezone from airflow.utils.session import create_session -from airflow.utils.sqlalchemy import UtcDateTime +from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -56,6 +57,18 @@ class AlreadyRunningBackfill(AirflowException): """ +class ClearingBehavior(str, Enum): + """ + Internal enum for setting clearing behavior in a backfill. + + :meta private: + """ + + FAILED_RUNS = "failed-runs" + COMPLETED_RUNS = "completed-runs" + NO_CLEARING = "no-clearing" + + class Backfill(Base): """Model representing a backfill job.""" @@ -72,6 +85,7 @@ class Backfill(Base): Does not pause existing dag runs. """ + clearing_behavior = Column(StringID(), nullable=False, default=ClearingBehavior.NO_CLEARING) max_active_runs = Column(Integer, default=10, nullable=False) created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False) completed_at = Column(UtcDateTime, nullable=True) @@ -131,27 +145,82 @@ def validate_sort_ordinal(self, key, val): return val -def _create_backfill_dag_run(dag, info, backfill_id, dag_run_conf, backfill_sort_ordinal, session): +def _create_backfill_dag_run( + *, + dag, + info, + clearing_behavior: ClearingBehavior, + backfill_id, + dag_run_conf, + backfill_sort_ordinal, + session, +): from airflow.models import DagRun - dr = session.scalar(select(DagRun).where(DagRun.execution_date == info.logical_date).limit(1)) - if dr: - session.add( - BackfillDagRun( - backfill_id=backfill_id, - dag_run_id=None, - logical_date=info.logical_date, - exception_reason=BackfillDagRunExceptionReason.ALREADY_EXISTS, - sort_ordinal=backfill_sort_ordinal, + states_to_query = None + dr_exists = session.scalar(select(DagRun.id).where(DagRun.execution_date == info.logical_date).limit(1)) + if dr_exists: + if clearing_behavior is ClearingBehavior.NO_CLEARING: + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=None, + logical_date=info.logical_date, + exception_reason=BackfillDagRunExceptionReason.ALREADY_EXISTS, + sort_ordinal=backfill_sort_ordinal, + ) ) + return + elif clearing_behavior is ClearingBehavior.FAILED_RUNS: + states_to_query = [DagRunState.FAILED] + elif clearing_behavior is ClearingBehavior.COMPLETED_RUNS: + states_to_query = [DagRunState.SUCCESS, DagRunState.FAILED] + else: + raise RuntimeError("unexpected") + if states_to_query: + # we only get here if user requested some clearing behavior + # and there are some dag runs for the logical date + # find the ones in the requested states and clear them + runs_to_clear_query = select(DagRun).where( + DagRun.execution_date == info.logical_date, + DagRun.state.in_(states_to_query), ) - log.info( - "dag run already exists for dag_id=%s backfill_id=%s, info=%s", - dag.dag_id, - backfill_id, - info, - ) + runs_to_clear = session.scalars(with_row_locks(runs_to_clear_query, session=session)).all() + for dr in runs_to_clear: + clear_task_instances( + tis=dr.get_task_instances(session=session), + session=session, + ) + dr.backfill_id = backfill_id + dr.run_type = DagRunType.BACKFILL_JOB + dr.triggered_by = DagRunTriggeredByType.BACKFILL + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=dr.id, + logical_date=info.logical_date, + sort_ordinal=backfill_sort_ordinal, + ) + ) + if not runs_to_clear: + session.add( + BackfillDagRun( + backfill_id=backfill_id, + dag_run_id=None, + logical_date=info.logical_date, + exception_reason=BackfillDagRunExceptionReason.ALREADY_EXISTS, + sort_ordinal=backfill_sort_ordinal, + ) + ) return + + if dr_exists: + return + + # we only get here if there's no dag run for the logical date + # todo: we're subject to a race condition here where something else + # could create a dag run for the same logical date at the same time + # but there's no easy solution for this right now dr = dag.create_dagrun( triggered_by=DagRunTriggeredByType.BACKFILL, execution_date=info.logical_date, @@ -183,6 +252,7 @@ def _create_backfill( max_active_runs: int, reverse: bool, dag_run_conf: dict | None, + clearing_behavior: ClearingBehavior | None = None, ) -> Backfill | None: from airflow.models.serialized_dag import SerializedDagModel @@ -206,6 +276,7 @@ def _create_backfill( to_date=to_date, max_active_runs=max_active_runs, dag_run_conf=dag_run_conf, + clearing_behavior=clearing_behavior, ) session.add(br) session.commit() @@ -225,45 +296,22 @@ def _create_backfill( for info in dagrun_info_list: backfill_sort_ordinal += 1 session.commit() - from tenacity import RetryError, Retrying, stop_after_attempt - - try: - for attempt in Retrying(stop=stop_after_attempt(3)): - # we do retries here because it's possible that we check to see if dr exists - # before we attempt to create the dag run. if something else creates the dag - # run in between, we'll have to retry the transaction - with attempt: - with session.begin(): - _create_backfill_dag_run( - dag=dag, - info=info, - backfill_id=br.id, - dag_run_conf=br.dag_run_conf, - backfill_sort_ordinal=backfill_sort_ordinal, - session=session, - ) - log.info( - "created backfill dag run dag_id=%s backfill_id=%s, info=%s", - dag.dag_id, - br.id, - info, - ) - except RetryError: - dag.log.exception( - "Error while attempting to create a dag run dag_id='%s' logical_date='%s'", - dag.dag_id, - info.logical_date, - ) - session.add( - BackfillDagRun( - backfill_id=br.id, - dag_run_id=None, - exception_reason=BackfillDagRunExceptionReason.UNKNOWN, - logical_date=info.logical_date, - sort_ordinal=backfill_sort_ordinal, - ) - ) - session.commit() + _create_backfill_dag_run( + dag=dag, + info=info, + clearing_behavior=br.clearing_behavior, + backfill_id=br.id, + dag_run_conf=br.dag_run_conf, + backfill_sort_ordinal=backfill_sort_ordinal, + session=session, + ) + session.commit() + log.info( + "created backfill dag run dag_id=%s backfill_id=%s, info=%s", + dag.dag_id, + br.id, + info, + ) return br diff --git a/tests/models/test_backfill.py b/tests/models/test_backfill.py index d22ad5460329..501cd202c78e 100644 --- a/tests/models/test_backfill.py +++ b/tests/models/test_backfill.py @@ -31,6 +31,7 @@ Backfill, BackfillDagRun, BackfillDagRunExceptionReason, + ClearingBehavior, _cancel_backfill, _create_backfill, ) @@ -38,7 +39,7 @@ from airflow.ti_deps.dep_context import DepContext from airflow.utils import timezone from airflow.utils.state import DagRunState, TaskInstanceState -from airflow.utils.types import DagRunType +from airflow.utils.types import DagRunTriggeredByType, DagRunType from tests_common.test_utils.db import ( clear_db_backfills, @@ -147,6 +148,149 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): assert all(x.conf == expected_run_conf for x in dag_runs) +@pytest.mark.parametrize( + "clearing_behavior, run_counts", + [ + ( + ClearingBehavior.FAILED_RUNS, + { + "2021-01-01": 1, + "2021-01-02": 1, + "2021-01-03": 1, + "2021-01-04": 1, + "2021-01-06": 2, + "2021-01-08": 1, + }, + ), + ( + ClearingBehavior.COMPLETED_RUNS, + { + "2021-01-01": 1, + "2021-01-02": 1, + "2021-01-03": 2, + "2021-01-04": 2, + "2021-01-05": 2, + "2021-01-06": 2, + "2021-01-08": 1, + }, + ), + ], +) +def test_clearing_behavior(clearing_behavior, run_counts, dag_maker, session): + """ + We have two modes whereby when there's an existing run(s) in the range + of the backfill, we clear the dagrun to be rerun. + """ + with dag_maker(schedule="@daily") as dag: + PythonOperator(task_id="hi", python_callable=print) + + # for the first existing date, let's have one failed + date = "2021-01-02" + dr = dag_maker.create_dagrun( + run_id=f"scheduled_{date}", + execution_date=timezone.parse(date), + session=session, + state=DagRunState.FAILED, + ) + for ti in dr.get_task_instances(session=session): + ti.state = TaskInstanceState.FAILED + session.commit() + + # for the next existing date, let's have one failed, one success + date = "2021-01-03" + for num, state in [(1, "failed"), (2, "success")]: + dr = dag_maker.create_dagrun( + run_id=f"scheduled_{date}-{num}", + execution_date=timezone.parse(date), + session=session, + state=state, + ) + for ti in dr.get_task_instances(session=session): + ti.state = state + session.commit() + + # for the next existing date, let's have both failed + date = "2021-01-04" + for num, state in [(1, "failed"), (2, "success")]: + dr = dag_maker.create_dagrun( + run_id=f"scheduled_{date}-{num}", + execution_date=timezone.parse(date), + session=session, + state=state, + ) + for ti in dr.get_task_instances(session=session): + ti.state = state + session.commit() + + # for the next existing date, let's have both success + date = "2021-01-05" + for num, state in [(1, "success"), (2, "success")]: + dr = dag_maker.create_dagrun( + run_id=f"scheduled_{date}-{num}", + execution_date=timezone.parse(date), + session=session, + state=state, + ) + for ti in dr.get_task_instances(session=session): + ti.state = state + session.commit() + + # for the next existing date, let's have both failed + date = "2021-01-06" + for num, state in [(1, "failed"), (2, "failed")]: + dr = dag_maker.create_dagrun( + run_id=f"scheduled_{date}-{num}", + execution_date=timezone.parse(date), + session=session, + state=state, + ) + for ti in dr.get_task_instances(session=session): + ti.state = state + session.commit() + + # for the next existing date, let's have both running + date = "2021-01-07" + for num, state in [(1, "running"), (2, "running")]: + dr = dag_maker.create_dagrun( + run_id=f"scheduled_{date}-{num}", + execution_date=timezone.parse(date), + session=session, + state=state, + ) + for ti in dr.get_task_instances(session=session): + ti.state = state + session.commit() + + b = _create_backfill( + dag_id=dag.dag_id, + from_date=pendulum.parse("2021-01-01"), + to_date=pendulum.parse("2021-01-08"), + max_active_runs=2, + clearing_behavior=clearing_behavior, + reverse=False, + dag_run_conf=None, + ) + from collections import Counter + + query = ( + select(DagRun) + .join(BackfillDagRun.dag_run) + .where(BackfillDagRun.backfill_id == b.id) + .order_by(BackfillDagRun.sort_ordinal) + ) + dag_runs = session.scalars(query).all() + assert all(x.run_type == DagRunType.BACKFILL_JOB for x in dag_runs) + assert all(x.triggered_by == DagRunTriggeredByType.BACKFILL for x in dag_runs) + backfill_dates = [str(x.logical_date.date()) for x in dag_runs] + assert Counter(backfill_dates) == run_counts + non_run = session.scalar( + select(BackfillDagRun).where(BackfillDagRun.logical_date == timezone.parse("2021-01-07")) + ) + assert non_run.exception_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS + # todo AIP-78: require clear_failed if any task is depends_on_past + assert all(x.state == DagRunState.QUEUED for x in dag_runs) + + def test_params_stored_correctly(dag_maker, session): with dag_maker(schedule="@daily") as dag: PythonOperator(task_id="hi", python_callable=print)