Skip to content

Commit

Permalink
chore: Adapt to the new wadm_events stream hierarchy
Browse files Browse the repository at this point in the history
Signed-off-by: Joonas Bergius <joonas@cosmonic.com>
  • Loading branch information
joonas committed Jun 30, 2024
1 parent 7dcb409 commit 420897b
Showing 1 changed file with 7 additions and 8 deletions.
15 changes: 7 additions & 8 deletions src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ use crate::controller::{

const CONSUMER_PREFIX: &str = "wasmcloud_operator_service";
// This should probably be exposed by wadm somewhere
const WADM_EVT_SUBJECT: &str = "wadm.evt";
const WADM_EVENT_STREAM_NAME: &str = "wadm_events";
const OPERATOR_STREAM_NAME: &str = "wasmcloud_operator_events";
const OPERATOR_STREAM_SUBJECT: &str = "wasmcloud_operator_events.*.>";

/// Commands that can be sent to the watcher to trigger an update or removal of a service.
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -305,8 +306,6 @@ impl ServiceWatcher {
}

let js = jetstream::new(client.clone());
let source_subject = format!("{WADM_EVT_SUBJECT}.{}", lattice_id.clone());
let destination_subject = format!("wasmcloud_operator_events.{}", lattice_id.clone());

// Should we also be doing this when we first create the ServiceWatcher?
let stream = js
Expand All @@ -321,10 +320,10 @@ impl ServiceWatcher {
allow_rollup: false,
num_replicas: self.stream_replicas as usize,
mirror: Some(Source {
name: "wadm_events".to_string(),
name: WADM_EVENT_STREAM_NAME.to_string(),
subject_transforms: vec![SubjectTransform {
source: source_subject,
destination: format!("wasmcloud_operator_events.{}", lattice_id.clone()),
source: wadm::DEFAULT_WADM_EVENTS_TOPIC.to_string(),
destination: OPERATOR_STREAM_SUBJECT.replacen('*', "{{wildcard(1)}}", 1),
}],
..Default::default()
}),
Expand All @@ -343,7 +342,7 @@ impl ServiceWatcher {
ack_wait: std::time::Duration::from_secs(2),
max_deliver: 3,
deliver_policy: async_nats::jetstream::consumer::DeliverPolicy::All,
filter_subject: destination_subject.clone(),
filter_subject: OPERATOR_STREAM_SUBJECT.replacen('*', &lattice_id, 1),
..Default::default()
},
)
Expand Down Expand Up @@ -623,7 +622,7 @@ fn http_server_component(manifest: &Manifest) -> Option<HttpServerComponent> {
for p in props.source_config.iter() {
if let Some(config_props) = &p.properties {
if let Some(addr) = config_props.get("address") {
details.address = addr.clone();
details.address.clone_from(addr);
should_create_service = true;
};
}
Expand Down

0 comments on commit 420897b

Please sign in to comment.