From c4f4de93d6bd19b61960dea3369d40e41ff4bdb0 Mon Sep 17 00:00:00 2001 From: Dan Norris Date: Mon, 1 Apr 2024 13:49:06 -0400 Subject: [PATCH] feat: automatically create Kubernetes Services for apps using a httpserver component One advantage of wasmCloud applications is that we always know what components make up an application and therefore can intelligently make decisions based on what interfaces we're using. This makes it much easier for software like the operator to take particular actions based on the contents of an application manifest. This change adds an additional set of reconciliations we can perform on a per-lattice basis to automatically create Kubernetes Services for applications that deploy a httpserver component. The operator uses a NATS consumer that watches for manifest deploy and undeploy events and triggers a reconciliation on a managed Service. Right now this is restricted only to `daemonscalers`, since we do not have to do any bookeeping on where components are deployed if they are running on all of the hosts in a lattice or in a subset identified by label. We will add support for `spreadscalers` in a future PR. This allows for some interesting deployment scenarios, such as wasmCloud applications that span multiple Kubernetes deployments in the same namespace, or potentially in _multiple namespaces_ if we decide to implement support for them. Initially the operator is only creating endpoints that route traffic from a service to pods in the same namespace, but we may add an option to relax that assumption. --- Cargo.lock | 143 ++-- Cargo.toml | 4 + README.md | 12 +- .../templates/deployment.yaml | 2 +- deploy/base/deployment.yaml | 12 + deploy/local/local-registry.yaml | 5 + hack/run-kind-cluster.sh | 64 ++ src/controller.rs | 140 +++- src/lib.rs | 1 + src/resources/application.rs | 2 +- src/services.rs | 757 ++++++++++++++++++ 11 files changed, 1065 insertions(+), 77 deletions(-) create mode 100755 hack/run-kind-cluster.sh create mode 100644 src/services.rs diff --git a/Cargo.lock b/Cargo.lock index 1b9ea21..5d77b6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,6 +27,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "ahash" version = "0.8.3" @@ -523,9 +534,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e" dependencies = [ "android-tzdata", "iana-time-zone", @@ -533,7 +544,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -1224,6 +1235,9 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash 0.7.8", +] [[package]] name = "hashbrown" @@ -1231,7 +1245,7 @@ version = "0.14.1" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" dependencies = [ - "ahash", + "ahash 0.8.3", "allocator-api2", ] @@ -1508,12 +1522,12 @@ checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" [[package]] name = "is-terminal" -version = "0.4.10" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bad00257d07be169d870ab665980b06cdb366d792ad690bf2e76876dc503455" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" dependencies = [ "hermit-abi", - "rustix", + "libc", "windows-sys 0.52.0", ] @@ -1572,14 +1586,16 @@ dependencies = [ ] [[package]] -name = "jsonpath_lib" -version = "0.3.0" +name = "jsonpath-rust" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaa63191d68230cccb81c5aa23abd53ed64d83337cacbb25a7b8c7979523774f" +checksum = "06cc127b7c3d270be504572364f9569761a180b981919dd0d87693a7f5fb7829" dependencies = [ - "log", - "serde", + "pest", + "pest_derive", + "regex", "serde_json", + "thiserror", ] [[package]] @@ -1588,7 +1604,7 @@ version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a071f4f7efc9a9118dfb627a0a94ef247986e1ab8606a4c806ae2b3aa3b6978" dependencies = [ - "ahash", + "ahash 0.8.3", "anyhow", "base64 0.21.4", "bytecount", @@ -1644,9 +1660,9 @@ dependencies = [ [[package]] name = "kube" -version = "0.87.1" +version = "0.87.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e34392aea935145070dcd5b39a6dea689ac6534d7d117461316c3d157b1d0fc3" +checksum = "3499c8d60c763246c7a213f51caac1e9033f46026904cb89bc8951ae8601f26e" dependencies = [ "k8s-openapi", "kube-client", @@ -1657,9 +1673,9 @@ dependencies = [ [[package]] name = "kube-client" -version = "0.87.1" +version = "0.87.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7266548b9269d9fa19022620d706697e64f312fb2ba31b93e6986453fcc82c92" +checksum = "033450dfa0762130565890dadf2f8835faedf749376ca13345bcd8ecd6b5f29f" dependencies = [ "base64 0.21.4", "bytes", @@ -1672,7 +1688,7 @@ dependencies = [ "hyper", "hyper-rustls", "hyper-timeout", - "jsonpath_lib", + "jsonpath-rust", "k8s-openapi", "kube-core", "pem", @@ -1693,9 +1709,9 @@ dependencies = [ [[package]] name = "kube-core" -version = "0.87.1" +version = "0.87.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8321c315b96b59f59ef6b33f604b84b905ab8f9ff114a4f909d934c520227b1" +checksum = "b5bba93d054786eba7994d03ce522f368ef7d48c88a1826faa28478d85fb63ae" dependencies = [ "chrono", "form_urlencoded", @@ -1711,9 +1727,9 @@ dependencies = [ [[package]] name = "kube-derive" -version = "0.87.1" +version = "0.87.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54591e1f37fc329d412c0fdaced010cc1305b546a39f283fc51700f8fb49421" +checksum = "91e98dd5e5767c7b894c1f0e41fd628b145f808e981feb8b08ed66455d47f1a4" dependencies = [ "darling", "proc-macro2", @@ -1724,11 +1740,11 @@ dependencies = [ [[package]] name = "kube-runtime" -version = "0.87.1" +version = "0.87.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e511e2c1a368d9d4bf6e70db58197e535d818df355b5a2007a8aeb17a370a8ba" +checksum = "2d8893eb18fbf6bb6c80ef6ee7dd11ec32b1dc3c034c988ac1b3a84d46a230ae" dependencies = [ - "ahash", + "ahash 0.8.3", "async-trait", "backoff", "derivative", @@ -1771,9 +1787,9 @@ dependencies = [ 
[[package]] name = "libc" -version = "0.2.151" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libredox" @@ -2104,9 +2120,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "openssl-probe" @@ -2737,9 +2753,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.28" +version = "0.38.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72e572a5e8ca657d7366229cdde4bd14c4eb5499a9573d4d366fe1b599daa316" +checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89" dependencies = [ "bitflags 2.4.1", "errno", @@ -2990,7 +3006,6 @@ version = "1.0.107" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" dependencies = [ - "indexmap 2.0.2", "itoa", "ryu", "serde", @@ -3160,9 +3175,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.1" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "snafu" @@ -3321,18 +3336,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.49" +version = "1.0.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1177e8c6d7ede7afde3585fd2513e611227efd6481bd78d2e82ba1ce16557ed4" +checksum = "6e3de26b0965292219b4287ff031fcba86837900fe9cd2b34ea8ad893c0953d2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.49" +version = "1.0.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc" +checksum = "268026685b2be38d7103e9e507c938a1fcb3d7e6eb15e87870b617bf37b6d581" dependencies = [ "proc-macro2", "quote", @@ -3501,6 +3516,8 @@ dependencies = [ "futures-core", "futures-io", "futures-sink", + "futures-util", + "hashbrown 0.12.3", "pin-project-lite", "slab", "tokio", @@ -3858,9 +3875,9 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "utoipa" -version = "4.1.0" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ff05e3bac2c9428f57ade702667753ca3f5cf085e2011fe697de5bfd49aa72d" +checksum = "272ebdfbc99111033031d2f10e018836056e4d2c8e2acda76450ec7974269fa7" dependencies = [ "indexmap 2.0.2", "serde", @@ -3870,9 +3887,9 @@ dependencies = [ [[package]] name = "utoipa-gen" -version = "4.1.0" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0b6f4667edd64be0e820d6631a60433a269710b6ee89ac39525b872b76d61d" +checksum = "d3c9f4d08338c1bfa70dde39412a040a884c6f318b3d09aaaf3437a1e52027fc" dependencies = [ "proc-macro-error", "proc-macro2", @@ -4045,7 +4062,7 @@ dependencies = [ "wasmcloud-component-adapters", "wasmcloud-control-interface 0.33.0", "wasmcloud-core", - "wasmparser 0.118.1", + "wasmparser 0.118.2", "wat", 
"weld-codegen", "wit-bindgen-core", @@ -4162,6 +4179,15 @@ dependencies = [ "leb128", ] +[[package]] +name = "wasm-encoder" +version = "0.41.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "972f97a5d8318f908dded23594188a90bcd09365986b1163e66d70170e5287ae" +dependencies = [ + "leb128", +] + [[package]] name = "wasm-gen" version = "0.1.4" @@ -4174,9 +4200,9 @@ dependencies = [ [[package]] name = "wasm-metadata" -version = "0.10.14" +version = "0.10.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d835d67708f6374937c550ad8dd1d17c616ae009e3f00d7a0ac9f7825e78c36a" +checksum = "18ebaa7bd0f9e7a5e5dd29b9a998acf21c4abed74265524dd7e85934597bfb10" dependencies = [ "anyhow", "indexmap 2.0.2", @@ -4184,8 +4210,8 @@ dependencies = [ "serde_derive", "serde_json", "spdx", - "wasm-encoder 0.38.1", - "wasmparser 0.118.1", + "wasm-encoder 0.41.2", + "wasmparser 0.121.2", ] [[package]] @@ -4288,6 +4314,7 @@ dependencies = [ "async-nats", "axum", "axum-server", + "cloudevents-sdk", "ctrlc", "futures", "handlebars 5.1.0", @@ -4305,6 +4332,7 @@ dependencies = [ "thiserror", "time", "tokio", + "tokio-util", "tracing", "tracing-opentelemetry", "tracing-subscriber", @@ -4348,10 +4376,21 @@ dependencies = [ [[package]] name = "wasmparser" -version = "0.118.1" +version = "0.118.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77f1154f1ab868e2a01d9834a805faca7bf8b50d041b4ca714d005d0dab1c50c" +dependencies = [ + "indexmap 2.0.2", + "semver", +] + +[[package]] +name = "wasmparser" +version = "0.121.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95ee9723b928e735d53000dec9eae7b07a60e490c85ab54abb66659fc61bfcd9" +checksum = "9dbe55c8f9d0dbd25d9447a5a889ff90c0cc3feaa7395310d3d826b2c703eaab" dependencies = [ + "bitflags 2.4.1", "indexmap 2.0.2", "semver", ] @@ -4677,7 +4716,7 @@ dependencies = [ "serde_json", "wasm-encoder 0.38.1", "wasm-metadata", - "wasmparser 0.118.1", + "wasmparser 0.118.2", "wit-parser", ] @@ -4700,9 +4739,9 @@ dependencies = [ [[package]] name = "xattr" -version = "1.2.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "914566e6413e7fa959cc394fb30e563ba80f3541fbd40816d4c05a0fc3f2a0f1" +checksum = "8da84f1a25939b27f6820d92aed108f83ff920fdf11a7b19366c27c4cda81d4f" dependencies = [ "libc", "linux-raw-sys", diff --git a/Cargo.toml b/Cargo.toml index 1e9679d..1589c62 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ axum = {workspace = true} axum-server = {workspace = true} anyhow = {workspace = true} ctrlc = {workspace = true} +cloudevents-sdk = {workspace = true} futures = {workspace = true} handlebars = {workspace = true} json-patch = {workspace = true} @@ -42,6 +43,7 @@ serde_yaml = {workspace = true} thiserror = {workspace = true} time = {workspace = true} tokio = {workspace = true} +tokio-util = {workspace = true} tracing = {workspace = true} tracing-opentelemetry = {workspace = true} tracing-subscriber = {workspace = true} @@ -56,6 +58,7 @@ async-nats = "0.33" axum = { version = "0.6", features = ["headers"] } axum-server = { version = "0.4", features = ["tls-rustls"] } anyhow = "1" +cloudevents-sdk = "0.7" ctrlc = "3" futures = "0.3" handlebars = "5.1" @@ -73,6 +76,7 @@ serde_yaml = "0.9" thiserror = "1" time = "0.3" tokio = { version = "1", features = ["full"] } +tokio-util = { version = "0.7", features = ["full"] } tracing = "0.1" tracing-opentelemetry = "0.20" tracing-subscriber = { version = "0.3", features = 
["env-filter", "json"] } diff --git a/README.md b/README.md index a3c02b4..0efce1e 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,6 @@ for more information on how to provision that secret. Once it is created, you can reference it in the WasmCloudHostConfig CRD by setting the `registryCredentialsSecret` field to the name of the secret. - ## Deploying the operator A wasmCloud cluster requires a few things to run: @@ -117,6 +116,17 @@ command will start WADM as a Kubernetes deployment: kubectl kustomize build deploy/local | kubectl apply -f - ``` +## Automatically Syncing Kubernetes Services + +The operator automatically creates Kubernetes Services for wasmCloud +applications. Right now this is limited only to applications that deploy the +wasmCloud httpserver component using a `daemonscaler`, but additional support +for `spreadscalers` will be added in the future. + +If you specify host label selectors on the `daemonscaler` then the operator +will honor those labels and will only create a service for the pods that match +those label selectors. + ## Argo CD Health Check Argo CD provides a way to define a [custom health diff --git a/charts/wasmcloud-operator/templates/deployment.yaml b/charts/wasmcloud-operator/templates/deployment.yaml index 32dd527..2297bf4 100644 --- a/charts/wasmcloud-operator/templates/deployment.yaml +++ b/charts/wasmcloud-operator/templates/deployment.yaml @@ -36,7 +36,7 @@ spec: imagePullPolicy: {{ .Values.image.pullPolicy }} env: - name: RUST_LOG - value: info + value: info,async_nats=error - name: POD_NAMESPACE valueFrom: fieldRef: diff --git a/deploy/base/deployment.yaml b/deploy/base/deployment.yaml index 9388c9c..b33a9d8 100644 --- a/deploy/base/deployment.yaml +++ b/deploy/base/deployment.yaml @@ -49,6 +49,7 @@ rules: - services - configmaps - serviceaccounts + - pods verbs: - get - list @@ -104,6 +105,17 @@ rules: - list - patch - update + - apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - create + - delete + - get + - list + - patch + - update - apiGroups: - k8s.wasmcloud.dev resources: diff --git a/deploy/local/local-registry.yaml b/deploy/local/local-registry.yaml index 3770763..9931997 100644 --- a/deploy/local/local-registry.yaml +++ b/deploy/local/local-registry.yaml @@ -1,3 +1,8 @@ - op: add path: /spec/template/spec/containers/0/image value: localhost:5001/wasmcloud-operator:latest +- op: replace + path: /spec/template/spec/containers/0/env/0 + value: + name: RUST_LOG + value: info,controller::services=debug,async_nats=warn,controller::controller=debug diff --git a/hack/run-kind-cluster.sh b/hack/run-kind-cluster.sh new file mode 100755 index 0000000..6d6ded8 --- /dev/null +++ b/hack/run-kind-cluster.sh @@ -0,0 +1,64 @@ +#!/bin/sh +set -o errexit + +# 1. Create registry container unless it already exists +reg_name='kind-registry' +reg_port='5001' +if [ "$(docker inspect -f '{{.State.Running}}' "${reg_name}" 2>/dev/null || true)" != 'true' ]; then + docker run \ + -d --restart=always -p "127.0.0.1:${reg_port}:5000" --name "${reg_name}" \ + registry:2 +fi + +# 2. Create kind cluster with containerd registry config dir enabled +# TODO: kind will eventually enable this by default and this patch will +# be unnecessary. 
+# +# See: +# https://github.com/kubernetes-sigs/kind/issues/2875 +# https://github.com/containerd/containerd/blob/main/docs/cri/config.md#registry-configuration +# See: https://github.com/containerd/containerd/blob/main/docs/hosts.md +cat <>>, + service_watcher: ServiceWatcher, } #[derive(Clone, Default)] @@ -177,7 +185,7 @@ async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc) -> Resul version: a.version.clone(), }) .collect::>(); - info!("Found apps: {:?}", app_names); + debug!("Found apps: {:?}", app_names); cfg.status = Some(WasmCloudHostConfigStatus { apps: app_names.clone(), app_count: app_names.len() as u32, @@ -195,6 +203,32 @@ async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc) -> Resul e })?; + // Start the watcher so that services are automatically created in the cluster. + let nats_client = get_client( + &cfg.spec.nats_address, + ctx.nats_creds.clone(), + NameNamespace::new(name.clone(), ns.clone()), + ) + .await + .map_err(|e| { + warn!("Failed to get nats client: {}", e); + Error::NatsError(format!("Failed to get nats client: {}", e)) + })?; + + ctx.service_watcher + .watch(nats_client, ns.clone(), cfg.spec.lattice.clone()) + .await + .map_err(|e| { + warn!("Failed to start service watcher: {}", e); + Error::NatsError(format!("Failed to watch services: {}", e)) + })?; + + // Reconcile the services in the lattice so that any existing apps ar refected as services in + // the cluster. + ctx.service_watcher + .reconcile_services(apps, cfg.spec.lattice.clone()) + .await; + Ok(Action::requeue(Duration::from_secs(5 * 60))) } @@ -204,16 +238,25 @@ async fn cleanup(config: &WasmCloudHostConfig, ctx: Arc) -> Result = Api::namespaced(client, &ns); - let nst = NameNamespace::new(name.clone(), ns.clone()); - ctx.nats_creds.write().await.remove(&nst); + if config.metadata.deletion_timestamp.is_some() { + let nst = NameNamespace::new(name.clone(), ns.clone()); + ctx.nats_creds.write().await.remove(&nst); + } + + debug!("Cleaning up service\"{}\" in {}", name, ns); + ctx.service_watcher + .stop_watch(config.spec.lattice.clone(), ns.clone()) + .await + .map_err(|e| { + warn!("Failed to stop service watcher: {}", e); + Error::NatsError(format!("Failed to stop service watcher: {}", e)) + })?; Ok(Action::await_change()) } fn pod_template(config: &WasmCloudHostConfig, _ctx: Arc) -> PodTemplateSpec { - let mut labels = BTreeMap::new(); - labels.insert("app.kubernetes.io/name".to_string(), config.name_any()); - labels.insert("app.kubernetes.io/instance".to_string(), config.name_any()); + let labels = pod_labels(config); let mut wasmcloud_env = vec![ EnvVar { @@ -440,11 +483,8 @@ fn pod_template(config: &WasmCloudHostConfig, _ctx: Arc) -> PodTemplate fn deployment_spec(config: &WasmCloudHostConfig, ctx: Arc) -> DeploymentSpec { let pod_template = pod_template(config, ctx); - let mut labels = BTreeMap::new(); - labels.insert("app.kubernetes.io/name".to_string(), config.name_any()); - labels.insert("app.kubernetes.io/instance".to_string(), config.name_any()); let ls = LabelSelector { - match_labels: Some(labels.clone()), + match_labels: Some(selector_labels(config)), ..Default::default() }; @@ -456,13 +496,34 @@ fn deployment_spec(config: &WasmCloudHostConfig, ctx: Arc) -> Deploymen } } -fn daemonset_spec(config: &WasmCloudHostConfig, ctx: Arc) -> DaemonSetSpec { - let pod_template = pod_template(config, ctx); +fn pod_labels(config: &WasmCloudHostConfig) -> BTreeMap { + let mut labels = selector_labels(config); + labels.extend(common_labels()); + if let Some(host_labels) = 
&config.spec.host_labels { + for (k, v) in host_labels.iter() { + labels.insert( + format!("{WASMCLOUD_OPERATOR_HOST_LABEL_PREFIX}/{}", k.clone()), + v.clone(), + ); + } + } + labels +} + +fn selector_labels(config: &WasmCloudHostConfig) -> BTreeMap { let mut labels = BTreeMap::new(); - labels.insert("app.kubernetes.io/name".to_string(), config.name_any()); + labels.insert( + "app.kubernetes.io/name".to_string(), + "wasmcloud".to_string(), + ); labels.insert("app.kubernetes.io/instance".to_string(), config.name_any()); + labels +} + +fn daemonset_spec(config: &WasmCloudHostConfig, ctx: Arc) -> DaemonSetSpec { + let pod_template = pod_template(config, ctx); let ls = LabelSelector { - match_labels: Some(labels.clone()), + match_labels: Some(selector_labels(config)), ..Default::default() }; @@ -553,6 +614,7 @@ async fn configure_hosts(config: &WasmCloudHostConfig, ctx: Arc) -> Res name: Some(config.name_any()), namespace: Some(config.namespace().unwrap()), owner_references: Some(vec![config.controller_owner_ref(&()).unwrap()]), + labels: Some(common_labels()), ..Default::default() }, spec: Some(spec), @@ -580,6 +642,7 @@ async fn configure_hosts(config: &WasmCloudHostConfig, ctx: Arc) -> Res name: Some(config.name_any()), namespace: Some(config.namespace().unwrap()), owner_references: Some(vec![config.controller_owner_ref(&()).unwrap()]), + labels: Some(common_labels()), ..Default::default() }, spec: Some(spec), @@ -600,7 +663,10 @@ async fn configure_hosts(config: &WasmCloudHostConfig, ctx: Arc) -> Res async fn configure_service(config: &WasmCloudHostConfig, ctx: Arc) -> Result<()> { let mut label_selector = BTreeMap::new(); - label_selector.insert("app.kubernetes.io/name".to_string(), config.name_any()); + label_selector.insert( + "app.kubernetes.io/name".to_string(), + "wasmcloud".to_string(), + ); label_selector.insert("app.kubernetes.io/instance".to_string(), config.name_any()); let mut labels = label_selector.clone(); @@ -811,18 +877,30 @@ pub async fn run(state: State) -> anyhow::Result<()> { let cms = Api::::all(client.clone()); let deployments = Api::::all(client.clone()); let secrets = Api::::all(client.clone()); + let services = Api::::all(client.clone()); + let pods = Api::::all(client.clone()); + let watcher = ServiceWatcher::new(client.clone()); let config = Config::default(); let ctx = Context { client, wasmcloud_config: state.config.clone(), nats_creds: state.nats_creds.clone(), + service_watcher: watcher, + }; + + let label_config_watcher = watcher::Config { + label_selector: Some(common_labels_selector()), + ..Default::default() }; + // TODO: Restrict these to use the right label selectors Controller::new(configs, watcher::Config::default()) .owns(cms, watcher::Config::default()) - .owns(deployments, watcher::Config::default()) + .owns(deployments, label_config_watcher.clone()) .owns(secrets, watcher::Config::default()) + .owns(services, label_config_watcher.clone()) + .owns(pods, label_config_watcher.clone()) .with_config(config) .shutdown_on_signal() .run(reconcile, error_policy, Arc::new(ctx)) @@ -835,3 +913,21 @@ pub async fn run(state: State) -> anyhow::Result<()> { .await; Ok(()) } + +pub(crate) fn common_labels() -> BTreeMap { + BTreeMap::from_iter(vec![ + ( + "app.kubernetes.io/name".to_string(), + "wasmcloud".to_string(), + ), + ( + "app.kubernetes.io/managed-by".to_string(), + "wasmcloud-operator".to_string(), + ), + ]) +} +fn common_labels_selector() -> String { + format!( + "app.kubernetes.io/name=wasmcloud,app.{WASMCLOUD_OPERATOR_MANAGED_BY_LABEL_REQUIREMENT}" + 
) +} diff --git a/src/lib.rs b/src/lib.rs index 6362907..0e0ce35 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,6 +51,7 @@ pub mod header; pub(crate) mod openapi; pub mod resources; pub mod router; +pub(crate) mod services; pub(crate) mod table; pub use crate::controller::*; diff --git a/src/resources/application.rs b/src/resources/application.rs index ea2534b..59fab2e 100644 --- a/src/resources/application.rs +++ b/src/resources/application.rs @@ -501,7 +501,7 @@ pub async fn list_apps( Ok(models) } -async fn get_client( +pub async fn get_client( cluster_url: &str, nats_creds: Arc>>, namespace: NameNamespace, diff --git a/src/services.rs b/src/services.rs new file mode 100644 index 0000000..f47be8e --- /dev/null +++ b/src/services.rs @@ -0,0 +1,757 @@ +use crate::controller::{ + common_labels, CLUSTER_CONFIG_FINALIZER, SERVICE_FINALIZER, + WASMCLOUD_OPERATOR_HOST_LABEL_PREFIX, WASMCLOUD_OPERATOR_MANAGED_BY_LABEL_REQUIREMENT, +}; +use anyhow::Result; +use async_nats::{ + jetstream, + jetstream::{ + consumer::{pull::Config, Consumer}, + stream::{Config as StreamConfig, RetentionPolicy, Source, StorageType, SubjectTransform}, + AckKind, + }, + Client, +}; +use cloudevents::{AttributesReader, Event as CloudEvent}; +use futures::StreamExt; +use k8s_openapi::api::core::v1::{Pod, Service, ServicePort, ServiceSpec}; +use k8s_openapi::api::discovery::v1::{Endpoint, EndpointPort, EndpointSlice}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; +use kube::{ + api::{Api, DeleteParams, ListParams, Patch, PatchParams}, + client::Client as KubeClient, + Resource, +}; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::sync::{mpsc, RwLock}; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, warn}; +use wadm::{ + events::{Event, ManifestPublished, ManifestUnpublished}, + model::{Manifest, TraitProperty}, + server::{GetResult, ModelSummary}, +}; +use wash_lib::app; +use wasmcloud_operator_types::v1alpha1::WasmCloudHostConfig; + +const CONSUMER_PREFIX: &str = "wasmcloud_operator_service"; +// This should probably be exposed by wadm somewhere +const WADM_EVT_SUBJECT: &str = "wadm.evt"; +const OPERATOR_STREAM_NAME: &str = "wasmcloud_operator_events"; + +/// Commands that can be sent to the watcher to trigger an update or removal of a service. +#[derive(Clone, Debug)] +enum WatcherCommand { + UpsertService(ServiceParams), + RemoveService { + name: String, + namespaces: HashSet, + }, + RemoveServices { + namespaces: HashSet, + }, +} + +/// Parameters for creating or updating a service in the cluster. +#[derive(Clone, Debug)] +pub struct ServiceParams { + name: String, + namespaces: HashSet, + lattice_id: String, + port: u16, + host_labels: Option>, +} + +/// Watches for new services to be created in the cluster for a partcular lattice and creates or +/// updates them as necessary. +#[derive(Clone, Debug)] +pub struct Watcher { + namespaces: HashSet, + lattice_id: String, + nats_client: Client, + shutdown: CancellationToken, + consumer: Consumer, + tx: mpsc::Sender, +} + +impl Drop for Watcher { + fn drop(&mut self) { + self.shutdown.cancel(); + } +} + +impl Watcher { + /// Creates a new watcher for a particular lattice. 
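+    /// The constructor spawns a background task that drains events from the consumer; the task
+    /// exits when the watcher is dropped, since [Drop] cancels the shutdown token.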
+ fn new( + namespace: String, + lattice_id: String, + nats_client: Client, + consumer: Consumer, + tx: mpsc::Sender, + ) -> Self { + let watcher = Self { + namespaces: HashSet::from([namespace]), + nats_client, + lattice_id: lattice_id.clone(), + consumer, + shutdown: CancellationToken::new(), + tx, + }; + + // TODO is there a better way to handle this? + let watcher_dup = watcher.clone(); + tokio::spawn(async move { + tokio::select! { + _ = watcher_dup.shutdown.cancelled() => { + debug!("Service watcher shutting down for lattice {}", lattice_id); + } + _ = watcher_dup.watch_events(&watcher_dup.consumer) => { + error!("Service watcher for lattice {} has stopped", lattice_id); + } + } + }); + + watcher + } + + /// Watches for new events on the mirrored wadm_events stream and processes them. + async fn watch_events(&self, consumer: &Consumer) -> Result<()> { + let mut messages = consumer.stream().messages().await?; + while let Some(message) = messages.next().await { + if let Ok(message) = message { + match self.handle_event(message.clone()) { + Ok(_) => message + .ack() + .await + .map_err(|e| { + error!(error=%e, "Error acking message"); + e + }) + .ok(), + Err(_) => message + .ack_with(AckKind::Nak(None)) + .await + .map_err(|e| { + error!(error=%e, "Error nacking message"); + e + }) + .ok(), + }; + } + } + Ok(()) + } + + /// Handles a new event from the consumer. + fn handle_event(&self, message: async_nats::jetstream::Message) -> Result<()> { + let event = serde_json::from_slice::(&message.payload) + .map_err(|e| anyhow::anyhow!("Error parsing cloudevent: {}", e))?; + let evt = match Event::try_from(event.clone()) { + Ok(evt) => evt, + Err(e) => { + warn!( + error=%e, + event_type=%event.ty(), + "Error converting cloudevent to wadm event", + ); + return Ok(()); + } + }; + match evt { + Event::ManifestPublished(mp) => { + let name = mp.manifest.metadata.name.clone(); + self.handle_manifest_published(mp).map_err(|e| { + error!(lattice_id = %self.lattice_id, manifest = name, "Error handling manifest published event: {}", e); + e + })?; + } + Event::ManifestUnpublished(mu) => { + let name = mu.name.clone(); + self.handle_manifest_unpublished(mu).map_err(|e| { + error!(lattice_id = %self.lattice_id, manifest = name, "Error handling manifest unpublished event: {}", e); + e + })?; + } + _ => {} + } + Ok(()) + } + + /// Handles a manifest published event. + fn handle_manifest_published(&self, mp: ManifestPublished) -> Result<()> { + debug!(manifest=?mp, "Handling manifest published event"); + let manifest = mp.manifest; + if let Some(httpserver_service) = http_server_component(&manifest) { + if let Some(address) = find_address(&manifest, httpserver_service.name.as_str()) { + debug!(address = address, "Found address"); + if let Ok(addr) = address.parse::() { + debug!("Upserting service for manifest: {}", manifest.metadata.name); + self.tx + .try_send(WatcherCommand::UpsertService(ServiceParams { + name: manifest.metadata.name.clone(), + lattice_id: self.lattice_id.clone(), + port: addr.port(), + namespaces: self.namespaces.clone(), + host_labels: httpserver_service.labels, + })) + .map_err(|e| anyhow::anyhow!("Error sending command to watcher: {}", e))?; + } else { + error!(address = address, "Invalid address in manifest"); + } + } + } + Ok(()) + } + + /// Handles a manifest unpublished event. 
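+    /// Queues a [WatcherCommand::RemoveService] so the Service created for this manifest is
+    /// deleted from every namespace tracked by this watcher.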
+ fn handle_manifest_unpublished(&self, mu: ManifestUnpublished) -> Result<()> { + self.tx + .try_send(WatcherCommand::RemoveService { + name: mu.name, + namespaces: self.namespaces.clone(), + }) + .map_err(|e| anyhow::anyhow!("Error sending command to watcher: {}", e))?; + Ok(()) + } +} + +/// Waits for commands to update or remove services based on manifest deploy/undeploy events in +/// underlying lattices. +/// Each lattice is managed by a [Watcher] which listens for events relayed by a NATS consumer and +/// issues commands to create or update services in the cluster. +pub struct ServiceWatcher { + watchers: Arc>>, + sender: mpsc::Sender, +} + +impl ServiceWatcher { + /// Creates a new service watcher. + pub fn new(k8s_client: KubeClient) -> Self { + // TODO: Should this be unbounded or have a larger bound? + let (tx, mut rx) = mpsc::channel::(1000); + + let client = k8s_client.clone(); + tokio::spawn(async move { + while let Some(cmd) = rx.recv().await { + match cmd { + WatcherCommand::UpsertService(params) => { + create_or_update_service(client.clone(), ¶ms, None) + .await + .map_err(|e| error!(error=%e, "Error creating/updating service")) + .ok(); + } + WatcherCommand::RemoveService { name, namespaces } => { + for namespace in namespaces { + delete_service(client.clone(), &namespace, name.as_str()) + .await + .map_err(|e| error!(error=%e, namespace=namespace, "Error deleting service")) + .ok(); + } + } + WatcherCommand::RemoveServices { namespaces } => { + for namespace in namespaces { + delete_services(client.clone(), namespace.as_str()) + .await + .map_err(|e| error!(error=%e, namespace=namespace, "Error deleting service")) + .ok(); + } + } + } + } + }); + + Self { + watchers: Arc::new(RwLock::new(HashMap::new())), + sender: tx, + } + } + + /// Reconciles services for a set of apps in a lattice. + /// This intended to be called by the controller whenever it reconciles state. + pub async fn reconcile_services(&self, apps: Vec, lattice_id: String) { + if let Some(watcher) = self.watchers.read().await.get(lattice_id.as_str()) { + for app in apps { + if app.deployed_version.is_none() { + continue; + } + match app::get_model_details( + &watcher.nats_client, + Some(lattice_id.clone()), + app.name.as_str(), + app.deployed_version, + ) + .await + { + Ok(model) => { + if model.result == GetResult::Success { + // TODO handle this or decide on whether or not to return a result at + // all + let _ = watcher.handle_manifest_published(ManifestPublished { + manifest: model.manifest.unwrap(), + }).map_err(|e| error!(error=%e, lattice_id=%lattice_id, app=app.name, "failed to trigger service reconciliation for app")); + } + } + Err(e) => warn!(error=%e, "Unable to retieve model"), + }; + } + }; + } + + /// Create a new [Watcher] for a lattice. + /// It will return early if a [Watcher] already exists for the lattice. + pub async fn watch(&self, client: Client, namespace: String, lattice_id: String) -> Result<()> { + // If we're already watching this lattice then return early + // TODO is there an easy way to do this with a read lock? 
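+        // Taking the write lock up front keeps the existence check and the namespace insertion
+        // atomic; concurrent watch() calls for the same lattice are simply serialized.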
+ let mut watchers = self.watchers.write().await; + if let Some(watcher) = watchers.get_mut(lattice_id.as_str()) { + if !watcher.namespaces.contains(&namespace) { + watcher.namespaces.insert(namespace); + } + return Ok(()); + } + + let js = jetstream::new(client.clone()); + let source_subject = format!("{WADM_EVT_SUBJECT}.{}", lattice_id.clone()); + let destination_subject = format!("wasmcloud_operator_events.{}", lattice_id.clone()); + + // TODO should any of this be configurable? + // Should we also be doing this when we first create the ServiceWatcher? + let stream = js + .get_or_create_stream(StreamConfig { + name: OPERATOR_STREAM_NAME.to_string(), + description: Some( + "Stream for wadm events consumed by the wasmCloud K8s Operator".to_string(), + ), + max_age: wadm::DEFAULT_EXPIRY_TIME, + retention: RetentionPolicy::WorkQueue, + storage: StorageType::File, + allow_rollup: false, + num_replicas: 1, + mirror: Some(Source { + name: "wadm_events".to_string(), + subject_transforms: vec![SubjectTransform { + source: source_subject, + destination: format!("wasmcloud_operator_events.{}", lattice_id.clone()), + }], + ..Default::default() + }), + ..Default::default() + }) + .await?; + + let consumer_name = format!("{CONSUMER_PREFIX}-{}", lattice_id.clone()); + let consumer = stream + .get_or_create_consumer( + consumer_name.as_str(), + Config { + durable_name: Some(consumer_name.clone()), + description: Some("Consumer created by the wasmCloud K8s Operator to watch for new service endpoints in wadm manifests".to_string()), + ack_policy: jetstream::consumer::AckPolicy::Explicit, + ack_wait: std::time::Duration::from_secs(2), + max_deliver: 3, + deliver_policy: async_nats::jetstream::consumer::DeliverPolicy::All, + filter_subject: destination_subject.clone(), + ..Default::default() + }, + ) + .await?; + + let watcher = Watcher::new( + namespace, + lattice_id.clone(), + client.clone(), + consumer, + self.sender.clone(), + ); + watchers.insert(lattice_id.clone(), watcher); + Ok(()) + } + + /// Stops watching a lattice by stopping the underlying [Watcher] if no namespaces require it. + pub async fn stop_watch(&self, lattice_id: String, namespace: String) -> Result<()> { + let mut watchers = self.watchers.write().await; + if let Some(watcher) = watchers.get_mut(lattice_id.as_str()) { + watcher.namespaces.remove(namespace.as_str()); + if watcher.namespaces.is_empty() { + watchers.remove(lattice_id.as_str()); + } + + self.sender + .try_send(WatcherCommand::RemoveServices { + namespaces: HashSet::from([namespace]), + }) + .map_err(|e| anyhow::anyhow!("Error sending command to watcher: {}", e))?; + } + Ok(()) + } +} + +/// Creates or updates a service in the cluster based on the provided parameters. 
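+/// When the httpserver component carries host label requirements, the Service selects wasmCloud
+/// host pods through those labels; otherwise an EndpointSlice is assembled manually from the
+/// running host pods that belong to the lattice.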
+pub async fn create_or_update_service( + k8s_client: KubeClient, + params: &ServiceParams, + owner_ref: Option, +) -> Result<()> { + let mut labels = common_labels(); + labels.extend(BTreeMap::from([( + "app.kubernetes.io/name".to_string(), + params.name.to_string(), + )])); + let mut selector = BTreeMap::new(); + let mut create_endpoints = false; + if let Some(host_labels) = ¶ms.host_labels { + selector.insert( + "app.kubernetes.io/name".to_string(), + "wasmcloud".to_string(), + ); + host_labels.iter().for_each(|(k, v)| { + selector.insert(format_service_selector(k), v.clone()); + }) + } else { + create_endpoints = true; + } + + for namespace in params.namespaces.iter() { + let api = Api::::namespaced(k8s_client.clone(), namespace); + + let mut svc = Service { + metadata: kube::api::ObjectMeta { + name: Some(params.name.clone()), + labels: Some(labels.clone()), + finalizers: Some(vec![SERVICE_FINALIZER.to_string()]), + namespace: Some(namespace.clone()), + ..Default::default() + }, + spec: Some(ServiceSpec { + selector: Some(selector.clone()), + ports: Some(vec![ServicePort { + name: Some("http".to_string()), + port: params.port as i32, + protocol: Some("TCP".to_string()), + ..Default::default() + }]), + ..Default::default() + }), + ..Default::default() + }; + + if let Some(owner_ref) = &owner_ref { + svc.metadata.owner_references = Some(vec![owner_ref.clone()]); + } + + debug!(service =? svc, namespace=namespace, "Creating/updating service"); + + let svc = api + .patch( + params.name.as_str(), + &PatchParams::apply(SERVICE_FINALIZER), + &Patch::Apply(svc), + ) + .await + .map_err(|e| { + error!("Error creating/updating service: {}", e); + e + })?; + + if create_endpoints { + let crds = + Api::::namespaced(k8s_client.clone(), namespace.as_str()); + let pods = Api::::namespaced(k8s_client.clone(), namespace.as_str()); + let endpoints = + Api::::namespaced(k8s_client.clone(), namespace.as_str()); + + let configs = crds.list(&ListParams::default()).await?; + let mut ips = vec![]; + for cfg in configs { + if cfg.spec.lattice == params.lattice_id { + let name = cfg.metadata.name.unwrap(); + let pods = pods + .list(&ListParams { + label_selector: Some(format!( + "app.kubernetes.io/name=wasmcloud,app.kubernetes.io/instance={name}" + )), + ..Default::default() + }) + .await?; + for pod in pods { + if let Some(status) = pod.status { + if status.phase == Some("Running".to_string()) { + if let Some(pod_ips) = status.pod_ips { + ips.extend(pod_ips); + } + } + } + } + } + } + + // Create an EndpointSlice if we're working with a daemonscaler without label requirements. + // This means we need to manually map the endpoints to each wasmCloud host belonging to the + // lattice in this namespace. + // TODO: This can actually span namespaces, same with the label requirements so should we + // be querying _all_ CRDs to find all available pods? + if !ips.is_empty() { + let mut labels = labels.clone(); + labels.insert( + "kubernetes.io/service-name".to_string(), + params.name.clone(), + ); + let endpoint_slice = EndpointSlice { + metadata: kube::api::ObjectMeta { + name: Some(params.name.clone()), + labels: Some(labels.clone()), + // SAFETY: This should be safe according to the kube.rs docs, which specifiy + // that anything created through the apiserver should have a populated field + // here. + owner_references: Some(vec![svc.controller_owner_ref(&()).unwrap()]), + ..Default::default() + }, + // TODO is there a way to figure this out automatically? 
Maybe based on the number + // of IPs that come back or what they are + address_type: "IPv4".to_string(), + endpoints: ips + .iter() + .filter_map(|ip| { + ip.ip.as_ref().map(|i| Endpoint { + addresses: vec![i.clone()], + hostname: None, + target_ref: None, + ..Default::default() + }) + }) + .collect(), + ports: Some(vec![EndpointPort { + name: Some("http".to_string()), + port: Some(params.port as i32), + protocol: Some("TCP".to_string()), + app_protocol: None, + }]), + }; + // TODO this should probably do the usual get/patch or get/replce bit since I don't + // think this is fully syncing endpoints when pods are deleted. Also we should update + // this based on pod status since we may end up having stale IPs + endpoints + .patch( + params.name.as_str(), + &PatchParams::apply(CLUSTER_CONFIG_FINALIZER), + &Patch::Apply(endpoint_slice), + ) + .await + .map_err(|e| { + error!("Error creating endpoint slice: {}", e); + e + })?; + } + }; + } + + debug!("Created/updated service"); + Ok(()) +} + +#[derive(Default)] +pub struct HttpServerComponent { + name: String, + labels: Option>, +} + +/// Finds the httpserver component in a manifest and returns the details needed to create a service +fn http_server_component(manifest: &Manifest) -> Option { + for component in manifest.spec.components.iter() { + if let wadm::model::Properties::Capability { properties } = &component.properties { + if properties.contract == "wasmcloud:httpserver" { + let mut details = HttpServerComponent { + name: component.name.clone(), + ..Default::default() + }; + + // Store the set of labels for this component so that we can match them to hosts + // when creating the label selector on the service. + if let Some(traits) = &component.traits { + for t in traits { + // The only way we know how to properly create a service without + // being told to in the manifest is if we're using a daemonscaler. + // That guarantees a k8s service can route traffic to any pod in a + // deployment and that it will actually handle the inbound request. + // Alternatively we could try spying on wadm commands and + // reconciling host inventories, but that might not be worth it. + if t.trait_type != "daemonscaler" { + continue; + } + if let TraitProperty::SpreadScaler(scaler) = &t.properties { + for spread in scaler.spread.iter() { + spread.requirements.iter().for_each(|(k, v)| { + details + .labels + .get_or_insert_with(HashMap::new) + .insert(k.clone(), v.clone()); + }); + } + } + return Some(details); + } + } + } + } + } + None +} + +/// Finds the address for a target in a manifest +fn find_address(manifest: &Manifest, target: &str) -> Option { + for component in manifest.spec.components.iter() { + if let wadm::model::Properties::Actor { properties: _ } = &component.properties { + if let Some(traits) = &component.traits { + for t in traits { + if let wadm::model::TraitProperty::Linkdef(props) = &t.properties { + if props.target == target { + if let Some(values) = &props.values { + if let Some(address) = values.get("address") { + return Some(address.clone()); + } + } + } + } + } + } + } + } + None +} + +/// Deletes a service in the cluster. 
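+/// The finalizer is stripped first so the apiserver can actually garbage-collect the object once
+/// the delete call lands.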
+async fn delete_service(k8s_client: KubeClient, namespace: &str, name: &str) -> Result<()> { + debug!(namespace = namespace, name = name, "Deleting service"); + let api = Api::::namespaced(k8s_client.clone(), namespace); + // Remove the finalizer so that the service can be deleted + let mut svc = api.get(name).await?; + svc.metadata.finalizers = None; + svc.metadata.managed_fields = None; + api.patch( + name, + &PatchParams::apply(SERVICE_FINALIZER).force(), + &Patch::Apply(svc), + ) + .await + .map_err(|e| { + error!("Error removing finalizer from service: {}", e); + e + })?; + + api.delete(name, &DeleteParams::default()).await?; + Ok(()) +} + +async fn delete_services(k8s_client: KubeClient, namespace: &str) -> Result<()> { + let api = Api::::namespaced(k8s_client.clone(), namespace); + let services = api + .list(&ListParams { + label_selector: Some(WASMCLOUD_OPERATOR_MANAGED_BY_LABEL_REQUIREMENT.to_string()), + ..Default::default() + }) + .await?; + for svc in services { + let name = svc.metadata.name.unwrap(); + delete_service(k8s_client.clone(), namespace, name.as_str()).await?; + } + Ok(()) +} + +/// Formats a service selector for a given name. +fn format_service_selector(name: &str) -> String { + format!("{WASMCLOUD_OPERATOR_HOST_LABEL_PREFIX}/{}", name) +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_daemonscaler_should_return() { + let manifest = r#" +apiVersion: core.oam.dev/v1beta1 +kind: Application +metadata: + name: echo + annotations: + version: abc123 + description: "wasmCloud Echo Example" +spec: + components: + - name: echo + type: actor + properties: + image: wasmcloud.azurecr.io/echo:0.3.8 + traits: + - type: spreadscaler + properties: + replicas: 1 + - type: linkdef + properties: + target: httpserver + values: + address: 0.0.0.0:8080 + + - name: httpserver + type: capability + properties: + image: wasmcloud.azurecr.io/httpserver:0.17.0 + contract: wasmcloud:httpserver + traits: + - type: daemonscaler + properties: + replicas: 1 + spread: + - name: test + requirements: + test: value +"#; + let m = serde_yaml::from_str::(manifest).unwrap(); + let component = http_server_component(&m); + assert!(component.is_some()); + + let manifest = r#" +apiVersion: core.oam.dev/v1beta1 +kind: Application +metadata: + name: echo + annotations: + version: abc123 + description: "wasmCloud Echo Example" +spec: + components: + - name: echo + type: actor + properties: + image: wasmcloud.azurecr.io/echo:0.3.8 + traits: + - type: spreadscaler + properties: + replicas: 1 + - type: linkdef + properties: + target: httpserver + values: + address: 0.0.0.0:8080 + + - name: httpserver + type: capability + properties: + image: wasmcloud.azurecr.io/httpserver:0.17.0 + contract: wasmcloud:httpserver + traits: + - type: spreadscaler + properties: + replicas: 1 + spread: + - name: test + requirements: + test: value +"#; + let m = serde_yaml::from_str::(manifest).unwrap(); + let component = http_server_component(&m); + assert!(component.is_none()); + } +}
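As a concrete sketch of the end result: given the first manifest in the test above (a `daemonscaler` with a `test: value` spread requirement and an httpserver linkdef on `0.0.0.0:8080`), `create_or_update_service` applies roughly the following Service in each tracked namespace. The host-label key shown is a placeholder, since the value of `WASMCLOUD_OPERATOR_HOST_LABEL_PREFIX` is defined in `controller.rs` and is not part of this diff.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: echo
  # metadata.finalizers also carries SERVICE_FINALIZER (value defined in controller.rs)
  labels:
    app.kubernetes.io/name: echo
    app.kubernetes.io/managed-by: wasmcloud-operator
spec:
  selector:
    app.kubernetes.io/name: wasmcloud
    # placeholder for format_service_selector("test"); the real key is prefixed with
    # WASMCLOUD_OPERATOR_HOST_LABEL_PREFIX
    <host-label-prefix>/test: value
  ports:
    - name: http
      port: 8080
      protocol: TCP
```

If the daemonscaler had no spread requirements, the selector would be left empty and the operator would instead publish an EndpointSlice pointing at the running wasmCloud host pods for the lattice.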