Add clearing behavior for backfill
Users can now elect to clear failed runs, or all completed runs, in the backfill range so that they are re-run as part of the backfill.

Also, we handle the possibility of duplicate dag runs (i.e., multiple runs for the same execution date).
dstandish committed Oct 19, 2024
1 parent 025082a commit fea0ec3
Showing 2 changed files with 249 additions and 57 deletions.
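For context, here is a minimal sketch of how the new option is exercised, mirroring the `_create_backfill` signature and the test added in this commit (`example_dag` is a hypothetical placeholder, not part of the change):

```python
# Hedged sketch: "example_dag" is a hypothetical dag id; the keyword
# arguments mirror the _create_backfill signature added in this commit.
import pendulum

from airflow.models.backfill import ClearingBehavior, _create_backfill

backfill = _create_backfill(
    dag_id="example_dag",
    from_date=pendulum.parse("2021-01-01"),
    to_date=pendulum.parse("2021-01-08"),
    max_active_runs=2,
    reverse=False,
    dag_run_conf=None,
    # FAILED_RUNS re-runs existing failed runs in the range;
    # COMPLETED_RUNS also clears successful ones; NO_CLEARING (the
    # model default) records existing runs as ALREADY_EXISTS and skips them.
    clearing_behavior=ClearingBehavior.FAILED_RUNS,
)
```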
160 changes: 104 additions & 56 deletions airflow/models/backfill.py
@@ -33,11 +33,12 @@

from airflow.api_connexion.exceptions import Conflict, NotFound
from airflow.exceptions import AirflowException
from airflow.models import clear_task_instances
from airflow.models.base import Base, StringID
from airflow.settings import json
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.sqlalchemy import UtcDateTime
from airflow.utils.sqlalchemy import UtcDateTime, with_row_locks
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType

@@ -56,6 +57,18 @@ class AlreadyRunningBackfill(AirflowException):
"""


class ClearingBehavior(str, Enum):
"""
Internal enum for setting clearing behavior in a backfill.

:meta private:
"""

FAILED_RUNS = "failed-runs"
COMPLETED_RUNS = "completed-runs"
NO_CLEARING = "no-clearing"


class Backfill(Base):
"""Model representing a backfill job."""

@@ -72,6 +85,7 @@ class Backfill(Base):
Does not pause existing dag runs.
"""
clearing_behavior = Column(StringID(), nullable=False, default=ClearingBehavior.NO_CLEARING)
max_active_runs = Column(Integer, default=10, nullable=False)
created_at = Column(UtcDateTime, default=timezone.utcnow, nullable=False)
completed_at = Column(UtcDateTime, nullable=True)
@@ -131,27 +145,82 @@ def validate_sort_ordinal(self, key, val):
return val


def _create_backfill_dag_run(dag, info, backfill_id, dag_run_conf, backfill_sort_ordinal, session):
def _create_backfill_dag_run(
*,
dag,
info,
clearing_behavior: ClearingBehavior,
backfill_id,
dag_run_conf,
backfill_sort_ordinal,
session,
):
from airflow.models import DagRun

dr = session.scalar(select(DagRun).where(DagRun.execution_date == info.logical_date).limit(1))
if dr:
session.add(
BackfillDagRun(
backfill_id=backfill_id,
dag_run_id=None,
logical_date=info.logical_date,
exception_reason=BackfillDagRunExceptionReason.ALREADY_EXISTS,
sort_ordinal=backfill_sort_ordinal,
states_to_query = None
dr_exists = session.scalar(select(DagRun.id).where(DagRun.execution_date == info.logical_date).limit(1))
if dr_exists:
if clearing_behavior is ClearingBehavior.NO_CLEARING:
session.add(
BackfillDagRun(
backfill_id=backfill_id,
dag_run_id=None,
logical_date=info.logical_date,
exception_reason=BackfillDagRunExceptionReason.ALREADY_EXISTS,
sort_ordinal=backfill_sort_ordinal,
)
)
return
elif clearing_behavior is ClearingBehavior.FAILED_RUNS:
states_to_query = [DagRunState.FAILED]
elif clearing_behavior is ClearingBehavior.COMPLETED_RUNS:
states_to_query = [DagRunState.SUCCESS, DagRunState.FAILED]
else:
raise RuntimeError("unexpected")
if states_to_query:
# we only get here if user requested some clearing behavior
# and there are some dag runs for the logical date
# find the ones in the requested states and clear them
runs_to_clear_query = select(DagRun).where(
DagRun.execution_date == info.logical_date,
DagRun.state.in_(states_to_query),
)
log.info(
"dag run already exists for dag_id=%s backfill_id=%s, info=%s",
dag.dag_id,
backfill_id,
info,
)
runs_to_clear = session.scalars(with_row_locks(runs_to_clear_query, session=session)).all()
for dr in runs_to_clear:
clear_task_instances(
tis=dr.get_task_instances(session=session),
session=session,
)
dr.backfill_id = backfill_id
dr.run_type = DagRunType.BACKFILL_JOB
dr.triggered_by = DagRunTriggeredByType.BACKFILL
session.add(
BackfillDagRun(
backfill_id=backfill_id,
dag_run_id=dr.id,
logical_date=info.logical_date,
sort_ordinal=backfill_sort_ordinal,
)
)
if not runs_to_clear:
session.add(
BackfillDagRun(
backfill_id=backfill_id,
dag_run_id=None,
logical_date=info.logical_date,
exception_reason=BackfillDagRunExceptionReason.ALREADY_EXISTS,
sort_ordinal=backfill_sort_ordinal,
)
)
return

if dr_exists:
return

# we only get here if there's no dag run for the logical date
# todo: we're subject to a race condition here where something else
# could create a dag run for the same logical date at the same time
# but there's no easy solution for this right now
dr = dag.create_dagrun(
triggered_by=DagRunTriggeredByType.BACKFILL,
execution_date=info.logical_date,
@@ -183,6 +252,7 @@ def _create_backfill(
max_active_runs: int,
reverse: bool,
dag_run_conf: dict | None,
clearing_behavior: ClearingBehavior | None = None,
) -> Backfill | None:
from airflow.models.serialized_dag import SerializedDagModel

@@ -206,6 +276,7 @@ def _create_backfill(
to_date=to_date,
max_active_runs=max_active_runs,
dag_run_conf=dag_run_conf,
clearing_behavior=clearing_behavior,
)
session.add(br)
session.commit()
@@ -225,45 +296,22 @@ def _create_backfill(
for info in dagrun_info_list:
backfill_sort_ordinal += 1
session.commit()
from tenacity import RetryError, Retrying, stop_after_attempt

try:
for attempt in Retrying(stop=stop_after_attempt(3)):
# we do retries here because it's possible that we check to see if dr exists
# before we attempt to create the dag run. if something else creates the dag
# run in between, we'll have to retry the transaction
with attempt:
with session.begin():
_create_backfill_dag_run(
dag=dag,
info=info,
backfill_id=br.id,
dag_run_conf=br.dag_run_conf,
backfill_sort_ordinal=backfill_sort_ordinal,
session=session,
)
log.info(
"created backfill dag run dag_id=%s backfill_id=%s, info=%s",
dag.dag_id,
br.id,
info,
)
except RetryError:
dag.log.exception(
"Error while attempting to create a dag run dag_id='%s' logical_date='%s'",
dag.dag_id,
info.logical_date,
)
session.add(
BackfillDagRun(
backfill_id=br.id,
dag_run_id=None,
exception_reason=BackfillDagRunExceptionReason.UNKNOWN,
logical_date=info.logical_date,
sort_ordinal=backfill_sort_ordinal,
)
)
session.commit()
_create_backfill_dag_run(
dag=dag,
info=info,
clearing_behavior=br.clearing_behavior,
backfill_id=br.id,
dag_run_conf=br.dag_run_conf,
backfill_sort_ordinal=backfill_sort_ordinal,
session=session,
)
session.commit()
log.info(
"created backfill dag run dag_id=%s backfill_id=%s, info=%s",
dag.dag_id,
br.id,
info,
)
return br


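The heart of the change is the lock-then-clear flow in `_create_backfill_dag_run`. Below is a condensed, hypothetical sketch of that per-date flow; `clear_existing_runs` is an invented helper name, and the `BackfillDagRun` bookkeeping rows and logging are omitted:

```python
# Condensed sketch only: the real logic lives inline in
# _create_backfill_dag_run in the diff above.
from sqlalchemy import select

from airflow.models import DagRun, clear_task_instances
from airflow.models.backfill import ClearingBehavior
from airflow.utils.sqlalchemy import with_row_locks
from airflow.utils.state import DagRunState


def clear_existing_runs(logical_date, clearing_behavior, backfill_id, session):
    """Clear pre-existing runs for one logical date per the chosen behavior."""
    states = {
        ClearingBehavior.FAILED_RUNS: [DagRunState.FAILED],
        ClearingBehavior.COMPLETED_RUNS: [DagRunState.SUCCESS, DagRunState.FAILED],
    }.get(clearing_behavior)
    if states is None:  # NO_CLEARING: existing runs are left untouched
        return []
    query = select(DagRun).where(
        DagRun.execution_date == logical_date,
        DagRun.state.in_(states),
    )
    # row locks keep a concurrent scheduler from mutating the runs mid-clear;
    # .all() means every duplicate run for the date is handled, not just one
    runs = session.scalars(with_row_locks(query, session=session)).all()
    for dr in runs:
        clear_task_instances(tis=dr.get_task_instances(session=session), session=session)
        dr.backfill_id = backfill_id  # adopt the cleared run into the backfill
    return runs
```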
146 changes: 145 additions & 1 deletion tests/models/test_backfill.py
@@ -31,14 +31,15 @@
Backfill,
BackfillDagRun,
BackfillDagRunExceptionReason,
ClearingBehavior,
_cancel_backfill,
_create_backfill,
)
from airflow.operators.python import PythonOperator
from airflow.ti_deps.dep_context import DepContext
from airflow.utils import timezone
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
from airflow.utils.types import DagRunTriggeredByType, DagRunType

from tests_common.test_utils.db import (
clear_db_backfills,
@@ -147,6 +148,149 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session):
assert all(x.conf == expected_run_conf for x in dag_runs)


@pytest.mark.parametrize(
"clearing_behavior, run_counts",
[
(
ClearingBehavior.FAILED_RUNS,
{
"2021-01-01": 1,
"2021-01-02": 1,
"2021-01-03": 1,
"2021-01-04": 1,
"2021-01-06": 2,
"2021-01-08": 1,
},
),
(
ClearingBehavior.COMPLETED_RUNS,
{
"2021-01-01": 1,
"2021-01-02": 1,
"2021-01-03": 2,
"2021-01-04": 2,
"2021-01-05": 2,
"2021-01-06": 2,
"2021-01-08": 1,
},
),
],
)
def test_clearing_behavior(clearing_behavior, run_counts, dag_maker, session):
"""
We have two modes whereby, when there are existing runs in the range
of the backfill, we clear those dag runs so they are re-run.
"""
with dag_maker(schedule="@daily") as dag:
PythonOperator(task_id="hi", python_callable=print)

# for the first existing date, let's have one failed
date = "2021-01-02"
dr = dag_maker.create_dagrun(
run_id=f"scheduled_{date}",
execution_date=timezone.parse(date),
session=session,
state=DagRunState.FAILED,
)
for ti in dr.get_task_instances(session=session):
ti.state = TaskInstanceState.FAILED
session.commit()

# for the next existing date, let's have one failed, one success
date = "2021-01-03"
for num, state in [(1, "failed"), (2, "success")]:
dr = dag_maker.create_dagrun(
run_id=f"scheduled_{date}-{num}",
execution_date=timezone.parse(date),
session=session,
state=state,
)
for ti in dr.get_task_instances(session=session):
ti.state = state
session.commit()

# for the next existing date, let's have one failed, one success
date = "2021-01-04"
for num, state in [(1, "failed"), (2, "success")]:
dr = dag_maker.create_dagrun(
run_id=f"scheduled_{date}-{num}",
execution_date=timezone.parse(date),
session=session,
state=state,
)
for ti in dr.get_task_instances(session=session):
ti.state = state
session.commit()

# for the next existing date, let's have both success
date = "2021-01-05"
for num, state in [(1, "success"), (2, "success")]:
dr = dag_maker.create_dagrun(
run_id=f"scheduled_{date}-{num}",
execution_date=timezone.parse(date),
session=session,
state=state,
)
for ti in dr.get_task_instances(session=session):
ti.state = state
session.commit()

# for the next existing date, let's have both failed
date = "2021-01-06"
for num, state in [(1, "failed"), (2, "failed")]:
dr = dag_maker.create_dagrun(
run_id=f"scheduled_{date}-{num}",
execution_date=timezone.parse(date),
session=session,
state=state,
)
for ti in dr.get_task_instances(session=session):
ti.state = state
session.commit()

# for the next existing date, let's have both running
date = "2021-01-07"
for num, state in [(1, "running"), (2, "running")]:
dr = dag_maker.create_dagrun(
run_id=f"scheduled_{date}-{num}",
execution_date=timezone.parse(date),
session=session,
state=state,
)
for ti in dr.get_task_instances(session=session):
ti.state = state
session.commit()

b = _create_backfill(
dag_id=dag.dag_id,
from_date=pendulum.parse("2021-01-01"),
to_date=pendulum.parse("2021-01-08"),
max_active_runs=2,
clearing_behavior=clearing_behavior,
reverse=False,
dag_run_conf=None,
)
from collections import Counter

query = (
select(DagRun)
.join(BackfillDagRun.dag_run)
.where(BackfillDagRun.backfill_id == b.id)
.order_by(BackfillDagRun.sort_ordinal)
)
dag_runs = session.scalars(query).all()
assert all(x.run_type == DagRunType.BACKFILL_JOB for x in dag_runs)
assert all(x.triggered_by == DagRunTriggeredByType.BACKFILL for x in dag_runs)
backfill_dates = [str(x.logical_date.date()) for x in dag_runs]
assert Counter(backfill_dates) == run_counts
non_run = session.scalar(
select(BackfillDagRun).where(BackfillDagRun.logical_date == timezone.parse("2021-01-07"))
)
assert non_run.exception_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS
# todo AIP-78: require clear_failed if any task is depends_on_past
assert all(x.state == DagRunState.QUEUED for x in dag_runs)


def test_params_stored_correctly(dag_maker, session):
with dag_maker(schedule="@daily") as dag:
PythonOperator(task_id="hi", python_callable=print)
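A note on reading the parametrized expectations in `test_clearing_behavior`: under FAILED_RUNS, 2021-01-06 (two failed duplicate runs) contributes two cleared backfill runs, 2021-01-03 and 2021-01-04 (one failed, one success each) contribute one apiece, and 2021-01-05 (both successful) contributes none; under COMPLETED_RUNS the successful duplicates are cleared as well. 2021-01-07 (both runs still running) is never cleared in either mode and is recorded with the ALREADY_EXISTS exception reason, while 2021-01-01 and 2021-01-08 had no prior runs and simply get fresh backfill runs.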
