Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add options for setting NATS leaf and client ports separately #37

Merged
merged 1 commit into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wasmcloud-operator-types"
version = "0.1.3"
version = "0.1.4"
edition = "2021"

[dependencies]
Expand Down
18 changes: 16 additions & 2 deletions crates/types/src/v1alpha1/wasmcloud_host_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use k8s_openapi::api::core::v1::{PodSpec, ResourceRequirements};
use kube::CustomResource;
use schemars::{gen::SchemaGenerator, schema::Schema, JsonSchema};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeMap, BTreeSet};

#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[cfg_attr(test, derive(Default))]
Expand All @@ -26,7 +26,7 @@ pub struct WasmCloudHostConfigSpec {
/// The lattice to use for these hosts.
pub lattice: String,
/// An optional set of labels to apply to these hosts.
pub host_labels: Option<HashMap<String, String>>,
pub host_labels: Option<BTreeMap<String, String>>,
/// The version of the wasmCloud host to deploy.
pub version: String,
/// The image to use for the wasmCloud host.
Expand All @@ -51,6 +51,12 @@ pub struct WasmCloudHostConfigSpec {
/// The address of the NATS server to connect to. Defaults to "nats://nats.default.svc.cluster.local".
#[serde(default = "default_nats_address")]
pub nats_address: String,
/// The port of the NATS server to connect to. Defaults to 4222.
#[serde(default = "default_nats_port")]
pub nats_client_port: u16,
/// The port of the NATS server to connect to for leaf node connections. Defaults to 7422.
#[serde(default = "default_nats_leafnode_port")]
pub nats_leafnode_port: u16,
/// The Jetstream domain to use for the NATS sidecar. Defaults to "default".
#[serde(default = "default_jetstream_domain")]
pub jetstream_domain: String,
Expand Down Expand Up @@ -128,6 +134,14 @@ fn default_log_level() -> String {
"INFO".to_string()
}

fn default_nats_port() -> u16 {
4222
}

fn default_nats_leafnode_port() -> u16 {
7422
}

#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
pub struct WasmCloudHostConfigResources {
pub nats: Option<ResourceRequirements>,
Expand Down
3 changes: 2 additions & 1 deletion sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ spec:
# Additional labels to apply to the host other than the defaults set in the controller
hostLabels:
test: value
cluster: kind
# Which wasmCloud version to use
version: "1.0.2"
# The name of a secret in the same namespace that provides the required secrets.
secretName: cluster-secrets
logLevel: INFO
natsAddress: nats://nats-cluster.default.svc.cluster.local:7422
natsAddress: nats://nats-cluster.default.svc.cluster.local
################################################
# Additional options that can be set for hosts:
################################################
Expand Down
6 changes: 4 additions & 2 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Resul
let nc = s.nats_creds.map(SecretString::new);
let apps = crate::resources::application::list_apps(
&cfg.spec.nats_address,
&cfg.spec.nats_client_port,
nc.as_ref(),
cfg.spec.lattice.clone(),
)
Expand Down Expand Up @@ -202,6 +203,7 @@ async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Resul
// Start the watcher so that services are automatically created in the cluster.
let nats_client = get_client(
&cfg.spec.nats_address,
&cfg.spec.nats_client_port,
ctx.nats_creds.clone(),
NameNamespace::new(name.clone(), ns.clone()),
)
Expand Down Expand Up @@ -835,7 +837,7 @@ jetstream {
leafnodes {
remotes: [
{
url: "{{cluster_url}}"
url: "{{cluster_url}}:{{leafnode_port}}"
{{#if use_credentials}}
credentials: "/nats/nats.creds"
{{/if}}
Expand All @@ -844,7 +846,7 @@ leafnodes {
}
"#;
let tpl = Handlebars::new();
let rendered = tpl.render_template(template, &json!({"jetstream_domain": config.spec.leaf_node_domain, "cluster_url": config.spec.nats_address, "use_credentials": use_nats_creds}))?;
let rendered = tpl.render_template(template, &json!({"jetstream_domain": config.spec.leaf_node_domain, "cluster_url": config.spec.nats_address, "leafnode_port": config.spec.nats_leafnode_port,"use_credentials": use_nats_creds}))?;
let mut contents = BTreeMap::new();
contents.insert("nats.conf".to_string(), rendered);
let cm = ConfigMap {
Expand Down
37 changes: 28 additions & 9 deletions src/resources/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,14 @@ pub async fn list_all_applications(
let secret = map.get(&nst);
// Prevent listing applications within a given lattice more than once
if !lattices.contains(&lattice_id) {
let result = match list_apps(&cfg.spec.nats_address, secret, lattice_id.clone()).await {
let result = match list_apps(
&cfg.spec.nats_address,
&cfg.spec.nats_client_port,
secret,
lattice_id.clone(),
)
.await
{
Ok(apps) => apps,
Err(e) => return internal_error(anyhow!("unable to list applications: {}", e)),
};
Expand Down Expand Up @@ -440,7 +447,14 @@ pub async fn list_applications(
let secret = map.get(&nst);
// This is to check that we don't list a lattice more than once
if !lattices.contains(&lattice_id) {
let result = match list_apps(&cfg.spec.nats_address, secret, lattice_id.clone()).await {
let result = match list_apps(
&cfg.spec.nats_address,
&cfg.spec.nats_client_port,
secret,
lattice_id.clone(),
)
.await
{
Ok(apps) => apps,
Err(e) => return internal_error(anyhow!("unable to list applications: {}", e)),
};
Expand All @@ -466,16 +480,18 @@ pub async fn list_applications(

pub async fn list_apps(
cluster_url: &str,
port: &u16,
creds: Option<&SecretString>,
lattice_id: String,
) -> Result<Vec<ModelSummary>, Error> {
let addr = format!("{}:{}", cluster_url, port);
let client = match creds {
Some(creds) => {
ConnectOptions::with_credentials(creds.expose_secret())?
.connect(cluster_url)
.connect(addr)
.await?
}
None => ConnectOptions::new().connect(cluster_url).await?,
None => ConnectOptions::new().connect(addr).await?,
};
let models = wash_lib::app::get_models(&client, Some(lattice_id)).await?;

Expand All @@ -484,19 +500,21 @@ pub async fn list_apps(

pub async fn get_client(
cluster_url: &str,
port: &u16,
nats_creds: Arc<RwLock<HashMap<NameNamespace, SecretString>>>,
namespace: NameNamespace,
) -> Result<async_nats::Client, async_nats::ConnectError> {
let addr = format!("{}:{}", cluster_url, port);
let creds = nats_creds.read().await;
match creds.get(&namespace) {
Some(creds) => {
let creds = creds.expose_secret();
ConnectOptions::with_credentials(creds)
.expect("unable to create nats client")
.connect(cluster_url)
.connect(addr)
.await
}
None => ConnectOptions::new().connect(cluster_url).await,
None => ConnectOptions::new().connect(addr).await,
}
}

Expand Down Expand Up @@ -809,11 +827,12 @@ async fn get_lattice_connection(
let lattice_id = cfg.spec.lattice;
let lattice_name = cfg.metadata.name?;
let nst: NameNamespace = NameNamespace::new(lattice_name, namespace);
Some((cluster_url, nst, lattice_id))
let port = cfg.spec.nats_client_port;
Some((cluster_url, nst, lattice_id, port))
});

for (cluster_url, ns, lattice_id) in connection_data {
match get_client(&cluster_url, state.nats_creds.clone(), ns).await {
for (cluster_url, ns, lattice_id, port) in connection_data {
match get_client(&cluster_url, &port, state.nats_creds.clone(), ns).await {
Ok(c) => return Ok((c, lattice_id)),
Err(e) => {
error!(err = %e, %lattice_id, "error connecting to nats");
Expand Down
Loading