Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove all deprecations cncf.kubernetes #43689

Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airflow/cli/commands/kubernetes_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from airflow.providers.cncf.kubernetes import pod_generator
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubeConfig
from airflow.providers.cncf.kubernetes.kube_client import get_kube_client
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_pod_id
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_unique_id
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.utils import cli as cli_utils, yaml
from airflow.utils.cli import get_dag
Expand Down Expand Up @@ -59,7 +59,7 @@ def generate_pod_yaml(args):
pod = PodGenerator.construct_pod(
dag_id=args.dag_id,
task_id=ti.task_id,
pod_id=create_pod_id(args.dag_id, ti.task_id),
pod_id=create_unique_id(args.dag_id, ti.task_id),
try_number=ti.try_number,
kube_image=kube_config.kube_image,
date=ti.execution_date,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ Read more on termination-log `here <https://kubernetes.io/docs/tasks/debug/debug
KubernetesPodOperator callbacks
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The :class:`~airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator` supports different
The :class:`~airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator` supports different
callbacks that can be used to trigger actions during the lifecycle of the pod. In order to use them, you need to
create a subclass of :class:`~airflow.providers.cncf.kubernetes.callbacks.KubernetesPodOperatorCallback` and override
the callbacks methods you want to use. Then you can pass your callback class to the operator using the ``callbacks``
Expand Down
18 changes: 0 additions & 18 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1201,24 +1201,6 @@ def test_changing_base_container_name_with_get_logs(self, mock_get_connection):
self.expected_pod["spec"]["containers"][0]["name"] = "apple-sauce"
assert self.expected_pod["spec"] == actual_pod["spec"]

def test_progess_call(self, mock_get_connection):
progress_callback = MagicMock()
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels=self.labels,
task_id=str(uuid4()),
in_cluster=False,
do_xcom_push=False,
get_logs=True,
progress_callback=progress_callback,
)
context = create_context(k)
k.execute(context)
progress_callback.assert_called()

def test_changing_base_container_name_no_logs(self, mock_get_connection):
"""
This test checks BOTH a modified base container name AND the get_logs=False flow,
Expand Down
6 changes: 4 additions & 2 deletions providers/src/airflow/providers/amazon/aws/operators/eks.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@
try:
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
except ImportError:
# preserve backward compatibility for older versions of cncf.kubernetes provider
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
# preserve backward compatibility for older versions of cncf.kubernetes provider, remove this when minimum cncf.kubernetes provider is 10.0
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( # type: ignore[no-redef]
KubernetesPodOperator,
)

if TYPE_CHECKING:
from airflow.utils.context import Context
Expand Down
28 changes: 28 additions & 0 deletions providers/src/airflow/providers/cncf/kubernetes/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,34 @@ Changelog
main
.....

All deprecated classes, parameters and features have been removed from the Kubernetes provider package.
The following breaking changes were introduced:

* Helpers
* Remove ``add_pod_suffix`` method from ``kubernetes_helper_functions.py``. Use ``add_unique_suffix`` instead.
* Remove ``make_unique_pod_id`` method from ``PodGenerator``. Use ``add_unique_suffix`` in ``kubernetes_helper_functions`` instead.
* Remove ``create_pod_id`` method from ``kubernetes_helper_functions.py``. Use ``create_unique_id`` instead.
* Remove ``gen_pod`` method from ``PodGenerator``.
* Remove ``add_xcom_sidecar`` method from ``PodGenerator``. Use ``airflow.providers.cncf.kubernetes.utils.xcom_sidecar.add_xcom_sidecar`` instead.
* Remove the option to using a dictionary for the executor_config ``from_obj`` function in ``PodGenerator``. Use a ``kubernetes.client.models.V1Pod`` class with a "pod_override" key.
* Remove ``from_legacy_obj`` method from ``PodGenerator``.
* Remove ``airflow.providers.cncf.kubernetes.pod_launcher_deprecated`` module. Use ``airflow.providers.cncf.kubernetes.utils.pod_manager`` instead.

* Operators
* Remove ``airflow.providers.cncf.kubernetes.operators.kubernetes_pod``. Use ``airflow.providers.cncf.kubernetes.operators.pod`` instead.
* Remove ``is_delete_operator_pod`` parameters from ``KubernetesPodOperator``. Use ``on_finish_action`` instead.
* Remove ``progress_callback`` parameters from ``KubernetesPodOperator``. Use ``callbacks`` instead.
* Remove ``execute_complete`` method from ``KubernetesPodOperator``. Use ``trigger_reentry`` instead.
* Remove ``xcom_push`` parameter from ``SparkKubernetesOperator``. Use ``do_xcom_push``.

* Triggers
* Remove ``should_delete_pod`` parameter from ``KubernetesPodTrigger``. Use ``on_finish_action`` instead.

* Utils
* Remove ``progress_callback`` parameter from ``PodManager``.
* Remove ``follow_container_logs`` method from ``PodManager``. Use ``fetch_container_logs`` instead.


.. warning::
Set the default value of ``namespace`` in ``@task.kubernetes`` to ``None``, so it uses the cluster namespace when ``in_cluster`` is True. Be sure to specify a namespace when using this decorator. To retain the previous behavior, set ``namespace="default"``

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,10 @@
from typing import TYPE_CHECKING

import pendulum
from deprecated import deprecated
from kubernetes.client.rest import ApiException
from slugify import slugify

from airflow.configuration import conf
from airflow.exceptions import AirflowProviderDeprecationWarning

if TYPE_CHECKING:
from airflow.models.taskinstancekey import TaskInstanceKey
Expand Down Expand Up @@ -62,22 +60,6 @@ def add_unique_suffix(*, name: str, rand_len: int = 8, max_len: int = POD_NAME_M
return name[: max_len - len(suffix)].strip("-.") + suffix


@deprecated(
reason="This function is deprecated. Please use `add_unique_suffix`",
category=AirflowProviderDeprecationWarning,
)
def add_pod_suffix(*, pod_name: str, rand_len: int = 8, max_len: int = POD_NAME_MAX_LENGTH) -> str:
"""
Add random string to pod name while staying under max length.

:param pod_name: name of the pod
:param rand_len: length of the random string to append
:param max_len: maximum length of the pod name
:meta private:
"""
return add_unique_suffix(name=pod_name, rand_len=rand_len, max_len=max_len)


def create_unique_id(
dag_id: str | None = None,
task_id: str | None = None,
Expand Down Expand Up @@ -110,29 +92,6 @@ def create_unique_id(
return base_name


@deprecated(
reason="This function is deprecated. Please use `create_unique_id`.",
category=AirflowProviderDeprecationWarning,
)
def create_pod_id(
dag_id: str | None = None,
task_id: str | None = None,
*,
max_length: int = POD_NAME_MAX_LENGTH,
unique: bool = True,
) -> str:
"""
Generate unique pod ID given a dag_id and / or task_id.

:param dag_id: DAG ID
:param task_id: Task ID
:param max_length: max number of characters
:param unique: whether a random string suffix should be added
:return: A valid identifier for a kubernetes pod name
"""
return create_unique_id(dag_id=dag_id, task_id=task_id, max_length=max_length, unique=unique)


def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:
"""Build a TaskInstanceKey based on pod annotations."""
log.debug("Creating task key for annotations %s", annotations)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import re
import shlex
import string
import warnings
from collections.abc import Container, Mapping
from contextlib import AbstractContextManager
from enum import Enum
Expand All @@ -35,7 +34,6 @@

import kubernetes
import tenacity
from deprecated import deprecated
from kubernetes.client import CoreV1Api, V1Pod, models as k8s
from kubernetes.client.exceptions import ApiException
from kubernetes.stream import stream
Expand All @@ -44,7 +42,6 @@
from airflow.configuration import conf
from airflow.exceptions import (
AirflowException,
AirflowProviderDeprecationWarning,
AirflowSkipException,
TaskDeferred,
)
Expand Down Expand Up @@ -215,18 +212,12 @@ class KubernetesPodOperator(BaseOperator):
:param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted.
If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod",
only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod.
:param is_delete_operator_pod: What to do when the pod reaches its final
state, or the execution is interrupted. If True (default), delete the
pod; if False, leave the pod.
Deprecated - use `on_finish_action` instead.
:param termination_message_policy: The termination message policy of the base container.
Default value is "File"
:param active_deadline_seconds: The active_deadline_seconds which translates to active_deadline_seconds
in V1PodSpec.
:param callbacks: KubernetesPodOperatorCallback instance contains the callbacks methods on different step
of KubernetesPodOperator.
:param progress_callback: Callback function for receiving k8s container logs.
`progress_callback` is deprecated, please use :param `callbacks` instead.
:param logging_interval: max time in seconds that task should be in deferred state before
resuming to fetch the latest logs. If ``None``, then the task will remain in deferred state until pod
is done, and no logs will be visible until that time.
Expand Down Expand Up @@ -404,19 +395,8 @@ def __init__(
self.poll_interval = poll_interval
self.remote_pod: k8s.V1Pod | None = None
self.log_pod_spec_on_failure = log_pod_spec_on_failure
if is_delete_operator_pod is not None:
warnings.warn(
"`is_delete_operator_pod` parameter is deprecated, please use `on_finish_action`",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
self.on_finish_action = (
OnFinishAction.DELETE_POD if is_delete_operator_pod else OnFinishAction.KEEP_POD
)
self.is_delete_operator_pod = is_delete_operator_pod
else:
self.on_finish_action = OnFinishAction(on_finish_action)
self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD
self.on_finish_action = OnFinishAction(on_finish_action)
self.is_delete_operator_pod = self.on_finish_action == OnFinishAction.DELETE_POD
self.termination_message_policy = termination_message_policy
self.active_deadline_seconds = active_deadline_seconds
self.logging_interval = logging_interval
Expand Down Expand Up @@ -512,9 +492,7 @@ def _get_ti_pod_labels(context: Context | None = None, include_try_number: bool

@cached_property
def pod_manager(self) -> PodManager:
return PodManager(
kube_client=self.client, callbacks=self.callbacks, progress_callback=self._progress_callback
)
return PodManager(kube_client=self.client, callbacks=self.callbacks)

@cached_property
def hook(self) -> PodOperatorHookProtocol:
Expand Down Expand Up @@ -1161,10 +1139,6 @@ def dry_run(self) -> None:
pod = self.build_pod_request_obj()
print(yaml.dump(prune_dict(pod.to_dict(), mode="strict")))

@deprecated(reason="use `trigger_reentry` instead.", category=AirflowProviderDeprecationWarning)
def execute_complete(self, context: Context, event: dict, **kwargs):
return self.trigger_reentry(context=context, event=event)

def process_duplicate_label_pods(self, pod_list: list[k8s.V1Pod]) -> k8s.V1Pod:
"""
Patch or delete the existing pod with duplicate labels.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ def __init__(
kubernetes_conn_id: str = "kubernetes_default",
**kwargs,
) -> None:
if kwargs.get("xcom_push") is not None:
raise AirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")
super().__init__(name=name, **kwargs)
self.image = image
self.code_path = code_path
Expand Down
Loading
Loading