Skip to content

Commit

Permalink
Manifest Kubernetes resources for destroy command (#552)
Browse files Browse the repository at this point in the history
closes #553
  • Loading branch information
raminqaf authored Dec 11, 2024
1 parent 2071c1b commit 35bec1a
Show file tree
Hide file tree
Showing 9 changed files with 332 additions and 13 deletions.
1 change: 1 addition & 0 deletions docs/docs/user/references/cli-commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ $ kpops destroy [OPTIONS] PIPELINE_PATHS...
* `--dry-run / --execute`: Whether to dry run the command or execute it [default: dry-run]
* `--verbose / --no-verbose`: Enable verbose printing [default: no-verbose]
* `--parallel / --no-parallel`: Enable or disable parallel execution of pipeline steps. If enabled, multiple steps can be processed concurrently. If disabled, steps will be processed sequentially. [default: no-parallel]
* `--operation-mode [argo|manifest|managed]`: How KPOps should operate. [env var: KPOPS_OPERATION_MODE; default: managed]
* `--help`: Show this message and exit.

## `kpops generate`
Expand Down
25 changes: 25 additions & 0 deletions kpops/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,31 @@ def manifest_deploy(
yield resource


def manifest_destroy(
pipeline_path: Path,
dotenv: list[Path] | None = None,
config: Path = Path(),
steps: set[str] | None = None,
filter_type: FilterType = FilterType.INCLUDE,
environment: str | None = None,
verbose: bool = True,
operation_mode: OperationMode = OperationMode.MANIFEST,
) -> Iterator[tuple[KubernetesManifest, ...]]:
pipeline = generate(
pipeline_path=pipeline_path,
dotenv=dotenv,
config=config,
steps=steps,
filter_type=filter_type,
environment=environment,
verbose=verbose,
operation_mode=operation_mode,
)
for component in pipeline.components:
resource = component.manifest_destroy()
yield resource


def deploy(
pipeline_path: Path,
dotenv: list[Path] | None = None,
Expand Down
42 changes: 30 additions & 12 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,19 +241,37 @@ def destroy(
dry_run: bool = DRY_RUN,
verbose: bool = VERBOSE_OPTION,
parallel: bool = PARALLEL,
operation_mode: OperationMode = OPERATION_MODE_OPTION,
):
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.destroy(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
match operation_mode:
case OperationMode.MANAGED:
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
kpops.destroy(
pipeline_path=pipeline_file_path,
dotenv=dotenv,
config=config,
steps=parse_steps(steps),
filter_type=filter_type,
environment=environment,
dry_run=dry_run,
verbose=verbose,
parallel=parallel,
)
case _:
for pipeline_file_path in collect_pipeline_paths(pipeline_paths):
resources = kpops.manifest_destroy(
pipeline_file_path,
dotenv,
config,
parse_steps(steps),
filter_type,
environment,
verbose,
operation_mode,
)
for resource in resources:
for rendered_manifest in resource:
print_yaml(rendered_manifest.model_dump())


@app.command(help="Reset pipeline steps")
Expand Down
8 changes: 8 additions & 0 deletions kpops/components/streams_bootstrap/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,11 @@ def manifest_deploy(self) -> tuple[KubernetesManifest, ...]:
)

return resource

@override
def manifest_destroy(self) -> tuple[KubernetesManifest, ...]:
if self.to:
return tuple(
StrimziKafkaTopic.from_topic(topic) for topic in self.to.kafka_topics
)
return ()
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-producer-app-output-topic
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-labeled-producer-app-topic-output
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-output-topic
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-error-topic
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-labeled-topic-output
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: resources-manifest-pipeline-my-streams-app-error
spec:
config:
cleanup.policy: compact,delete
partitions: 1
replicas: 1

Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-producer-app-output-topic
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-labeled-producer-app-topic-output
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-output-topic
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-error-topic
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-labeled-topic-output
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: resources-manifest-pipeline-my-streams-app-error
spec:
config:
cleanup.policy: compact,delete
partitions: 1
replicas: 1

Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-producer-app-output-topic
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-labeled-producer-app-topic-output
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-output-topic
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-error-topic
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: my-labeled-topic-output
spec:
config: {}
partitions: 1
replicas: 1

---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
labels:
strimzi.io/cluster: my-cluster
name: resources-manifest-pipeline-my-streams-app-error
spec:
config:
cleanup.policy: compact,delete
partitions: 1
replicas: 1

50 changes: 49 additions & 1 deletion tests/pipeline/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ def test_manifest_command(self, snapshot: Snapshot):
assert result.exit_code == 0, result.stdout
snapshot.assert_match(result.stdout, MANIFEST_YAML)

def test_python_api(self, capsys: CaptureFixture, snapshot: Snapshot):
def test_manifest_deploy_python_api(
self, capsys: CaptureFixture, snapshot: Snapshot
):
generator = kpops.manifest_deploy(
RESOURCE_PATH / "manifest-pipeline" / PIPELINE_YAML,
environment="development",
Expand Down Expand Up @@ -182,3 +184,49 @@ def test_deploy_argo_mode(self, snapshot: Snapshot):
)
assert result.exit_code == 0, result.stdout
snapshot.assert_match(result.stdout, MANIFEST_YAML)

def test_manifest_destroy_manifest_mode(self, snapshot: Snapshot):
result = runner.invoke(
app,
[
"destroy",
str(RESOURCE_PATH / "manifest-pipeline" / PIPELINE_YAML),
"--operation-mode",
"manifest",
],
catch_exceptions=False,
)
assert result.exit_code == 0, result.stdout
snapshot.assert_match(result.stdout, MANIFEST_YAML)

def test_manifest_destroy_argo_mode(self, snapshot: Snapshot):
result = runner.invoke(
app,
[
"destroy",
str(RESOURCE_PATH / "manifest-pipeline" / PIPELINE_YAML),
"--operation-mode",
"argo",
],
catch_exceptions=False,
)
assert result.exit_code == 0, result.stdout
snapshot.assert_match(result.stdout, MANIFEST_YAML)

def test_manifest_destroy_python_api(
self, capsys: CaptureFixture, snapshot: Snapshot
):
generator = kpops.manifest_destroy(
RESOURCE_PATH / "manifest-pipeline" / PIPELINE_YAML,
environment="development",
)
assert isinstance(generator, Iterator)
resources = list(generator)
assert len(resources) == 2
for resource in resources:
for manifest in resource:
assert isinstance(manifest, KubernetesManifest)
print_yaml(manifest.model_dump())

captured = capsys.readouterr()
snapshot.assert_match(captured.out, MANIFEST_YAML)

0 comments on commit 35bec1a

Please sign in to comment.