From 7e8159a440d630ec6b383fc5d9597165fbaae65b Mon Sep 17 00:00:00 2001
From: Jarek Potiuk
Date: Thu, 31 Oct 2024 19:23:01 +0100
Subject: [PATCH] Test standard provider with Airflow 2.8 and 2.9

The standard provider now has a minimum Airflow version of 2.8 (since
#43553), but we have not been testing it against Airflow 2.8 and 2.9.
---
 .../src/airflow_breeze/global_constants.py    |  4 +-
 .../providers/standard/operators/python.py    | 58 +++++++++++++------
 .../providers/standard/sensors/date_time.py   | 23 +++++++-
 .../providers/standard/sensors/time.py        | 23 +++++++-
 .../providers/standard/sensors/time_delta.py  | 11 +++-
 .../tests/openlineage/utils/test_utils.py     |  5 +-
 .../tests/standard/operators/test_python.py   | 48 +++++++++------
 7 files changed, 126 insertions(+), 46 deletions(-)

diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py
index a674b142b3c3e..db9c3556e9b17 100644
--- a/dev/breeze/src/airflow_breeze/global_constants.py
+++ b/dev/breeze/src/airflow_breeze/global_constants.py
@@ -574,13 +574,13 @@ def get_airflow_extras():
     {
         "python-version": "3.9",
         "airflow-version": "2.8.4",
-        "remove-providers": "cloudant fab edge standard",
+        "remove-providers": "cloudant fab edge",
         "run-tests": "true",
     },
     {
         "python-version": "3.9",
         "airflow-version": "2.9.3",
-        "remove-providers": "cloudant edge standard",
+        "remove-providers": "cloudant edge",
         "run-tests": "true",
     },
     {
diff --git a/providers/src/airflow/providers/standard/operators/python.py b/providers/src/airflow/providers/standard/operators/python.py
index fb4babaf4aa2a..cd1aec7da752d 100644
--- a/providers/src/airflow/providers/standard/operators/python.py
+++ b/providers/src/airflow/providers/standard/operators/python.py
@@ -54,15 +54,16 @@
 from airflow.settings import _ENABLE_AIP_44
 from airflow.typing_compat import Literal
 from airflow.utils import hashlib_wrapper
-from airflow.utils.context import context_copy_partial, context_get_outlet_events, context_merge
+from airflow.utils.context import context_copy_partial, context_merge
 from airflow.utils.file import get_unique_dag_module_name
-from airflow.utils.operator_helpers import ExecutionCallableRunner, KeywordParameters
-from airflow.utils.process_utils import execute_in_subprocess
+from airflow.utils.operator_helpers import KeywordParameters
+from airflow.utils.process_utils import execute_in_subprocess, execute_in_subprocess_with_kwargs
 from airflow.utils.session import create_session
 
 log = logging.getLogger(__name__)
 
 AIRFLOW_VERSION = Version(airflow_version)
+AIRFLOW_V_2_10_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.10.0")
 AIRFLOW_V_3_0_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("3.0.0")
 
 if TYPE_CHECKING:
@@ -187,7 +188,15 @@ def __init__(
     def execute(self, context: Context) -> Any:
         context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
         self.op_kwargs = self.determine_kwargs(context)
-        self._asset_events = context_get_outlet_events(context)
+
+        if AIRFLOW_V_3_0_PLUS:
+            from airflow.utils.context import context_get_outlet_events
+
+            self._asset_events = context_get_outlet_events(context)
+        elif AIRFLOW_V_2_10_PLUS:
+            from airflow.utils.context import context_get_outlet_events
+
+            self._dataset_events = context_get_outlet_events(context)
 
         return_value = self.execute_callable()
         if self.show_return_value_in_logs:
@@ -206,7 +215,15 @@ def execute_callable(self) -> Any:
 
         :return: the return value of the call.
""" - runner = ExecutionCallableRunner(self.python_callable, self._asset_events, logger=self.log) + try: + from airflow.utils.operator_helpers import ExecutionCallableRunner + + asset_events = self._asset_events if AIRFLOW_V_3_0_PLUS else self._dataset_events + + runner = ExecutionCallableRunner(self.python_callable, asset_events, logger=self.log) + except ImportError: + # Handle Pre Airflow 3.10 case where ExecutionCallableRunner was not available + return self.python_callable(*self.op_args, **self.op_kwargs) return runner.run(*self.op_args, **self.op_kwargs) @@ -551,18 +568,25 @@ def _execute_python_callable_in_subprocess(self, python_path: Path): env_vars.update(self.env_vars) try: - execute_in_subprocess( - cmd=[ - os.fspath(python_path), - os.fspath(script_path), - os.fspath(input_path), - os.fspath(output_path), - os.fspath(string_args_path), - os.fspath(termination_log_path), - os.fspath(airflow_context_path), - ], - env=env_vars, - ) + cmd: list[str] = [ + os.fspath(python_path), + os.fspath(script_path), + os.fspath(input_path), + os.fspath(output_path), + os.fspath(string_args_path), + os.fspath(termination_log_path), + os.fspath(airflow_context_path), + ] + if AIRFLOW_V_2_10_PLUS: + execute_in_subprocess( + cmd=cmd, + env=env_vars, + ) + else: + execute_in_subprocess_with_kwargs( + cmd=cmd, + env=env_vars, + ) except subprocess.CalledProcessError as e: if e.returncode in self.skip_on_exit_code: raise AirflowSkipException(f"Process exited with code {e.returncode}. Skipping.") diff --git a/providers/src/airflow/providers/standard/sensors/date_time.py b/providers/src/airflow/providers/standard/sensors/date_time.py index 20a6a484e05a4..468d7be23cc63 100644 --- a/providers/src/airflow/providers/standard/sensors/date_time.py +++ b/providers/src/airflow/providers/standard/sensors/date_time.py @@ -18,10 +18,27 @@ from __future__ import annotations import datetime +from dataclasses import dataclass from typing import TYPE_CHECKING, Any, NoReturn, Sequence +from airflow.providers.standard.operators.python import AIRFLOW_V_3_0_PLUS from airflow.sensors.base import BaseSensorOperator -from airflow.triggers.base import StartTriggerArgs + +try: + from airflow.triggers.base import StartTriggerArgs +except ImportError: + # TODO: Remove this when min airflow version is 2.10.0 for standard provider + @dataclass + class StartTriggerArgs: # type: ignore[no-redef] + """Arguments required for start task execution from triggerer.""" + + trigger_cls: str + next_method: str + trigger_kwargs: dict[str, Any] | None = None + next_kwargs: dict[str, Any] | None = None + timeout: datetime.timedelta | None = None + + from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -125,7 +142,9 @@ def execute(self, context: Context) -> NoReturn: trigger=DateTimeTrigger( moment=timezone.parse(self.target_time), end_from_trigger=self.end_from_trigger, - ), + ) + if AIRFLOW_V_3_0_PLUS + else DateTimeTrigger(moment=timezone.parse(self.target_time)), ) def execute_complete(self, context: Context, event: Any = None) -> None: diff --git a/providers/src/airflow/providers/standard/sensors/time.py b/providers/src/airflow/providers/standard/sensors/time.py index 6dba2628fce35..19e6a4bf11b32 100644 --- a/providers/src/airflow/providers/standard/sensors/time.py +++ b/providers/src/airflow/providers/standard/sensors/time.py @@ -18,10 +18,27 @@ from __future__ import annotations import datetime +from dataclasses import dataclass from typing import TYPE_CHECKING, Any, NoReturn +from 
 from airflow.sensors.base import BaseSensorOperator
-from airflow.triggers.base import StartTriggerArgs
+
+try:
+    from airflow.triggers.base import StartTriggerArgs
+except ImportError:
+    # TODO: Remove this when min airflow version is 2.10.0 for standard provider
+    @dataclass
+    class StartTriggerArgs:  # type: ignore[no-redef]
+        """Arguments required for start task execution from triggerer."""
+
+        trigger_cls: str
+        next_method: str
+        trigger_kwargs: dict[str, Any] | None = None
+        next_kwargs: dict[str, Any] | None = None
+        timeout: datetime.timedelta | None = None
+
+
 from airflow.triggers.temporal import DateTimeTrigger
 from airflow.utils import timezone
 
@@ -102,7 +119,9 @@ def __init__(
 
     def execute(self, context: Context) -> NoReturn:
         self.defer(
-            trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger),
+            trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger)
+            if AIRFLOW_V_3_0_PLUS
+            else DateTimeTrigger(moment=self.target_datetime),
             method_name="execute_complete",
         )
 
diff --git a/providers/src/airflow/providers/standard/sensors/time_delta.py b/providers/src/airflow/providers/standard/sensors/time_delta.py
index dc78a0e33bc42..c0b811791826e 100644
--- a/providers/src/airflow/providers/standard/sensors/time_delta.py
+++ b/providers/src/airflow/providers/standard/sensors/time_delta.py
@@ -27,6 +27,8 @@
 from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
 from airflow.utils import timezone
 
+from airflow.providers.standard.operators.python import AIRFLOW_V_3_0_PLUS
+
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
@@ -81,7 +83,10 @@ def execute(self, context: Context) -> bool | NoReturn:
             # If the target datetime is in the past, return immediately
             return True
         try:
-            trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger)
+            if AIRFLOW_V_3_0_PLUS:
+                trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger)
+            else:
+                trigger = DateTimeTrigger(moment=target_dttm)
         except (TypeError, ValueError) as e:
             if self.soft_fail:
                 raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
@@ -121,7 +126,9 @@ def __init__(
     def execute(self, context: Context) -> None:
         if self.deferrable:
             self.defer(
-                trigger=TimeDeltaTrigger(self.time_to_wait, end_from_trigger=True),
+                trigger=TimeDeltaTrigger(self.time_to_wait, end_from_trigger=True)
+                if AIRFLOW_V_3_0_PLUS
+                else TimeDeltaTrigger(self.time_to_wait),
                 method_name="execute_complete",
             )
         else:
diff --git a/providers/tests/openlineage/utils/test_utils.py b/providers/tests/openlineage/utils/test_utils.py
index f4e286331a518..28be0b6306751 100644
--- a/providers/tests/openlineage/utils/test_utils.py
+++ b/providers/tests/openlineage/utils/test_utils.py
@@ -43,14 +43,11 @@
 from airflow.utils.task_group import TaskGroup
 from airflow.utils.types import DagRunType
 
-from tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS, BashOperator, PythonOperator
+from tests_common.test_utils.compat import BashOperator, PythonOperator
 from tests_common.test_utils.mock_operators import MockOperator
 
 BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
 PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
-if not AIRFLOW_V_2_10_PLUS:
-    BASH_OPERATOR_PATH = "airflow.operators.bash"
-    PYTHON_OPERATOR_PATH = "airflow.operators.python"
 
 
 class CustomOperatorForTest(BashOperator):
diff --git a/providers/tests/standard/operators/test_python.py b/providers/tests/standard/operators/test_python.py
index b8a8ef5bc1228..75edc2621c5d6 100644
--- a/providers/tests/standard/operators/test_python.py
+++ b/providers/tests/standard/operators/test_python.py
@@ -51,6 +51,7 @@
 from airflow.models.taskinstance import TaskInstance, clear_task_instances, set_current_context
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.standard.operators.python import (
+    AIRFLOW_V_2_10_PLUS,
     BranchExternalPythonOperator,
     BranchPythonOperator,
     BranchPythonVirtualenvOperator,
@@ -509,7 +510,7 @@ def f():
         ti = self.create_ti(f)
         with pytest.raises(
             AirflowException,
-            match="'branch_task_ids' expected all task IDs are strings.",
+            match=r"'branch_task_ids'.*task.*",
         ):
             ti.run()
 
@@ -518,7 +519,9 @@ def f():
             return "some_task_id"
 
         ti = self.create_ti(f)
-        with pytest.raises(AirflowException, match="Invalid tasks found: {'some_task_id'}"):
+        with pytest.raises(
+            AirflowException, match=r"Invalid tasks found: {'some_task_id'}|'branch_task_ids'.*task.*"
+        ):
             ti.run()
 
     @pytest.mark.skip_if_database_isolation_mode  # tests pure logic with run() method, can not run in isolation mode
@@ -903,9 +906,12 @@ def test_virtualenv_serializable_context_fields(self, create_task_instance):
             "ti",
             "var",  # Accessor for Variable; var->json and var->value.
             "conn",  # Accessor for Connection.
-            "inlet_events",  # Accessor for inlet AssetEvent.
-            "outlet_events",  # Accessor for outlet AssetEvent.
         ]
+        if AIRFLOW_V_2_10_PLUS:
+            intentionally_excluded_context_keys.extend(
+                # Accessors for inlet_events and outlet_events
+                ["inlet_events", "outlet_events"]
+            )
 
         ti = create_task_instance(dag_id=self.dag_id, task_id=self.task_id, schedule=None)
         context = ti.get_template_context()
@@ -1035,7 +1041,7 @@ def f():
             context = get_current_context()
             if not isinstance(context, Context):  # type: ignore[misc]
-                error_msg = f"Expected Context, got {type(context)}"
+                error_msg = f"Expected Context, got {type(context)}:{context!r}"
                 raise TypeError(error_msg)
 
             return []
@@ -1086,7 +1092,7 @@ def f():
             context = get_current_context()
             if not isinstance(context, Context):  # type: ignore[misc]
-                error_msg = f"Expected Context, got {type(context)}"
+                error_msg = f"Expected Context, got {type(context)}:{context!r}"
                 raise TypeError(error_msg)
 
             return []
@@ -1520,7 +1526,7 @@ def f():
             context = get_current_context()
             if not isinstance(context, Context):  # type: ignore[misc]
-                error_msg = f"Expected Context, got {type(context)}"
+                error_msg = f"Expected Context, got {type(context)}:{context!r}"
                 raise TypeError(error_msg)
 
             return []
@@ -1627,21 +1633,25 @@ def f(a, b, c=False, d=False):
             else:
                 raise RuntimeError
 
-        with pytest.raises(AirflowException, match=r"Invalid tasks found: {\((True|False), 'bool'\)}"):
+        with pytest.raises(
+            AirflowException, match=r"Invalid tasks found: {\(False, 'bool'\)}.|'branch_task_ids'.*task.*"
+        ):
             self.run_as_task(f, op_args=[0, 1], op_kwargs={"c": True})
 
     def test_return_false(self):
         def f():
             return False
 
-        with pytest.raises(AirflowException, match=r"Invalid tasks found: {\(False, 'bool'\)}."):
+        with pytest.raises(
+            AirflowException, match=r"Invalid tasks found: {\(False, 'bool'\)}.|'branch_task_ids'.*task.*"
+        ):
             self.run_as_task(f)
 
     def test_context(self):
         def f(templates_dict):
             return templates_dict["ds"]
 
-        with pytest.raises(AirflowException, match="Invalid tasks found:|'branch_task_ids'.*task.*"):
+        with pytest.raises(AirflowException, match="Invalid tasks found:|'branch_task_ids'.*task.*"):
             self.run_as_task(f, templates_dict={"ds": "{{ ds }}"})
"{{ ds }}"}) def test_environment_variables(self): @@ -1652,7 +1662,7 @@ def f(): with pytest.raises( AirflowException, - match=r"'branch_task_ids' must contain only valid task_ids. Invalid tasks found: {'ABCDE'}", + match=r"'branch_task_ids'.*task.*", ): self.run_as_task(f, env_vars={"MY_ENV_VAR": "ABCDE"}) @@ -1666,7 +1676,7 @@ def f(): with pytest.raises( AirflowException, - match=r"'branch_task_ids' must contain only valid task_ids. Invalid tasks found: {'QWERT'}", + match=r"'branch_task_ids'.*task.*", ): self.run_as_task(f, inherit_env=True) @@ -1691,7 +1701,7 @@ def f(): with pytest.raises( AirflowException, - match=r"'branch_task_ids' must contain only valid task_ids. Invalid tasks found: {'EFGHI'}", + match=r"'branch_task_ids'.*task.*", ): self.run_as_task(f, env_vars={"MY_ENV_VAR": "EFGHI"}, inherit_env=True) @@ -1706,7 +1716,9 @@ def test_with_no_caching(self): def f(): return False - with pytest.raises(AirflowException, match=r"Invalid tasks found: {\(False, 'bool'\)}."): + with pytest.raises( + AirflowException, match=r"Invalid tasks found: {\(False, 'bool'\)}.|'branch_task_ids'.*task.*" + ): self.run_as_task(f, do_not_use_caching=True) def test_with_dag_run(self): @@ -1827,7 +1839,7 @@ def f(): ti = self.create_ti(f) with pytest.raises( AirflowException, - match="'branch_task_ids' expected all task IDs are strings.", + match=r"'branch_task_ids'.*task.*", ): ti.run() @@ -1836,7 +1848,9 @@ def f(): return "some_task_id" ti = self.create_ti(f) - with pytest.raises(AirflowException, match="Invalid tasks found: {'some_task_id'}"): + with pytest.raises( + AirflowException, match=r"Invalid tasks found: {\(False, 'bool'\)}.|'branch_task_ids'.*task.*" + ): ti.run() @@ -1866,7 +1880,7 @@ def f(): context = get_current_context() if not isinstance(context, Context): # type: ignore[misc] - error_msg = f"Expected Context, got {type(context)}" + error_msg = f"Expected Context, got {type(context)}:{context!r}" raise TypeError(error_msg) return []