Skip to content

Commit

Permalink
Merge pull request #37 from wasmCloud/fix_nats_ports
Browse files Browse the repository at this point in the history
fix: add options for setting NATS leaf and client ports separately
  • Loading branch information
protochron authored May 28, 2024
2 parents 45ada5a + 6df71d1 commit cfc139f
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "wasmcloud-operator-types"
version = "0.1.3"
version = "0.1.4"
edition = "2021"

[dependencies]
Expand Down
18 changes: 16 additions & 2 deletions crates/types/src/v1alpha1/wasmcloud_host_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use k8s_openapi::api::core::v1::{PodSpec, ResourceRequirements};
use kube::CustomResource;
use schemars::{gen::SchemaGenerator, schema::Schema, JsonSchema};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeSet, HashMap};
use std::collections::{BTreeMap, BTreeSet};

#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[cfg_attr(test, derive(Default))]
Expand All @@ -26,7 +26,7 @@ pub struct WasmCloudHostConfigSpec {
/// The lattice to use for these hosts.
pub lattice: String,
/// An optional set of labels to apply to these hosts.
pub host_labels: Option<HashMap<String, String>>,
pub host_labels: Option<BTreeMap<String, String>>,
/// The version of the wasmCloud host to deploy.
pub version: String,
/// The image to use for the wasmCloud host.
Expand All @@ -51,6 +51,12 @@ pub struct WasmCloudHostConfigSpec {
/// The address of the NATS server to connect to. Defaults to "nats://nats.default.svc.cluster.local".
#[serde(default = "default_nats_address")]
pub nats_address: String,
/// The port of the NATS server to connect to. Defaults to 4222.
#[serde(default = "default_nats_port")]
pub nats_client_port: u16,
/// The port of the NATS server to connect to for leaf node connections. Defaults to 7422.
#[serde(default = "default_nats_leafnode_port")]
pub nats_leafnode_port: u16,
/// The Jetstream domain to use for the NATS sidecar. Defaults to "default".
#[serde(default = "default_jetstream_domain")]
pub jetstream_domain: String,
Expand Down Expand Up @@ -128,6 +134,14 @@ fn default_log_level() -> String {
"INFO".to_string()
}

fn default_nats_port() -> u16 {
4222
}

fn default_nats_leafnode_port() -> u16 {
7422
}

#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
pub struct WasmCloudHostConfigResources {
pub nats: Option<ResourceRequirements>,
Expand Down
3 changes: 2 additions & 1 deletion sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ spec:
# Additional labels to apply to the host other than the defaults set in the controller
hostLabels:
test: value
cluster: kind
# Which wasmCloud version to use
version: "1.0.2"
# The name of a secret in the same namespace that provides the required secrets.
secretName: cluster-secrets
logLevel: INFO
natsAddress: nats://nats-cluster.default.svc.cluster.local:7422
natsAddress: nats://nats-cluster.default.svc.cluster.local
################################################
# Additional options that can be set for hosts:
################################################
Expand Down
6 changes: 4 additions & 2 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Resul
let nc = s.nats_creds.map(SecretString::new);
let apps = crate::resources::application::list_apps(
&cfg.spec.nats_address,
&cfg.spec.nats_client_port,
nc.as_ref(),
cfg.spec.lattice.clone(),
)
Expand Down Expand Up @@ -202,6 +203,7 @@ async fn reconcile_crd(config: &WasmCloudHostConfig, ctx: Arc<Context>) -> Resul
// Start the watcher so that services are automatically created in the cluster.
let nats_client = get_client(
&cfg.spec.nats_address,
&cfg.spec.nats_client_port,
ctx.nats_creds.clone(),
NameNamespace::new(name.clone(), ns.clone()),
)
Expand Down Expand Up @@ -835,7 +837,7 @@ jetstream {
leafnodes {
remotes: [
{
url: "{{cluster_url}}"
url: "{{cluster_url}}:{{leafnode_port}}"
{{#if use_credentials}}
credentials: "/nats/nats.creds"
{{/if}}
Expand All @@ -844,7 +846,7 @@ leafnodes {
}
"#;
let tpl = Handlebars::new();
let rendered = tpl.render_template(template, &json!({"jetstream_domain": config.spec.leaf_node_domain, "cluster_url": config.spec.nats_address, "use_credentials": use_nats_creds}))?;
let rendered = tpl.render_template(template, &json!({"jetstream_domain": config.spec.leaf_node_domain, "cluster_url": config.spec.nats_address, "leafnode_port": config.spec.nats_leafnode_port,"use_credentials": use_nats_creds}))?;
let mut contents = BTreeMap::new();
contents.insert("nats.conf".to_string(), rendered);
let cm = ConfigMap {
Expand Down
37 changes: 28 additions & 9 deletions src/resources/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,14 @@ pub async fn list_all_applications(
let secret = map.get(&nst);
// Prevent listing applications within a given lattice more than once
if !lattices.contains(&lattice_id) {
let result = match list_apps(&cfg.spec.nats_address, secret, lattice_id.clone()).await {
let result = match list_apps(
&cfg.spec.nats_address,
&cfg.spec.nats_client_port,
secret,
lattice_id.clone(),
)
.await
{
Ok(apps) => apps,
Err(e) => return internal_error(anyhow!("unable to list applications: {}", e)),
};
Expand Down Expand Up @@ -440,7 +447,14 @@ pub async fn list_applications(
let secret = map.get(&nst);
// This is to check that we don't list a lattice more than once
if !lattices.contains(&lattice_id) {
let result = match list_apps(&cfg.spec.nats_address, secret, lattice_id.clone()).await {
let result = match list_apps(
&cfg.spec.nats_address,
&cfg.spec.nats_client_port,
secret,
lattice_id.clone(),
)
.await
{
Ok(apps) => apps,
Err(e) => return internal_error(anyhow!("unable to list applications: {}", e)),
};
Expand All @@ -466,16 +480,18 @@ pub async fn list_applications(

pub async fn list_apps(
cluster_url: &str,
port: &u16,
creds: Option<&SecretString>,
lattice_id: String,
) -> Result<Vec<ModelSummary>, Error> {
let addr = format!("{}:{}", cluster_url, port);
let client = match creds {
Some(creds) => {
ConnectOptions::with_credentials(creds.expose_secret())?
.connect(cluster_url)
.connect(addr)
.await?
}
None => ConnectOptions::new().connect(cluster_url).await?,
None => ConnectOptions::new().connect(addr).await?,
};
let models = wash_lib::app::get_models(&client, Some(lattice_id)).await?;

Expand All @@ -484,19 +500,21 @@ pub async fn list_apps(

pub async fn get_client(
cluster_url: &str,
port: &u16,
nats_creds: Arc<RwLock<HashMap<NameNamespace, SecretString>>>,
namespace: NameNamespace,
) -> Result<async_nats::Client, async_nats::ConnectError> {
let addr = format!("{}:{}", cluster_url, port);
let creds = nats_creds.read().await;
match creds.get(&namespace) {
Some(creds) => {
let creds = creds.expose_secret();
ConnectOptions::with_credentials(creds)
.expect("unable to create nats client")
.connect(cluster_url)
.connect(addr)
.await
}
None => ConnectOptions::new().connect(cluster_url).await,
None => ConnectOptions::new().connect(addr).await,
}
}

Expand Down Expand Up @@ -809,11 +827,12 @@ async fn get_lattice_connection(
let lattice_id = cfg.spec.lattice;
let lattice_name = cfg.metadata.name?;
let nst: NameNamespace = NameNamespace::new(lattice_name, namespace);
Some((cluster_url, nst, lattice_id))
let port = cfg.spec.nats_client_port;
Some((cluster_url, nst, lattice_id, port))
});

for (cluster_url, ns, lattice_id) in connection_data {
match get_client(&cluster_url, state.nats_creds.clone(), ns).await {
for (cluster_url, ns, lattice_id, port) in connection_data {
match get_client(&cluster_url, &port, state.nats_creds.clone(), ns).await {
Ok(c) => return Ok((c, lattice_id)),
Err(e) => {
error!(err = %e, %lattice_id, "error connecting to nats");
Expand Down

0 comments on commit cfc139f

Please sign in to comment.