Test standard provider with Airflow 2.8 and 2.9 #43556

Merged · 1 commit · Nov 9, 2024
4 changes: 2 additions & 2 deletions dev/breeze/src/airflow_breeze/global_constants.py
@@ -574,13 +574,13 @@ def get_airflow_extras():
{
"python-version": "3.9",
"airflow-version": "2.8.4",
"remove-providers": "cloudant fab edge standard",
"remove-providers": "cloudant fab edge",
"run-tests": "true",
},
{
"python-version": "3.9",
"airflow-version": "2.9.3",
"remove-providers": "cloudant edge standard",
"remove-providers": "cloudant edge",
"run-tests": "true",
},
{
2 changes: 1 addition & 1 deletion docs/apache-airflow/howto/operator/python.rst
@@ -253,7 +253,7 @@ With some limitations, you can also use ``Context`` in virtual environments.

You can also use ``get_current_context()`` in the same way as before, but with some limitations.

* Requires ``pydantic>=2``.
* Requires ``apache-airflow>=3.0.0``.

* Set ``use_airflow_context`` to ``True`` to call ``get_current_context()`` in the virtual environment.

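For context, a minimal sketch of the documented flow from a DAG author's perspective (hypothetical DAG and task names; assumes `apache-airflow>=3.0.0` with the standard provider installed, and the import path of `get_current_context` inside the callable is an assumption that may differ by version):

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.standard.operators.python import PythonVirtualenvOperator


def callable_in_venv():
    # Imported inside the callable so it resolves within the virtual environment;
    # the exact import path may vary with the Airflow version (assumption).
    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    print(context["ti"].task_id)


with DAG(dag_id="example_use_airflow_context", start_date=datetime(2024, 1, 1), schedule=None):
    PythonVirtualenvOperator(
        task_id="venv_task_with_context",
        python_callable=callable_in_venv,
        system_site_packages=False,
        # Per the check in operators/python.py below, either expect_airflow or
        # system_site_packages must be True when use_airflow_context=True.
        expect_airflow=True,
        use_airflow_context=True,
    )
```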
9 changes: 9 additions & 0 deletions providers/src/airflow/providers/standard/__init__.py
@@ -15,3 +15,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from packaging.version import Version

from airflow import __version__ as airflow_version

AIRFLOW_VERSION = Version(airflow_version)
AIRFLOW_V_2_10_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("2.10.0")
AIRFLOW_V_3_0_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("3.0.0")
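A note on why these flags compare against `base_version`: pre-release and dev builds sort below the final release, so a naive comparison would mis-classify a development tree. A quick illustration (version strings are hypothetical):

```python
from packaging.version import Version

dev_build = Version("2.10.0.dev0")

# A dev build sorts *below* the final release, so the naive check is False...
print(dev_build >= Version("2.10.0"))                        # False
# ...while comparing on base_version strips the pre-release segment first.
print(Version(dev_build.base_version) >= Version("2.10.0"))  # True
```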
84 changes: 56 additions & 28 deletions providers/src/airflow/providers/standard/operators/python.py
@@ -35,9 +35,7 @@
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Mapping, NamedTuple, Sequence, cast

import lazy_object_proxy
from packaging.version import Version

from airflow import __version__ as airflow_version
from airflow.exceptions import (
AirflowConfigException,
AirflowException,
@@ -50,21 +48,19 @@
from airflow.models.taskinstance import _CURRENT_CONTEXT
from airflow.models.variable import Variable
from airflow.operators.branch import BranchMixIn
from airflow.providers.standard import AIRFLOW_V_2_10_PLUS, AIRFLOW_V_3_0_PLUS
from airflow.providers.standard.utils.python_virtualenv import prepare_virtualenv, write_python_script
from airflow.settings import _ENABLE_AIP_44
from airflow.typing_compat import Literal
from airflow.utils import hashlib_wrapper
from airflow.utils.context import context_copy_partial, context_get_outlet_events, context_merge
from airflow.utils.context import context_copy_partial, context_merge
from airflow.utils.file import get_unique_dag_module_name
from airflow.utils.operator_helpers import ExecutionCallableRunner, KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess
from airflow.utils.operator_helpers import KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess, execute_in_subprocess_with_kwargs
from airflow.utils.session import create_session

log = logging.getLogger(__name__)

AIRFLOW_VERSION = Version(airflow_version)
AIRFLOW_V_3_0_PLUS = Version(AIRFLOW_VERSION.base_version) >= Version("3.0.0")

if TYPE_CHECKING:
from pendulum.datetime import DateTime

@@ -187,7 +183,15 @@ def __init__(
def execute(self, context: Context) -> Any:
context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
self.op_kwargs = self.determine_kwargs(context)
self._asset_events = context_get_outlet_events(context)

if AIRFLOW_V_3_0_PLUS:
from airflow.utils.context import context_get_outlet_events

self._asset_events = context_get_outlet_events(context)
elif AIRFLOW_V_2_10_PLUS:
from airflow.utils.context import context_get_outlet_events

self._dataset_events = context_get_outlet_events(context)

return_value = self.execute_callable()
if self.show_return_value_in_logs:
@@ -206,7 +210,15 @@ def execute_callable(self) -> Any:

:return: the return value of the call.
"""
runner = ExecutionCallableRunner(self.python_callable, self._asset_events, logger=self.log)
try:
from airflow.utils.operator_helpers import ExecutionCallableRunner

asset_events = self._asset_events if AIRFLOW_V_3_0_PLUS else self._dataset_events

runner = ExecutionCallableRunner(self.python_callable, asset_events, logger=self.log)
except ImportError:
# Handle the pre-Airflow-2.10 case, where ExecutionCallableRunner was not available
return self.python_callable(*self.op_args, **self.op_kwargs)
return runner.run(*self.op_args, **self.op_kwargs)


@@ -348,7 +360,6 @@ class _BasePythonVirtualenvOperator(PythonOperator, metaclass=ABCMeta):
"ds_nodash",
"expanded_ti_count",
"inlets",
"map_index_template",
"next_ds",
"next_ds_nodash",
"outlets",
@@ -551,18 +562,25 @@ def _execute_python_callable_in_subprocess(self, python_path: Path):
env_vars.update(self.env_vars)

try:
execute_in_subprocess(
cmd=[
os.fspath(python_path),
os.fspath(script_path),
os.fspath(input_path),
os.fspath(output_path),
os.fspath(string_args_path),
os.fspath(termination_log_path),
os.fspath(airflow_context_path),
],
env=env_vars,
)
cmd: list[str] = [
os.fspath(python_path),
os.fspath(script_path),
os.fspath(input_path),
os.fspath(output_path),
os.fspath(string_args_path),
os.fspath(termination_log_path),
os.fspath(airflow_context_path),
]
if AIRFLOW_V_2_10_PLUS:
execute_in_subprocess(
cmd=cmd,
env=env_vars,
)
else:
execute_in_subprocess_with_kwargs(
cmd=cmd,
env=env_vars,
)
except subprocess.CalledProcessError as e:
if e.returncode in self.skip_on_exit_code:
raise AirflowSkipException(f"Process exited with code {e.returncode}. Skipping.")
@@ -697,10 +715,15 @@ def __init__(
raise AirflowException(
"Passing non-string types (e.g. int or float) as python_version not supported"
)

if use_airflow_context and not AIRFLOW_V_3_0_PLUS:
raise AirflowException(
"The `use_airflow_context=True` is only supported in Airflow 3.0.0 and later."
)
if use_airflow_context and (not expect_airflow and not system_site_packages):
error_msg = "use_airflow_context is set to True, but expect_airflow and system_site_packages are set to False."
raise AirflowException(error_msg)
raise AirflowException(
"The `use_airflow_context` parameter is set to True, but "
"expect_airflow and system_site_packages are set to False."
)
if not requirements:
self.requirements: list[str] = []
elif isinstance(requirements, str):
@@ -976,9 +999,14 @@ def __init__(
):
if not python:
raise ValueError("Python Path must be defined in ExternalPythonOperator")
if use_airflow_context and not AIRFLOW_V_3_0_PLUS:
raise AirflowException(
"The `use_airflow_context=True` is only supported in Airflow 3.0.0 and later."
)
if use_airflow_context and not expect_airflow:
error_msg = "use_airflow_context is set to True, but expect_airflow is set to False."
raise AirflowException(error_msg)
raise AirflowException(
"The `use_airflow_context` parameter is set to True, but expect_airflow is set to False."
)
self.python = python
self.expect_pendulum = expect_pendulum
super().__init__(
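The changes in `operators/python.py` follow one compatibility pattern: prefer feature detection by import, gate keyword-only differences behind the version flags, and fall back to the plain call path on 2.8/2.9. A condensed, standalone sketch of the callable-execution part (the helper name and signature are illustrative, not part of the operator):

```python
from __future__ import annotations

import logging
from typing import Any, Callable


def run_python_callable(
    python_callable: Callable[..., Any],
    op_args: tuple,
    op_kwargs: dict[str, Any],
    outlet_events: Any,
    logger: logging.Logger,
) -> Any:
    """Run the callable through ExecutionCallableRunner when it exists (Airflow 2.10+)."""
    try:
        from airflow.utils.operator_helpers import ExecutionCallableRunner
    except ImportError:
        # Airflow 2.8/2.9: the runner is not available, call the callable directly.
        return python_callable(*op_args, **op_kwargs)
    runner = ExecutionCallableRunner(python_callable, outlet_events, logger=logger)
    return runner.run(*op_args, **op_kwargs)
```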
23 changes: 21 additions & 2 deletions providers/src/airflow/providers/standard/sensors/date_time.py
@@ -18,10 +18,27 @@
from __future__ import annotations

import datetime
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, NoReturn, Sequence

from airflow.providers.standard import AIRFLOW_V_3_0_PLUS
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs

try:
from airflow.triggers.base import StartTriggerArgs
except ImportError:
# TODO: Remove this when min airflow version is 2.10.0 for standard provider
@dataclass
class StartTriggerArgs: # type: ignore[no-redef]
"""Arguments required for start task execution from triggerer."""

trigger_cls: str
next_method: str
trigger_kwargs: dict[str, Any] | None = None
next_kwargs: dict[str, Any] | None = None
timeout: datetime.timedelta | None = None


from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone

@@ -125,7 +142,9 @@ def execute(self, context: Context) -> NoReturn:
trigger=DateTimeTrigger(
moment=timezone.parse(self.target_time),
end_from_trigger=self.end_from_trigger,
),
)
if AIRFLOW_V_3_0_PLUS
else DateTimeTrigger(moment=timezone.parse(self.target_time)),
)

def execute_complete(self, context: Context, event: Any = None) -> None:
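The conditional `defer()` call above boils down to one question: does the installed `DateTimeTrigger` accept `end_from_trigger`? A standalone sketch of that gate (the helper name is hypothetical):

```python
from __future__ import annotations

from airflow.providers.standard import AIRFLOW_V_3_0_PLUS
from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone


def build_datetime_trigger(target_time: str, end_from_trigger: bool) -> DateTimeTrigger:
    """Only pass end_from_trigger where the installed Airflow supports it."""
    moment = timezone.parse(target_time)
    if AIRFLOW_V_3_0_PLUS:
        # Newer Airflow lets the triggerer end the task without returning to a worker.
        return DateTimeTrigger(moment=moment, end_from_trigger=end_from_trigger)
    # Older DateTimeTrigger has no end_from_trigger keyword; execute_complete()
    # still runs on the worker after the trigger fires.
    return DateTimeTrigger(moment=moment)
```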
23 changes: 21 additions & 2 deletions providers/src/airflow/providers/standard/sensors/time.py
@@ -18,10 +18,27 @@
from __future__ import annotations

import datetime
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, NoReturn

from airflow.providers.standard import AIRFLOW_V_2_10_PLUS
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs

try:
from airflow.triggers.base import StartTriggerArgs
except ImportError:
# TODO: Remove this when min airflow version is 2.10.0 for standard provider
@dataclass
class StartTriggerArgs: # type: ignore[no-redef]
"""Arguments required for start task execution from triggerer."""

trigger_cls: str
next_method: str
trigger_kwargs: dict[str, Any] | None = None
next_kwargs: dict[str, Any] | None = None
timeout: datetime.timedelta | None = None


from airflow.triggers.temporal import DateTimeTrigger
from airflow.utils import timezone

@@ -102,7 +119,9 @@ def __init__(

def execute(self, context: Context) -> NoReturn:
self.defer(
trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger),
trigger=DateTimeTrigger(moment=self.target_datetime, end_from_trigger=self.end_from_trigger)
if AIRFLOW_V_2_10_PLUS
else DateTimeTrigger(moment=self.target_datetime),
method_name="execute_complete",
)

10 changes: 8 additions & 2 deletions providers/src/airflow/providers/standard/sensors/time_delta.py
@@ -23,6 +23,7 @@

from airflow.configuration import conf
from airflow.exceptions import AirflowSkipException
from airflow.providers.standard import AIRFLOW_V_3_0_PLUS
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.utils import timezone
@@ -81,7 +82,10 @@ def execute(self, context: Context) -> bool | NoReturn:
# If the target datetime is in the past, return immediately
return True
try:
trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger)
if AIRFLOW_V_3_0_PLUS:
trigger = DateTimeTrigger(moment=target_dttm, end_from_trigger=self.end_from_trigger)
else:
trigger = DateTimeTrigger(moment=target_dttm)
except (TypeError, ValueError) as e:
if self.soft_fail:
raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
@@ -121,7 +125,9 @@ def __init__(
def execute(self, context: Context) -> None:
if self.deferrable:
self.defer(
trigger=TimeDeltaTrigger(self.time_to_wait, end_from_trigger=True),
trigger=TimeDeltaTrigger(self.time_to_wait, end_from_trigger=True)
if AIRFLOW_V_3_0_PLUS
else TimeDeltaTrigger(self.time_to_wait),
method_name="execute_complete",
)
else:
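The same gate applies to `TimeDeltaTrigger` in this sensor; a compact sketch (helper name is hypothetical):

```python
from __future__ import annotations

from datetime import timedelta

from airflow.providers.standard import AIRFLOW_V_3_0_PLUS
from airflow.triggers.temporal import TimeDeltaTrigger


def build_time_delta_trigger(time_to_wait: timedelta) -> TimeDeltaTrigger:
    """Only pass end_from_trigger=True where the installed Airflow supports it."""
    if AIRFLOW_V_3_0_PLUS:
        return TimeDeltaTrigger(time_to_wait, end_from_trigger=True)
    return TimeDeltaTrigger(time_to_wait)
```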
1 change: 0 additions & 1 deletion providers/tests/common/sql/operators/test_sql.py
@@ -53,7 +53,6 @@

pytestmark = [
pytest.mark.db_test,
pytest.mark.skipif(reason="Tests for Airflow 2.8.0+ only"),
pytest.mark.skip_if_database_isolation_mode,
]

12 changes: 3 additions & 9 deletions providers/tests/openlineage/plugins/test_utils.py
@@ -57,12 +57,6 @@
if AIRFLOW_V_3_0_PLUS:
from airflow.utils.types import DagRunTriggeredByType

BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
if not AIRFLOW_V_2_10_PLUS:
BASH_OPERATOR_PATH = "airflow.operators.bash"
PYTHON_OPERATOR_PATH = "airflow.operators.python"


class SafeStrDict(dict):
def __str__(self):
@@ -276,7 +270,7 @@ def test_get_fully_qualified_class_name():
from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter

result = get_fully_qualified_class_name(BashOperator(task_id="test", bash_command="exit 0;"))
assert result == f"{BASH_OPERATOR_PATH}.BashOperator"
assert result == "airflow.providers.standard.operators.bash.BashOperator"

result = get_fully_qualified_class_name(OpenLineageAdapter())
assert result == "airflow.providers.openlineage.plugins.adapter.OpenLineageAdapter"
@@ -292,8 +286,8 @@ def test_is_operator_disabled(mock_disabled_operators):
assert is_operator_disabled(op) is False

mock_disabled_operators.return_value = {
f"{BASH_OPERATOR_PATH}.BashOperator",
f"{PYTHON_OPERATOR_PATH}.PythonOperator",
"airflow.providers.standard.operators.bash.BashOperator",
"airflow.providers.standard.operators.python.PythonOperator",
}
assert is_operator_disabled(op) is True

5 changes: 1 addition & 4 deletions providers/tests/openlineage/utils/test_utils.py
@@ -43,14 +43,11 @@
from airflow.utils.task_group import TaskGroup
from airflow.utils.types import DagRunType

from tests_common.test_utils.compat import AIRFLOW_V_2_10_PLUS, BashOperator, PythonOperator
from tests_common.test_utils.compat import BashOperator, PythonOperator
from tests_common.test_utils.mock_operators import MockOperator

BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
PYTHON_OPERATOR_PATH = "airflow.providers.standard.operators.python"
if not AIRFLOW_V_2_10_PLUS:
BASH_OPERATOR_PATH = "airflow.operators.bash"
PYTHON_OPERATOR_PATH = "airflow.operators.python"


class CustomOperatorForTest(BashOperator):