Skip to content

Commit

Permalink
Remove execution_date and logical_date from arguments where function/…
Browse files Browse the repository at this point in the history
…API is used to look up a DAG run

- Resolve compatibility issues and refactor `execution_date` to `logical_date`
- Resolve compatibility tests
- Correct import paths after rebase
- Address static checks
- Refactor `execution_date` to `logical_date`
- Add missing DAG files for tests
- Enhance GCP ML and CloudBuild tests using helpers for compatibility
- Fix mypy errors
- Miscellaneous fixes and removals of `execution_date`
- Resolve compatibility tests
- Correct import paths after rebase
- Address static checks
- Refactor `execution_date` to `logical_date`
- Add missing DAG files for tests
- Enhance GCP ML and CloudBuild tests using helpers for compatibility
- Fix mypy errors
- Miscellaneous fixes and removals of `execution_date`
- Resolve compatibility tests
- Correct import paths after rebase
- Address static checks
- Refactor `execution_date` to `logical_date`
- Add missing DAG files for tests
- Enhance GCP ML and CloudBuild tests using helpers for compatibility
- Fix mypy errors
- Miscellaneous fixes and removals of `execution_date`

Resolve compatibility issues and refactor `execution_date` to `logical_date`

- Resolve compatibility tests
- Correct import paths after rebase
- Address static checks
- Refactor `execution_date` to `logical_date`
- Add missing DAG files for tests
- Enhance GCP ML and CloudBuild tests using helpers for compatibility
- Fix mypy errors
- Miscellaneous fixes and removals of `execution_date`

Resolve compatibility issues and refactor `execution_date` to `logical_date`

- Resolve compatibility tests
- Correct import paths after rebase
- Address static checks
- Refactor `execution_date` to `logical_date`
- Add missing DAG files for tests
- Enhance GCP ML and CloudBuild tests using helpers for compatibility
- Fix mypy errors
- Miscellaneous fixes and removals of `execution_date`

Resolve compatibility issues and refactor `execution_date` to `logical_date`

- Resolve compatibility tests
- Correct import paths after rebase
- Address static checks
- Refactor `execution_date` to `logical_date`
- Add missing DAG files for tests
- Enhance GCP ML and CloudBuild tests using helpers for compatibility
- Fix mypy errors
- Miscellaneous fixes and removals of `execution_date`

Resolve compatibility issues and refactor `execution_date` to `logical_date`
- Resolve compatibility tests
- Correct import paths after rebase
- Address static checks
- Refactor `execution_date` to `logical_date`
- Add missing DAG files for tests
- Enhance GCP ML and CloudBuild tests using helpers for compatibility
- Fix mypy errors
- Miscellaneous fixes and removals of `execution_date`

More name changes and argument removal

Mass replace execution_date in arguments

Not finished, still many to go.

Mass replace execution_date in arguments

Not finished, still many to go.

Drop execution_date unique constraint on DagRun

