Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend StreamsBootstrap model #534

Merged
merged 12 commits into from
Oct 25, 2024
1,008 changes: 997 additions & 11 deletions docs/docs/schema/defaults.json

Large diffs are not rendered by default.

817 changes: 816 additions & 1 deletion docs/docs/schema/pipeline.json

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async def validate_connector_config(
"""Validate connector config using the given configuration.

:param connector_config: Configuration parameters for the connector.
:raises KafkaConnectError: Kafka Konnect error
:raises KafkaConnectError: Kafka Connect error
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
:return: List of all found errors
"""
response = await self._client.put(
Expand Down
124 changes: 124 additions & 0 deletions kpops/components/common/kubernetes_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import enum

from pydantic import Field

from kpops.utils.docstring import describe_attr
from kpops.utils.pydantic import DescConfigModel

# Matches plain integer or numbers with valid suffixes: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory
MEMORY_PATTERN = r"^\d+([EPTGMk]|Ei|Pi|Ti|Gi|Mi|Ki)?$"


class ServiceType(enum.Enum):
"""Represents the different Kubernetes service types.

https://kubernetes.io/docs/concepts/services-networking/service/#publishing-services-service-types
"""

CLUSTER_IP = "ClusterIP"
NODE_PORT = "NodePort"
LOAD_BALANCER = "LoadBalancer"
EXTERNAL_NAME = "ExternalName"


class ProtocolSchema(enum.Enum):
"""Represents the different Kubernetes protocols.

https://kubernetes.io/docs/reference/networking/service-protocols/
"""

TCP = "TCP"
UDP = "UDP"
SCTP = "SCTP"


class ImagePullPolicy(enum.Enum):
"""Represents the different Kubernetes image pull policies.

https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy
"""

ALWAYS = "Always"
IF_NOT_PRESENT = "IfNotPresent"
NEVER = "Never"


class Operation(enum.Enum):
EXISTS = "Exists"
EQUAL = "Equal"


class Effects(enum.Enum):
NO_EXECUTE = "NoExecute"
NO_SCHEDULE = "NoSchedule"
PREFER_NO_SCHEDULE = "PreferNoSchedule"


class RestartPolicy(enum.Enum):
ALWAYS = "Always"
ON_FAILURE = "OnFailure"
NEVER = "Never"


class Toleration(DescConfigModel):
"""Represents the different Kubernetes tolerations.

https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/

:param key: The key that the toleration applies to.
:param operator: The operator ('Exists' or 'Equal').
:param value: The value to match for the key.
:param effect: The effect to tolerate.
:param toleration_seconds: The duration for which the toleration is valid.
"""

key: str = Field(default=..., description=describe_attr("key", __doc__))

operator: Operation = Field(
default=Operation.EQUAL, description=describe_attr("operator", __doc__)
)

effect: Effects = Field(default=..., description=describe_attr("effect", __doc__))

value: str | None = Field(default=None, description=describe_attr("value", __doc__))

toleration_seconds: int | None = Field(
default=None, description=describe_attr("toleration_seconds", __doc__)
)


class ResourceLimits(DescConfigModel):
"""Model representing the 'limits' section of Kubernetes resource specifications.

:param cpu: The maximum amount of CPU a container can use, expressed in milli CPUs (e.g., '300m').
:param memory: The maximum amount of memory a container can use, with valid units such as 'Mi' or 'Gi' (e.g., '2G').
"""

cpu: str | int = Field(pattern=r"^\d+m$", description=describe_attr("cpu", __doc__))
memory: str = Field(
pattern=MEMORY_PATTERN, description=describe_attr("memory", __doc__)
)


class ResourceRequests(DescConfigModel):
disrupted marked this conversation as resolved.
Show resolved Hide resolved
"""Model representing the 'requests' section of Kubernetes resource specifications.

:param cpu: The minimum amount of CPU requested for the container, expressed in milli CPUs (e.g., '100m').
:param memory: The minimum amount of memory requested for the container, with valid units such as 'Mi' or 'Gi' (e.g., '500Mi').
"""

cpu: str = Field(pattern=r"^\d+m$", description=describe_attr("cpu", __doc__))
raminqaf marked this conversation as resolved.
Show resolved Hide resolved
memory: str = Field(
pattern=r"^\d+[KMGi]+$", description=describe_attr("memory", __doc__)
)
raminqaf marked this conversation as resolved.
Show resolved Hide resolved


class Resources(DescConfigModel):
"""Model representing the resource specifications for a Kubernetes container.

:param requests: The minimum resource requirements for the container.
:param limits: The maximum resource limits for the container.
"""

requests: ResourceRequests = Field(description=describe_attr("requests", __doc__))
limits: ResourceLimits = Field(description=describe_attr("limits", __doc__))
193 changes: 193 additions & 0 deletions kpops/components/streams_bootstrap/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@
from pydantic import AliasChoices, ConfigDict, Field

from kpops.components.base_components.helm_app import HelmAppValues
from kpops.components.common.kubernetes_model import (
ImagePullPolicy,
ProtocolSchema,
ResourceLimits,
ResourceRequests,
Resources,
ServiceType,
Toleration,
)
from kpops.components.common.topic import KafkaTopic, KafkaTopicStr
from kpops.utils.docstring import describe_attr
from kpops.utils.pydantic import (
Expand All @@ -19,23 +28,207 @@
IMAGE_TAG_PATTERN = r"^[a-zA-Z0-9_][a-zA-Z0-9._-]{0,127}$"


class PortConfig(CamelCaseConfigModel, DescConfigModel):
"""Base class for the port configuration of the Kafka Streams application.

:param container_port: Number of the port to expose.
:param name: Services can reference port by name (optional).
:param schema: Protocol for port. Must be UDP, TCP, or SCTP.
:param service_port: Number of the port of the service (optional)
"""

container_port: int = Field(
description=describe_attr("ports", __doc__),
)
name: str | None = Field(
default=None,
description=describe_attr("name", __doc__),
)
schema: ProtocolSchema = Field(
default=ProtocolSchema.TCP,
description=describe_attr("schema", __doc__),
)
service_port: int | None = Field(
default=None,
description=describe_attr("service_port", __doc__),
)


class ServiceConfig(CamelCaseConfigModel, DescConfigModel):
"""Base model for configuring a service for the Kafka Streams application.

:param enabled: Whether to create a service.
:param labels: Additional service labels.
:param type: Service type.
"""

enabled: bool = Field(
default=False,
description=describe_attr("enabled", __doc__),
)
labels: dict[str, str] = Field(
default_factory=dict,
description=describe_attr("labels", __doc__),
)
type: ServiceType = Field(
default=ServiceType.CLUSTER_IP,
description=describe_attr("type", __doc__),
)


class JavaOptions(CamelCaseConfigModel, DescConfigModel):
"""JVM configuration options.

:param max_RAM_percentage: Sets the maximum amount of memory that the JVM may use for the Java heap before applying ergonomics heuristics as a percentage of the maximum amount determined as described in the -XX:MaxRAM option
:param others: List of Java VM options passed to the streams app.
"""

max_RAM_percentage: int = Field(
disrupted marked this conversation as resolved.
Show resolved Hide resolved
default=75,
description=describe_attr("max_RAM_percentage", __doc__),
)
others: list[str] = Field(
default_factory=list,
description=describe_attr("others", __doc__),
)


class StreamsBootstrapValues(HelmAppValues):
"""Base value class for all streams bootstrap related components.

:param image: Docker image of the Kafka producer app.
:param image_tag: Docker image tag of the streams-bootstrap app.
:param image_pull_policy: Docker image pull policy.
:param image_pull_secrets: Secrets to be used for private registries.
:param kafka: Kafka configuration for the streams-bootstrap app.
:param resources: See https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
:param configuration_env_prefix: Prefix for environment variables to use that should be parsed as command line arguments.
:param command_line: Map of command line arguments passed to the streams app.
:param env: Custom environment variables.
:param secrets: Custom secret environment variables. Prefix with configurationEnvPrefix in order to pass secrets to command line or prefix with KAFKA_ to pass secrets to Kafka Streams configuration.
:param secret_refs: Inject existing secrets as environment variables. Map key is used as environment variable name. Value consists of secret name and key.
:param secret_files_refs: Mount existing secrets as volumes
:param files: Map of files to mount for the app. File will be mounted as $value.mountPath/$key. $value.content denotes file content (recommended to be used with --set-file).
:param pod_annotations: Map of custom annotations to attach to the pod spec.
:param pod_labels: Map of custom labels to attach to the pod spec.
:param liveness_probe: See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core
:param readiness_probe: See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.25/#probe-v1-core
:param affinity: Map to configure pod affinities https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#affinity-and-anti-affinity.
:param tolerations: Array containing taint references. When defined, pods can run on nodes, which would otherwise deny scheduling.
"""

image: str = Field(
description=describe_attr("image", __doc__),
)

image_tag: str = Field(
default="latest",
pattern=IMAGE_TAG_PATTERN,
description=describe_attr("image_tag", __doc__),
)

image_pull_policy: ImagePullPolicy = Field(
default=ImagePullPolicy.ALWAYS,
description=describe_attr("image_pull_policy", __doc__),
)

image_pull_secrets: list[dict[str, str]] = Field(
default_factory=list,
description=describe_attr("image_pull_secret", __doc__),
)

kafka: KafkaConfig = Field(
description=describe_attr("kafka", __doc__),
)

resources: Resources = Field(
default=Resources(
requests=ResourceRequests(cpu="100m", memory="500Mi"),
limits=ResourceLimits(cpu="300m", memory="2G"),
),
description=describe_attr("resources", __doc__),
)

ports: list[PortConfig] = Field(
default_factory=list,
description=describe_attr("ports", __doc__),
)

service: ServiceConfig = Field(
default_factory=ServiceConfig,
description=describe_attr("service", __doc__),
)

configuration_env_prefix: str = Field(
default="APP",
description=describe_attr("configuration_env_prefix", __doc__),
)

command_line: dict[str, str | bool | int | float] = Field(
default_factory=dict,
description=describe_attr("command_line", __doc__),
)

env: dict[str, str] = Field(
default_factory=dict,
description=describe_attr("env", __doc__),
)

secrets: dict[str, str] = Field(
default_factory=dict,
description=describe_attr("secrets", __doc__),
)

secret_refs: dict[str, Any] = Field(
default_factory=dict,
description=describe_attr("secret_refs", __doc__),
)

secret_files_refs: list[str] = Field(
default_factory=list,
description=describe_attr("secret_files_refs", __doc__),
)

files: dict[str, str] = Field(
default_factory=dict,
description=describe_attr("files", __doc__),
)

java_options: JavaOptions = Field(
default_factory=JavaOptions,
description=describe_attr("java_options", __doc__),
)

pod_annotations: dict[str, str] = Field(
default_factory=dict,
description=describe_attr("pod_annotations", __doc__),
)

pod_labels: dict[str, str] = Field(
default_factory=dict,
description=describe_attr("pod_labels", __doc__),
)

liveness_probe: dict[str, Any] = Field(
default_factory=dict,
description=describe_attr("liveness_probe", __doc__),
)

readiness_probe: dict[str, Any] = Field(
default_factory=dict,
description=describe_attr("readiness_probe", __doc__),
)

affinity: dict[str, Any] = Field(
default_factory=dict,
description=describe_attr("affinity", __doc__),
)

tolerations: list[Toleration] = Field(
default_factory=list,
description=describe_attr("tolerations", __doc__),
)


class KafkaConfig(CamelCaseConfigModel, DescConfigModel):
"""Kafka Streams config.
Expand Down
Loading
Loading