From 658242eb4858a1ae77d353acb6a5c5437e631593 Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 13 Dec 2024 09:01:17 +0100 Subject: [PATCH] Manifest Kubernetes resources for `reset` command (#563) Closes #556 --- docs/docs/user/references/cli-commands.md | 1 + kpops/api/__init__.py | 25 ++++ kpops/cli/main.py | 42 +++++-- .../streams_bootstrap/streams/streams_app.py | 23 ++++ .../manifest.yaml | 107 ++++++++++++++++++ .../manifest.yaml | 107 ++++++++++++++++++ .../manifest.yaml | 107 ++++++++++++++++++ tests/pipeline/test_manifest.py | 46 ++++++++ 8 files changed, 446 insertions(+), 12 deletions(-) create mode 100644 tests/pipeline/snapshots/test_manifest/test_manifest_reset_argo_mode/manifest.yaml create mode 100644 tests/pipeline/snapshots/test_manifest/test_manifest_reset_manifest_mode/manifest.yaml create mode 100644 tests/pipeline/snapshots/test_manifest/test_manifest_reset_python_api/manifest.yaml diff --git a/docs/docs/user/references/cli-commands.md b/docs/docs/user/references/cli-commands.md index 575e4dd51..20e871574 100644 --- a/docs/docs/user/references/cli-commands.md +++ b/docs/docs/user/references/cli-commands.md @@ -171,6 +171,7 @@ $ kpops reset [OPTIONS] PIPELINE_PATHS... * `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run] * `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose] * `--parallel / --no-parallel`: Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially. [default: no-parallel] +* `--operation-mode [argo|manifest|managed]`: How KPOps should operate. [env var: KPOPS_OPERATION_MODE; default: managed] * `--help`: Show this message and exit. ## `kpops schema` diff --git a/kpops/api/__init__.py b/kpops/api/__init__.py index 07d3507cf..21b047ce9 100644 --- a/kpops/api/__init__.py +++ b/kpops/api/__init__.py @@ -120,6 +120,31 @@ def manifest_destroy( yield resource +def manifest_reset( + pipeline_path: Path, + dotenv: list[Path] | None = None, + config: Path = Path(), + steps: set[str] | None = None, + filter_type: FilterType = FilterType.INCLUDE, + environment: str | None = None, + verbose: bool = True, + operation_mode: OperationMode = OperationMode.MANIFEST, +) -> Iterator[tuple[KubernetesManifest, ...]]: + pipeline = generate( + pipeline_path=pipeline_path, + dotenv=dotenv, + config=config, + steps=steps, + filter_type=filter_type, + environment=environment, + verbose=verbose, + operation_mode=operation_mode, + ) + for component in pipeline.components: + resource = component.manifest_reset() + yield resource + + def manifest_clean( pipeline_path: Path, dotenv: list[Path] | None = None, diff --git a/kpops/cli/main.py b/kpops/cli/main.py index 9503e13ca..f372c7603 100644 --- a/kpops/cli/main.py +++ b/kpops/cli/main.py @@ -286,19 +286,37 @@ def reset( dry_run: bool = DRY_RUN, verbose: bool = VERBOSE_OPTION, parallel: bool = PARALLEL, + operation_mode: OperationMode = OPERATION_MODE_OPTION, ): - for pipeline_file_path in collect_pipeline_paths(pipeline_paths): - kpops.reset( - pipeline_path=pipeline_file_path, - dotenv=dotenv, - config=config, - steps=parse_steps(steps), - filter_type=filter_type, - environment=environment, - dry_run=dry_run, - verbose=verbose, - parallel=parallel, - ) + match operation_mode: + case OperationMode.MANAGED: + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + kpops.reset( + pipeline_path=pipeline_file_path, + dotenv=dotenv, + config=config, + steps=parse_steps(steps), + filter_type=filter_type, + environment=environment, + dry_run=dry_run, + verbose=verbose, + parallel=parallel, + ) + case _: + for pipeline_file_path in collect_pipeline_paths(pipeline_paths): + resources = kpops.manifest_reset( + pipeline_file_path, + dotenv, + config, + parse_steps(steps), + filter_type, + environment, + verbose, + operation_mode, + ) + for resource in resources: + for rendered_manifest in resource: + print_yaml(rendered_manifest.model_dump()) @app.command(help="Clean pipeline steps") diff --git a/kpops/components/streams_bootstrap/streams/streams_app.py b/kpops/components/streams_bootstrap/streams/streams_app.py index 70fecc285..98403bfe5 100644 --- a/kpops/components/streams_bootstrap/streams/streams_app.py +++ b/kpops/components/streams_bootstrap/streams/streams_app.py @@ -20,6 +20,7 @@ from kpops.const.file_type import DEFAULTS_YAML, PIPELINE_YAML from kpops.manifests.argo import ArgoHook, enrich_annotations from kpops.manifests.kubernetes import KubernetesManifest +from kpops.manifests.strimzi.kafka_topic import StrimziKafkaTopic from kpops.utils.docstring import describe_attr log = logging.getLogger("StreamsApp") @@ -66,6 +67,19 @@ def manifest_deploy(self) -> tuple[KubernetesManifest, ...]: self.template_flags, ) + @override + def manifest_reset(self) -> tuple[KubernetesManifest, ...]: + self.values.kafka.delete_output = False + values = self.to_helm_values() + + return self.helm.template( + self.helm_release_name, + self.helm_chart, + self.namespace, + values, + self.template_flags, + ) + async def clean_pvcs(self, dry_run: bool) -> None: app_full_name = super(HelmApp, self).full_name pvc_handler = PVCHandler(app_full_name, self.namespace) @@ -185,6 +199,15 @@ def manifest_deploy(self) -> tuple[KubernetesManifest, ...]: return manifests + @override + def manifest_reset(self) -> tuple[KubernetesManifest, ...]: + resource = self._cleaner.manifest_reset() + if self.to: + resource = resource + tuple( + StrimziKafkaTopic.from_topic(topic) for topic in self.to.kafka_topics + ) + return resource + @override def manifest_clean(self) -> tuple[KubernetesManifest, ...]: if get_config().operation_mode is OperationMode.MANIFEST: diff --git a/tests/pipeline/snapshots/test_manifest/test_manifest_reset_argo_mode/manifest.yaml b/tests/pipeline/snapshots/test_manifest/test_manifest_reset_argo_mode/manifest.yaml new file mode 100644 index 000000000..63dc20d35 --- /dev/null +++ b/tests/pipeline/snapshots/test_manifest/test_manifest_reset_argo_mode/manifest.yaml @@ -0,0 +1,107 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + labels: + app: resources-manifest-pipeline-my-streams-app-clean + chart: streams-app-cleanup-job-3.0.3 + release: resources-manifest-pipeline-my-streams-app-clean + name: resources-manifest-pipeline-my-streams-app-clean +spec: + backoffLimit: 6 + template: + metadata: + labels: + app: resources-manifest-pipeline-my-streams-app-clean + release: resources-manifest-pipeline-my-streams-app-clean + spec: + containers: + - args: + - reset + env: + - name: ENV_PREFIX + value: APP_ + - name: APP_BOOTSTRAP_SERVERS + value: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + - name: APP_SCHEMA_REGISTRY_URL + value: http://localhost:8081/ + - name: APP_INPUT_TOPICS + value: my-input-topic + - name: APP_INPUT_PATTERN + value: my-input-pattern + - name: APP_OUTPUT_TOPIC + value: my-output-topic + - name: APP_ERROR_TOPIC + value: resources-manifest-pipeline-my-streams-app-error + - name: APP_LABELED_OUTPUT_TOPICS + value: my-output-topic-label=my-labeled-topic-output, + - name: APP_LABELED_INPUT_TOPICS + value: my-input-topic-label=my-labeled-input-topic, + - name: APP_LABELED_INPUT_PATTERNS + value: my-input-topic-labeled-pattern=my-labeled-input-pattern, + - name: APP_APPLICATION_ID + value: my-streams-app-id + - name: JAVA_TOOL_OPTIONS + value: '-XX:MaxRAMPercentage=75.0 ' + image: my-registry/my-streams-app-image:1.0.0 + imagePullPolicy: Always + name: resources-manifest-pipeline-my-streams-app-clean + resources: + limits: + cpu: 500m + memory: 2G + requests: + cpu: 200m + memory: 300Mi + restartPolicy: OnFailure + ttlSecondsAfterFinished: 30 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-output-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-error-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-labeled-topic-output +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: resources-manifest-pipeline-my-streams-app-error +spec: + config: + cleanup.policy: compact,delete + partitions: 1 + replicas: 1 + diff --git a/tests/pipeline/snapshots/test_manifest/test_manifest_reset_manifest_mode/manifest.yaml b/tests/pipeline/snapshots/test_manifest/test_manifest_reset_manifest_mode/manifest.yaml new file mode 100644 index 000000000..63dc20d35 --- /dev/null +++ b/tests/pipeline/snapshots/test_manifest/test_manifest_reset_manifest_mode/manifest.yaml @@ -0,0 +1,107 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + labels: + app: resources-manifest-pipeline-my-streams-app-clean + chart: streams-app-cleanup-job-3.0.3 + release: resources-manifest-pipeline-my-streams-app-clean + name: resources-manifest-pipeline-my-streams-app-clean +spec: + backoffLimit: 6 + template: + metadata: + labels: + app: resources-manifest-pipeline-my-streams-app-clean + release: resources-manifest-pipeline-my-streams-app-clean + spec: + containers: + - args: + - reset + env: + - name: ENV_PREFIX + value: APP_ + - name: APP_BOOTSTRAP_SERVERS + value: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + - name: APP_SCHEMA_REGISTRY_URL + value: http://localhost:8081/ + - name: APP_INPUT_TOPICS + value: my-input-topic + - name: APP_INPUT_PATTERN + value: my-input-pattern + - name: APP_OUTPUT_TOPIC + value: my-output-topic + - name: APP_ERROR_TOPIC + value: resources-manifest-pipeline-my-streams-app-error + - name: APP_LABELED_OUTPUT_TOPICS + value: my-output-topic-label=my-labeled-topic-output, + - name: APP_LABELED_INPUT_TOPICS + value: my-input-topic-label=my-labeled-input-topic, + - name: APP_LABELED_INPUT_PATTERNS + value: my-input-topic-labeled-pattern=my-labeled-input-pattern, + - name: APP_APPLICATION_ID + value: my-streams-app-id + - name: JAVA_TOOL_OPTIONS + value: '-XX:MaxRAMPercentage=75.0 ' + image: my-registry/my-streams-app-image:1.0.0 + imagePullPolicy: Always + name: resources-manifest-pipeline-my-streams-app-clean + resources: + limits: + cpu: 500m + memory: 2G + requests: + cpu: 200m + memory: 300Mi + restartPolicy: OnFailure + ttlSecondsAfterFinished: 30 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-output-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-error-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-labeled-topic-output +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: resources-manifest-pipeline-my-streams-app-error +spec: + config: + cleanup.policy: compact,delete + partitions: 1 + replicas: 1 + diff --git a/tests/pipeline/snapshots/test_manifest/test_manifest_reset_python_api/manifest.yaml b/tests/pipeline/snapshots/test_manifest/test_manifest_reset_python_api/manifest.yaml new file mode 100644 index 000000000..63dc20d35 --- /dev/null +++ b/tests/pipeline/snapshots/test_manifest/test_manifest_reset_python_api/manifest.yaml @@ -0,0 +1,107 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + labels: + app: resources-manifest-pipeline-my-streams-app-clean + chart: streams-app-cleanup-job-3.0.3 + release: resources-manifest-pipeline-my-streams-app-clean + name: resources-manifest-pipeline-my-streams-app-clean +spec: + backoffLimit: 6 + template: + metadata: + labels: + app: resources-manifest-pipeline-my-streams-app-clean + release: resources-manifest-pipeline-my-streams-app-clean + spec: + containers: + - args: + - reset + env: + - name: ENV_PREFIX + value: APP_ + - name: APP_BOOTSTRAP_SERVERS + value: http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092 + - name: APP_SCHEMA_REGISTRY_URL + value: http://localhost:8081/ + - name: APP_INPUT_TOPICS + value: my-input-topic + - name: APP_INPUT_PATTERN + value: my-input-pattern + - name: APP_OUTPUT_TOPIC + value: my-output-topic + - name: APP_ERROR_TOPIC + value: resources-manifest-pipeline-my-streams-app-error + - name: APP_LABELED_OUTPUT_TOPICS + value: my-output-topic-label=my-labeled-topic-output, + - name: APP_LABELED_INPUT_TOPICS + value: my-input-topic-label=my-labeled-input-topic, + - name: APP_LABELED_INPUT_PATTERNS + value: my-input-topic-labeled-pattern=my-labeled-input-pattern, + - name: APP_APPLICATION_ID + value: my-streams-app-id + - name: JAVA_TOOL_OPTIONS + value: '-XX:MaxRAMPercentage=75.0 ' + image: my-registry/my-streams-app-image:1.0.0 + imagePullPolicy: Always + name: resources-manifest-pipeline-my-streams-app-clean + resources: + limits: + cpu: 500m + memory: 2G + requests: + cpu: 200m + memory: 300Mi + restartPolicy: OnFailure + ttlSecondsAfterFinished: 30 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-output-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-error-topic +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: my-labeled-topic-output +spec: + config: {} + partitions: 1 + replicas: 1 + +--- +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + labels: + strimzi.io/cluster: my-cluster + name: resources-manifest-pipeline-my-streams-app-error +spec: + config: + cleanup.policy: compact,delete + partitions: 1 + replicas: 1 + diff --git a/tests/pipeline/test_manifest.py b/tests/pipeline/test_manifest.py index e0f208516..87fe9b5a0 100644 --- a/tests/pipeline/test_manifest.py +++ b/tests/pipeline/test_manifest.py @@ -231,6 +231,52 @@ def test_manifest_destroy_python_api( captured = capsys.readouterr() snapshot.assert_match(captured.out, MANIFEST_YAML) + def test_manifest_reset_manifest_mode(self, snapshot: Snapshot): + result = runner.invoke( + app, + [ + "reset", + str(RESOURCE_PATH / "manifest-pipeline" / PIPELINE_YAML), + "--operation-mode", + "manifest", + ], + catch_exceptions=False, + ) + assert result.exit_code == 0, result.stdout + snapshot.assert_match(result.stdout, MANIFEST_YAML) + + def test_manifest_reset_argo_mode(self, snapshot: Snapshot): + result = runner.invoke( + app, + [ + "reset", + str(RESOURCE_PATH / "manifest-pipeline" / PIPELINE_YAML), + "--operation-mode", + "argo", + ], + catch_exceptions=False, + ) + assert result.exit_code == 0, result.stdout + snapshot.assert_match(result.stdout, MANIFEST_YAML) + + def test_manifest_reset_python_api( + self, capsys: CaptureFixture, snapshot: Snapshot + ): + generator = kpops.manifest_reset( + RESOURCE_PATH / "manifest-pipeline" / PIPELINE_YAML, + environment="development", + ) + assert isinstance(generator, Iterator) + resources = list(generator) + assert len(resources) == 2 + for resource in resources: + for manifest in resource: + assert isinstance(manifest, KubernetesManifest) + print_yaml(manifest.model_dump()) + + captured = capsys.readouterr() + snapshot.assert_match(captured.out, MANIFEST_YAML) + def test_manifest_clean_manifest_mode(self, snapshot: Snapshot): result = runner.invoke( app,