The column has also been renamed to logical_date, although the Python
model is not changed. This allows us to not need to fix all the Python
code at once (we'll do that later), but still do the two changes in one
migration instead of two.

Fix compat tests

Fix compat tests

Fix compat test

Fix the tests

Fix compat tests

Fix compat tests

Fix compat tests

Fix compat tests

Fix compat tests

Fix compat tests

Fix static checks

Fix the pytest_plugin by refactor renaming execution_date to logical_date

Fix the pre-commit errors

Fix import paths after rebase

fix more compat tests

Fix the compat tests

Fix the compat tests

Fix the compat tests

Use test helpers in GCP MLEngine tests for compat

Using Airflow internals directly presents a problem when dealing with
compatibility in tests (since the same tests must run against Airflow 2
and 3). The helpers already handle this well, so we should use them.

Use test helpers in GCP CloudBuild tests for compat

Using Airflow internals directly presents a problem when dealing with
compatibility in tests (since the same tests must run against Airflow 2
and 3). The helpers already handle this well, so we should use them.

Mark db tests

Some compat code to make DAG.clear() still work

Remove unneeded test cases for compat code

Use test helpers in GCS-BQ tests for compat

Using Airflow internals directly presents a problem when dealing with
compatibility in tests (since the same tests must run against Airflow 2
and 3). The helpers already handle this well, so we should use them.

Fix tests and add missing dag_file for tests

Add missing dag_file for tests

Fix some provider/cncf compat on logical date

We should continue to use execution_date if the provider is run against 2.

Fix the cache import path

refactor rename execution_date, executionDate to logical_date, logicalDate respectively.

refactor rename execution_date, executionDate to logical_date, logicalDate respectively.

fix more tests

Fix more tests

Fix more tests

More fixes on removal of execution date

Fix more tests

fix more tests

Fix tests

Fix tests

Fix tests

fix the static checks

remove the migration

fix the tests

Fix migration file

Remove renaming execution date for SlaMiss

Remove execution_date and add logical date

Fix mypy errors

try-except for 2.x. for DagRun

Remove _DEPRECATION_REPLACEMENTS

More name changes and argument removal

Mass replace execution_date in arguments

Not finished, still many to go.

Mass replace execution_date in arguments

Not finished, still many to go.

Drop execution_date unique constraint on DagRun

The column has also been renamed to logical_date, although the Python
model is not changed. This allows us to not need to fix all the Python
code at once (we'll do that later), but still do the two changes in one
migration instead of two.
  • Loading branch information
sunank200 committed Nov 7, 2024
1 parent 845a160 commit 7b05041
Show file tree
Hide file tree
Showing 303 changed files with 3,613 additions and 3,252 deletions.
4 changes: 2 additions & 2 deletions airflow/api/client/local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ def __init__(self, auth=None, session: httpx.Client | None = None):
self._session.auth = auth

def trigger_dag(
self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True
self, dag_id, run_id=None, conf=None, logical_date=None, replace_microseconds=True
) -> dict | None:
dag_run = trigger_dag.trigger_dag(
dag_id=dag_id,
triggered_by=DagRunTriggeredByType.CLI,
run_id=run_id,
conf=conf,
execution_date=execution_date,
logical_date=logical_date,
replace_microseconds=replace_microseconds,
)
if dag_run:
Expand Down
90 changes: 17 additions & 73 deletions airflow/api/common/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
from airflow.utils.helpers import exactly_one
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
Expand Down Expand Up @@ -59,19 +58,19 @@ def _create_dagruns(
:param dag: The DAG to create runs for.
:param infos: List of logical dates and data intervals to evaluate.
:param state: The state to set the dag run to
:param run_type: The prefix will be used to construct dag run id: ``{run_id_prefix}__{execution_date}``.
:param run_type: The prefix will be used to construct dag run id: ``{run_id_prefix}__{logical_date}``.
:return: Newly created and existing dag runs for the execution dates supplied.
"""
# Find out existing DAG runs that we don't need to create.
dag_runs = {
run.logical_date: run
for run in DagRun.find(dag_id=dag.dag_id, execution_date=[info.logical_date for info in infos])
for run in DagRun.find(dag_id=dag.dag_id, logical_date=[info.logical_date for info in infos])
}

for info in infos:
if info.logical_date not in dag_runs:
dag_runs[info.logical_date] = dag.create_dagrun(
execution_date=info.logical_date,
logical_date=info.logical_date,
data_interval=info.data_interval,
start_date=timezone.utcnow(),
external_trigger=False,
Expand All @@ -87,7 +86,6 @@ def set_state(
*,
tasks: Collection[Operator | tuple[Operator, int]],
run_id: str | None = None,
execution_date: datetime | None = None,
upstream: bool = False,
downstream: bool = False,
future: bool = False,
Expand All @@ -107,7 +105,6 @@ def set_state(
:param tasks: the iterable of tasks or (task, map_index) tuples from which to work.
``task.dag`` needs to be set
:param run_id: the run_id of the dagrun to start looking from
:param execution_date: the execution date from which to start looking (deprecated)
:param upstream: Mark all parents (upstream tasks)
:param downstream: Mark all siblings (downstream tasks) of task_id
:param future: Mark all future tasks on the interval of the dag up until
Expand All @@ -121,21 +118,12 @@ def set_state(
if not tasks:
return []

if not exactly_one(execution_date, run_id):
raise ValueError("Exactly one of dag_run_id and execution_date must be set")

if execution_date and not timezone.is_localized(execution_date):
raise ValueError(f"Received non-localized date {execution_date}")

task_dags = {task[0].dag if isinstance(task, tuple) else task.dag for task in tasks}
if len(task_dags) > 1:
raise ValueError(f"Received tasks from multiple DAGs: {task_dags}")
dag = next(iter(task_dags))
if dag is None:
raise ValueError("Received tasks with no DAG")

if execution_date:
run_id = dag.get_dagrun(execution_date=execution_date, session=session).run_id
if not run_id:
raise ValueError("Received tasks with no run_id")

Expand Down Expand Up @@ -200,26 +188,26 @@ def find_task_relatives(tasks, downstream, upstream):


@provide_session
def get_execution_dates(
dag: DAG, execution_date: datetime, future: bool, past: bool, *, session: SASession = NEW_SESSION
def get_logical_dates(
dag: DAG, logical_date: datetime, future: bool, past: bool, *, session: SASession = NEW_SESSION
) -> list[datetime]:
"""Return DAG execution dates."""
latest_execution_date = dag.get_latest_execution_date(session=session)
if latest_execution_date is None:
raise ValueError(f"Received non-localized date {execution_date}")
execution_date = timezone.coerce_datetime(execution_date)
latest_logical_date = dag.get_latest_logical_date(session=session)
if latest_logical_date is None:
raise ValueError(f"Received non-localized date {logical_date}")
logical_date = timezone.coerce_datetime(logical_date)
# determine date range of dag runs and tasks to consider
end_date = latest_execution_date if future else execution_date
end_date = latest_logical_date if future else logical_date
if dag.start_date:
start_date = dag.start_date
else:
start_date = execution_date
start_date = execution_date if not past else start_date
start_date = logical_date
start_date = logical_date if not past else start_date
if not dag.timetable.can_be_scheduled:
# If the DAG never schedules, need to look at existing DagRun if the user wants future or
# past runs.
dag_runs = dag.get_dagruns_between(start_date=start_date, end_date=end_date)
dates = sorted({d.execution_date for d in dag_runs})
dates = sorted({d.logical_date for d in dag_runs})
elif not dag.timetable.periodic:
dates = [start_date]
else:
Expand All @@ -235,7 +223,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASess
last_dagrun = dag.get_last_dagrun(include_externally_triggered=True, session=session)
current_dagrun = dag.get_dagrun(run_id=run_id, session=session)
first_dagrun = session.scalar(
select(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.execution_date.asc()).limit(1)
select(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.logical_date.asc()).limit(1)
)

if last_dagrun is None:
Expand All @@ -255,7 +243,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASess
dates = [
info.logical_date for info in dag.iter_dagrun_infos_between(start_date, end_date, align=False)
]
run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id, execution_date=dates, session=session)]
run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id, logical_date=dates, session=session)]
return run_ids


Expand All @@ -279,7 +267,6 @@ def _set_dag_run_state(dag_id: str, run_id: str, state: DagRunState, session: SA
def set_dag_run_state_to_success(
*,
dag: DAG,
execution_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
Expand All @@ -290,27 +277,15 @@ def set_dag_run_state_to_success(
Set for a specific execution date and its task instances to success.
:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking(deprecated)
:param run_id: the run_id to start looking from
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
:raises: ValueError if dag or execution_date is invalid
:raises: ValueError if dag or logical_date is invalid
"""
if not exactly_one(execution_date, run_id):
return []

if not dag:
return []

if execution_date:
if not timezone.is_localized(execution_date):
raise ValueError(f"Received non-localized date {execution_date}")
dag_run = dag.get_dagrun(execution_date=execution_date)
if not dag_run:
raise ValueError(f"DagRun with execution_date: {execution_date} not found")
run_id = dag_run.run_id
if not run_id:
raise ValueError(f"Invalid dag_run_id: {run_id}")
# Mark the dag run to success.
Expand All @@ -333,7 +308,6 @@ def set_dag_run_state_to_success(
def set_dag_run_state_to_failed(
*,
dag: DAG,
execution_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
Expand All @@ -344,27 +318,15 @@ def set_dag_run_state_to_failed(
Set for a specific execution date and its task instances to failed.
:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking(deprecated)
:param run_id: the DAG run_id to start looking from
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
:raises: AssertionError if dag or execution_date is invalid
:raises: AssertionError if dag or logical_date is invalid
"""
if not exactly_one(execution_date, run_id):
return []
if not dag:
return []

if execution_date:
if not timezone.is_localized(execution_date):
raise ValueError(f"Received non-localized date {execution_date}")
dag_run = dag.get_dagrun(execution_date=execution_date)
if not dag_run:
raise ValueError(f"DagRun with execution_date: {execution_date} not found")
run_id = dag_run.run_id

if not run_id:
raise ValueError(f"Invalid dag_run_id: {run_id}")

Expand Down Expand Up @@ -429,7 +391,6 @@ def __set_dag_run_state_to_running_or_queued(
*,
new_state: DagRunState,
dag: DAG,
execution_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession,
Expand All @@ -438,28 +399,15 @@ def __set_dag_run_state_to_running_or_queued(
Set the dag run for a specific execution date to running.
:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking
:param run_id: the id of the DagRun
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
"""
res: list[TaskInstance] = []

if not exactly_one(execution_date, run_id):
return res

if not dag:
return res

if execution_date:
if not timezone.is_localized(execution_date):
raise ValueError(f"Received non-localized date {execution_date}")
dag_run = dag.get_dagrun(execution_date=execution_date)
if not dag_run:
raise ValueError(f"DagRun with execution_date: {execution_date} not found")
run_id = dag_run.run_id
if not run_id:
raise ValueError(f"DagRun with run_id: {run_id} not found")
# Mark the dag run to running.
Expand All @@ -474,7 +422,6 @@ def __set_dag_run_state_to_running_or_queued(
def set_dag_run_state_to_running(
*,
dag: DAG,
execution_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
Expand All @@ -487,7 +434,6 @@ def set_dag_run_state_to_running(
return __set_dag_run_state_to_running_or_queued(
new_state=DagRunState.RUNNING,
dag=dag,
execution_date=execution_date,
run_id=run_id,
commit=commit,
session=session,
Expand All @@ -498,7 +444,6 @@ def set_dag_run_state_to_running(
def set_dag_run_state_to_queued(
*,
dag: DAG,
execution_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
Expand All @@ -511,7 +456,6 @@ def set_dag_run_state_to_queued(
return __set_dag_run_state_to_running_or_queued(
new_state=DagRunState.QUEUED,
dag=dag,
execution_date=execution_date,
run_id=run_id,
commit=commit,
session=session,
Expand Down
36 changes: 18 additions & 18 deletions airflow/api/common/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def _trigger_dag(
triggered_by: DagRunTriggeredByType,
run_id: str | None = None,
conf: dict | str | None = None,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
replace_microseconds: bool = True,
) -> DagRun | None:
"""
Expand All @@ -52,9 +52,9 @@ def _trigger_dag(
:param dag_id: DAG ID
:param dag_bag: DAG Bag model
:param triggered_by: the entity which triggers the dag_run
:param run_id: ID of the dag_run
:param run_id: ID of the run
:param conf: configuration
:param execution_date: date of execution
:param logical_date: logical date of the run
:param replace_microseconds: whether microseconds should be zeroed
:return: list of triggered dags
"""
Expand All @@ -63,39 +63,39 @@ def _trigger_dag(
if dag is None or dag_id not in dag_bag.dags:
raise DagNotFound(f"Dag id {dag_id} not found")

execution_date = execution_date or timezone.utcnow()
logical_date = logical_date or timezone.utcnow()

if not timezone.is_localized(execution_date):
raise ValueError("The execution_date should be localized")
if not timezone.is_localized(logical_date):
raise ValueError("The logical date should be localized")

if replace_microseconds:
execution_date = execution_date.replace(microsecond=0)
logical_date = logical_date.replace(microsecond=0)

if dag.default_args and "start_date" in dag.default_args:
min_dag_start_date = dag.default_args["start_date"]
if min_dag_start_date and execution_date < min_dag_start_date:
if min_dag_start_date and logical_date < min_dag_start_date:
raise ValueError(
f"The execution_date [{execution_date.isoformat()}] should be >= start_date "
f"Logical date [{logical_date.isoformat()}] should be >= start_date "
f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
)
logical_date = timezone.coerce_datetime(execution_date)
coerced_logical_date = timezone.coerce_datetime(logical_date)

data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)
run_id = run_id or dag.timetable.generate_run_id(
run_type=DagRunType.MANUAL, logical_date=logical_date, data_interval=data_interval
run_type=DagRunType.MANUAL, logical_date=coerced_logical_date, data_interval=data_interval
)
dag_run = DagRun.find_duplicate(dag_id=dag_id, execution_date=execution_date, run_id=run_id)
dag_run = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id)

if dag_run:
raise DagRunAlreadyExists(dag_run=dag_run, execution_date=execution_date, run_id=run_id)
raise DagRunAlreadyExists(dag_run)

run_conf = None
if conf:
run_conf = conf if isinstance(conf, dict) else json.loads(conf)

dag_run = dag.create_dagrun(
run_id=run_id,
execution_date=execution_date,
logical_date=logical_date,
state=DagRunState.QUEUED,
conf=run_conf,
external_trigger=True,
Expand All @@ -115,7 +115,7 @@ def trigger_dag(
triggered_by: DagRunTriggeredByType,
run_id: str | None = None,
conf: dict | str | None = None,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
replace_microseconds: bool = True,
session: Session = NEW_SESSION,
) -> DagRun | None:
Expand All @@ -125,7 +125,7 @@ def trigger_dag(
:param dag_id: DAG ID
:param run_id: ID of the dag_run
:param conf: configuration
:param execution_date: date of execution
:param logical_date: date of execution
:param replace_microseconds: whether microseconds should be zeroed
:param session: Unused. Only added in compatibility with database isolation mode
:param triggered_by: the entity which triggers the dag_run
Expand All @@ -141,7 +141,7 @@ def trigger_dag(
dag_bag=dagbag,
run_id=run_id,
conf=conf,
execution_date=execution_date,
logical_date=logical_date,
replace_microseconds=replace_microseconds,
triggered_by=triggered_by,
)
Expand Down
Loading

0 comments on commit 7b05041

Please sign in to comment.