diff --git a/airflow/models/backfill.py b/airflow/models/backfill.py
index 37a953311306..49a0f44ed1a5 100644
--- a/airflow/models/backfill.py
+++ b/airflow/models/backfill.py
@@ -33,11 +33,12 @@
 
 from airflow.api_connexion.exceptions import Conflict, NotFound
 from airflow.exceptions import AirflowException
+from airflow.models import clear_task_instances
 from airflow.models.base import Base, StringID
 from airflow.settings import json
 from airflow.utils import timezone
 from airflow.utils.session import create_session
-from airflow.utils.sqlalchemy import UtcDateTime
+from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks
 from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
@@ -56,6 +57,18 @@ class AlreadyRunningBackfill(AirflowException):
     """
 
 
+class ClearingBehavior(str, Enum):
+    """
+    Internal enum for setting clearing behavior in a backfill.
+
+    :meta private:
+    """
+
+    FAILED_RUNS = "failed-runs"
+    COMPLETED_RUNS = "completed-runs"
+    NO_CLEARING = "no-clearing"
+
+
 class Backfill(Base):
     """Model representing a backfill job."""
 
@@ -72,6 +85,7 @@ class Backfill(Base):
     Does not pause existing dag runs.
     """
 
+    clearing_behavior = Column(StringID(), nullable=False, default=ClearingBehavior.NO_CLEARING)
     max_active_runs = Column(Integer, default=10, nullable=False)
     created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
     completed_at = Column(UtcDateTime, nullable=True)
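
Note: the enum above is the entire knob this change adds; the next hunk is dispatch on it. A minimal, self-contained sketch of that dispatch follows (the `DagRunState` class below is a stand-in with the same values as `airflow.utils.state.DagRunState`, and `states_to_clear` is a hypothetical helper, not a function in the diff):

```python
from enum import Enum


class ClearingBehavior(str, Enum):
    """Mirrors the enum added in this diff."""

    FAILED_RUNS = "failed-runs"
    COMPLETED_RUNS = "completed-runs"
    NO_CLEARING = "no-clearing"


class DagRunState(str, Enum):
    """Stand-in for airflow.utils.state.DagRunState."""

    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


def states_to_clear(behavior: ClearingBehavior) -> list[DagRunState]:
    """Hypothetical helper: which existing dag-run states a backfill may clear."""
    if behavior is ClearingBehavior.FAILED_RUNS:
        return [DagRunState.FAILED]
    if behavior is ClearingBehavior.COMPLETED_RUNS:
        return [DagRunState.SUCCESS, DagRunState.FAILED]
    return []  # NO_CLEARING: existing runs are reported as ALREADY_EXISTS instead


assert states_to_clear(ClearingBehavior.COMPLETED_RUNS) == [
    DagRunState.SUCCESS,
    DagRunState.FAILED,
]
```

In particular, running runs are never cleared: a logical date whose existing runs are all still running is always recorded as ALREADY_EXISTS.
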
@@ -131,27 +145,82 @@ def validate_sort_ordinal(self, key, val):
         return val
 
 
-def _create_backfill_dag_run(dag, info, backfill_id, dag_run_conf, backfill_sort_ordinal, session):
+def _create_backfill_dag_run(
+    *,
+    dag,
+    info,
+    clearing_behavior: ClearingBehavior,
+    backfill_id,
+    dag_run_conf,
+    backfill_sort_ordinal,
+    session,
+):
     from airflow.models import DagRun
 
-    dr = session.scalar(select(DagRun).where(DagRun.execution_date == info.logical_date).limit(1))
-    if dr:
-        session.add(
-            BackfillDagRun(
-                backfill_id=backfill_id,
-                dag_run_id=None,
-                logical_date=info.logical_date,
-                exception_reason=BackfillDagRunExceptionReason.ALREADY_EXISTS,
-                sort_ordinal=backfill_sort_ordinal,
+    states_to_query = None
+    dr_exists = session.scalar(select(DagRun.id).where(DagRun.execution_date == info.logical_date).limit(1))
+    if dr_exists:
+        if clearing_behavior is ClearingBehavior.NO_CLEARING:
+            session.add(
+                BackfillDagRun(
+                    backfill_id=backfill_id,
+                    dag_run_id=None,
+                    logical_date=info.logical_date,
+                    exception_reason=BackfillDagRunExceptionReason.ALREADY_EXISTS,
+                    sort_ordinal=backfill_sort_ordinal,
+                )
             )
+            return
+        elif clearing_behavior is ClearingBehavior.FAILED_RUNS:
+            states_to_query = [DagRunState.FAILED]
+        elif clearing_behavior is ClearingBehavior.COMPLETED_RUNS:
+            states_to_query = [DagRunState.SUCCESS, DagRunState.FAILED]
+        else:
+            raise RuntimeError("unexpected")
+    if states_to_query:
+        # we only get here if user requested some clearing behavior
+        # and there are some dag runs for the logical date
+        # find the ones in the requested states and clear them
+        runs_to_clear_query = select(DagRun).where(
+            DagRun.execution_date == info.logical_date,
+            DagRun.state.in_(states_to_query),
         )
-        log.info(
-            "dag run already exists for dag_id=%s backfill_id=%s, info=%s",
-            dag.dag_id,
-            backfill_id,
-            info,
-        )
+        runs_to_clear = session.scalars(with_row_locks(runs_to_clear_query, session=session)).all()
+        for dr in runs_to_clear:
+            clear_task_instances(
+                tis=dr.get_task_instances(session=session),
+                session=session,
+            )
+            dr.backfill_id = backfill_id
+            dr.run_type = DagRunType.BACKFILL_JOB
+            dr.triggered_by = DagRunTriggeredByType.BACKFILL
+            session.add(
+                BackfillDagRun(
+                    backfill_id=backfill_id,
+                    dag_run_id=dr.id,
+                    logical_date=info.logical_date,
+                    sort_ordinal=backfill_sort_ordinal,
+                )
+            )
+        if not runs_to_clear:
+            session.add(
+                BackfillDagRun(
+                    backfill_id=backfill_id,
+                    dag_run_id=None,
+                    logical_date=info.logical_date,
+                    exception_reason=BackfillDagRunExceptionReason.ALREADY_EXISTS,
+                    sort_ordinal=backfill_sort_ordinal,
+                )
+            )
         return
+
+    if dr_exists:
+        return
+
+    # we only get here if there's no dag run for the logical date
+    # todo: we're subject to a race condition here where something else
+    # could create a dag run for the same logical date at the same time
+    # but there's no easy solution for this right now
     dr = dag.create_dagrun(
         triggered_by=DagRunTriggeredByType.BACKFILL,
         execution_date=info.logical_date,
@@ -183,6 +252,7 @@ def _create_backfill(
     max_active_runs: int,
     reverse: bool,
     dag_run_conf: dict | None,
+    clearing_behavior: ClearingBehavior | None = None,
 ) -> Backfill | None:
     from airflow.models.serialized_dag import SerializedDagModel
 
@@ -206,6 +276,7 @@ def _create_backfill(
             to_date=to_date,
             max_active_runs=max_active_runs,
             dag_run_conf=dag_run_conf,
+            clearing_behavior=clearing_behavior,
         )
         session.add(br)
         session.commit()
@@ -225,45 +296,22 @@ def _create_backfill(
         for info in dagrun_info_list:
             backfill_sort_ordinal += 1
             session.commit()
-            from tenacity import RetryError, Retrying, stop_after_attempt
-
-            try:
-                for attempt in Retrying(stop=stop_after_attempt(3)):
-                    # we do retries here because it's possible that we check to see if dr exists
-                    # before we attempt to create the dag run. if something else creates the dag
-                    # run in between, we'll have to retry the transaction
-                    with attempt:
-                        with session.begin():
-                            _create_backfill_dag_run(
-                                dag=dag,
-                                info=info,
-                                backfill_id=br.id,
-                                dag_run_conf=br.dag_run_conf,
-                                backfill_sort_ordinal=backfill_sort_ordinal,
-                                session=session,
-                            )
-                    log.info(
-                        "created backfill dag run dag_id=%s backfill_id=%s, info=%s",
-                        dag.dag_id,
-                        br.id,
-                        info,
-                    )
-            except RetryError:
-                dag.log.exception(
-                    "Error while attempting to create a dag run dag_id='%s' logical_date='%s'",
-                    dag.dag_id,
-                    info.logical_date,
-                )
-                session.add(
-                    BackfillDagRun(
-                        backfill_id=br.id,
-                        dag_run_id=None,
-                        exception_reason=BackfillDagRunExceptionReason.UNKNOWN,
-                        logical_date=info.logical_date,
-                        sort_ordinal=backfill_sort_ordinal,
-                    )
-                )
-                session.commit()
+            _create_backfill_dag_run(
+                dag=dag,
+                info=info,
+                clearing_behavior=br.clearing_behavior,
+                backfill_id=br.id,
+                dag_run_conf=br.dag_run_conf,
+                backfill_sort_ordinal=backfill_sort_ordinal,
+                session=session,
+            )
+            session.commit()
+            log.info(
+                "created backfill dag run dag_id=%s backfill_id=%s, info=%s",
+                dag.dag_id,
+                br.id,
+                info,
+            )
 
     return br
 
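
Two things stand out in the hunk above. First, the tenacity retry-on-conflict loop is removed; the check-then-create race it papered over is now acknowledged in the todo comment instead of retried. Second, clearing is guarded by `with_row_locks`, Airflow's wrapper that applies `SELECT ... FOR UPDATE` on backends that support row locking and returns the query unchanged otherwise. A rough sketch of that locking pattern in plain SQLAlchemy, with an invented `Run` model standing in for `DagRun` (illustration only, not Airflow's API):

```python
from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Run(Base):
    """Invented stand-in for DagRun, for illustration."""

    __tablename__ = "run"
    id = Column(Integer, primary_key=True)
    state = Column(String, default="failed")


engine = create_engine("sqlite://")  # SQLite parses but ignores FOR UPDATE
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Run(), Run(state="success")])
    session.commit()

    # Lock the rows we are about to mutate so a concurrent scheduler loop
    # cannot flip the same runs underneath us mid-transaction.
    query = select(Run).where(Run.state == "failed").with_for_update()
    for run in session.scalars(query):
        run.state = "queued"  # analogous to the clear-and-adopt step in the diff
    session.commit()
```

Routing this through `with_row_locks` rather than calling `with_for_update()` directly is what keeps the same code path working on backends, like SQLite, that do not honor row locks.
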
diff --git a/tests/models/test_backfill.py b/tests/models/test_backfill.py
index d22ad5460329..501cd202c78e 100644
--- a/tests/models/test_backfill.py
+++ b/tests/models/test_backfill.py
@@ -31,6 +31,7 @@
     Backfill,
     BackfillDagRun,
     BackfillDagRunExceptionReason,
+    ClearingBehavior,
    _cancel_backfill,
    _create_backfill,
 )
@@ -38,7 +39,7 @@
 from airflow.ti_deps.dep_context import DepContext
 from airflow.utils import timezone
 from airflow.utils.state import DagRunState, TaskInstanceState
-from airflow.utils.types import DagRunType
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 from tests_common.test_utils.db import (
     clear_db_backfills,
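
The parametrized `run_counts` in the test below follow mechanically from the fixture: a date with no prior run gets one freshly created backfill run; a date where some existing runs match the clearing behavior contributes one adopted run per cleared dag run; a date whose runs exist but match nothing (2021-01-05 under failed-only clearing, 2021-01-07 with only running runs) is recorded as ALREADY_EXISTS with no run attached. A stand-alone check of that arithmetic (the `existing` table and `expected_counts` helper are editorial illustrations, not part of the test):

```python
from collections import Counter

# Dag-run states the test seeds per date (dates absent here have no prior run).
existing = {
    "2021-01-02": ["failed"],
    "2021-01-03": ["failed", "success"],
    "2021-01-04": ["failed", "success"],
    "2021-01-05": ["success", "success"],
    "2021-01-06": ["failed", "failed"],
    "2021-01-07": ["running", "running"],
}
backfill_dates = [f"2021-01-0{day}" for day in range(1, 9)]


def expected_counts(states_cleared: set[str]) -> Counter:
    counts: Counter = Counter()
    for date in backfill_dates:
        runs = existing.get(date)
        if runs is None:
            counts[date] = 1  # no prior run: the backfill creates one
        elif cleared := sum(state in states_cleared for state in runs):
            counts[date] = cleared  # cleared runs are adopted by the backfill
        # else: recorded as ALREADY_EXISTS; no dag run is associated
    return counts


assert expected_counts({"failed"}) == Counter(
    {"2021-01-01": 1, "2021-01-02": 1, "2021-01-03": 1, "2021-01-04": 1, "2021-01-06": 2, "2021-01-08": 1}
)
assert expected_counts({"failed", "success"})["2021-01-05"] == 2
```
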
@@ -147,6 +148,149 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session):
     assert all(x.conf == expected_run_conf for x in dag_runs)
 
 
+@pytest.mark.parametrize(
+    "clearing_behavior, run_counts",
+    [
+        (
+            ClearingBehavior.FAILED_RUNS,
+            {
+                "2021-01-01": 1,
+                "2021-01-02": 1,
+                "2021-01-03": 1,
+                "2021-01-04": 1,
+                "2021-01-06": 2,
+                "2021-01-08": 1,
+            },
+        ),
+        (
+            ClearingBehavior.COMPLETED_RUNS,
+            {
+                "2021-01-01": 1,
+                "2021-01-02": 1,
+                "2021-01-03": 2,
+                "2021-01-04": 2,
+                "2021-01-05": 2,
+                "2021-01-06": 2,
+                "2021-01-08": 1,
+            },
+        ),
+    ],
+)
+def test_clearing_behavior(clearing_behavior, run_counts, dag_maker, session):
+    """
+    We have two modes which, when there are existing runs in the range of the
+    backfill, clear those dag runs so that they are rerun.
+    """
+    with dag_maker(schedule="@daily") as dag:
+        PythonOperator(task_id="hi", python_callable=print)
+
+    # for the first existing date, let's have one failed
+    date = "2021-01-02"
+    dr = dag_maker.create_dagrun(
+        run_id=f"scheduled_{date}",
+        execution_date=timezone.parse(date),
+        session=session,
+        state=DagRunState.FAILED,
+    )
+    for ti in dr.get_task_instances(session=session):
+        ti.state = TaskInstanceState.FAILED
+    session.commit()
+
+    # for the next existing date, let's have one failed, one success
+    date = "2021-01-03"
+    for num, state in [(1, "failed"), (2, "success")]:
+        dr = dag_maker.create_dagrun(
+            run_id=f"scheduled_{date}-{num}",
+            execution_date=timezone.parse(date),
+            session=session,
+            state=state,
+        )
+        for ti in dr.get_task_instances(session=session):
+            ti.state = state
+    session.commit()
+
+    # for the next existing date, again one failed, one success
+    date = "2021-01-04"
+    for num, state in [(1, "failed"), (2, "success")]:
+        dr = dag_maker.create_dagrun(
+            run_id=f"scheduled_{date}-{num}",
+            execution_date=timezone.parse(date),
+            session=session,
+            state=state,
+        )
+        for ti in dr.get_task_instances(session=session):
+            ti.state = state
+    session.commit()
+
+    # for the next existing date, let's have both success
+    date = "2021-01-05"
+    for num, state in [(1, "success"), (2, "success")]:
+        dr = dag_maker.create_dagrun(
+            run_id=f"scheduled_{date}-{num}",
+            execution_date=timezone.parse(date),
+            session=session,
+            state=state,
+        )
+        for ti in dr.get_task_instances(session=session):
+            ti.state = state
+    session.commit()
+
+    # for the next existing date, let's have both failed
+    date = "2021-01-06"
+    for num, state in [(1, "failed"), (2, "failed")]:
+        dr = dag_maker.create_dagrun(
+            run_id=f"scheduled_{date}-{num}",
+            execution_date=timezone.parse(date),
+            session=session,
+            state=state,
+        )
+        for ti in dr.get_task_instances(session=session):
+            ti.state = state
+    session.commit()
+
+    # for the next existing date, let's have both running
+    date = "2021-01-07"
+    for num, state in [(1, "running"), (2, "running")]:
+        dr = dag_maker.create_dagrun(
+            run_id=f"scheduled_{date}-{num}",
+            execution_date=timezone.parse(date),
+            session=session,
+            state=state,
+        )
+        for ti in dr.get_task_instances(session=session):
+            ti.state = state
+    session.commit()
+
+    b = _create_backfill(
+        dag_id=dag.dag_id,
+        from_date=pendulum.parse("2021-01-01"),
+        to_date=pendulum.parse("2021-01-08"),
+        max_active_runs=2,
+        clearing_behavior=clearing_behavior,
+        reverse=False,
+        dag_run_conf=None,
+    )
+    from collections import Counter
+
+    query = (
+        select(DagRun)
+        .join(BackfillDagRun.dag_run)
+        .where(BackfillDagRun.backfill_id == b.id)
+        .order_by(BackfillDagRun.sort_ordinal)
+    )
+    dag_runs = session.scalars(query).all()
+    assert all(x.run_type == DagRunType.BACKFILL_JOB for x in dag_runs)
+    assert all(x.triggered_by == DagRunTriggeredByType.BACKFILL for x in dag_runs)
+    backfill_dates = [str(x.logical_date.date()) for x in dag_runs]
+    assert Counter(backfill_dates) == run_counts
+    non_run = session.scalar(
+        select(BackfillDagRun).where(BackfillDagRun.logical_date == timezone.parse("2021-01-07"))
+    )
+    assert non_run.exception_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS
+    # todo AIP-78: require clear_failed if any task is depends_on_past
+    assert all(x.state == DagRunState.QUEUED for x in dag_runs)
+
+
 def test_params_stored_correctly(dag_maker, session):
     with dag_maker(schedule="@daily") as dag:
         PythonOperator(task_id="hi", python_callable=print)
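
For reference, a caller opting into the new behavior mirrors the test setup above. The sketch below is a hypothetical invocation: the keyword arguments match the `_create_backfill` signature in this diff, while `"my_dag"` and the date window are made up. The parameter defaults to `None`, and the model column defaults to `NO_CLEARING`, so callers that never mention `clearing_behavior` keep the old report-and-skip semantics for existing runs.

```python
import pendulum

from airflow.models.backfill import ClearingBehavior, _create_backfill

# Hypothetical: re-run only the failed runs in the window as part of the
# backfill; existing runs in other states for the same logical dates are
# left alone and recorded as ALREADY_EXISTS.
backfill = _create_backfill(
    dag_id="my_dag",  # made-up dag id
    from_date=pendulum.parse("2021-01-01"),
    to_date=pendulum.parse("2021-01-08"),
    max_active_runs=2,
    reverse=False,
    dag_run_conf=None,
    clearing_behavior=ClearingBehavior.FAILED_RUNS,
)
```
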