From 420897be60c61a99ddd764573acfb4fa1413c062 Mon Sep 17 00:00:00 2001 From: Joonas Bergius Date: Sat, 29 Jun 2024 19:49:35 -0500 Subject: [PATCH] chore: Adapt to the new wadm_events stream hierarchy Signed-off-by: Joonas Bergius --- src/services.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/services.rs b/src/services.rs index 5247377..6a89d20 100644 --- a/src/services.rs +++ b/src/services.rs @@ -37,8 +37,9 @@ use crate::controller::{ const CONSUMER_PREFIX: &str = "wasmcloud_operator_service"; // This should probably be exposed by wadm somewhere -const WADM_EVT_SUBJECT: &str = "wadm.evt"; +const WADM_EVENT_STREAM_NAME: &str = "wadm_events"; const OPERATOR_STREAM_NAME: &str = "wasmcloud_operator_events"; +const OPERATOR_STREAM_SUBJECT: &str = "wasmcloud_operator_events.*.>"; /// Commands that can be sent to the watcher to trigger an update or removal of a service. #[derive(Clone, Debug)] @@ -305,8 +306,6 @@ impl ServiceWatcher { } let js = jetstream::new(client.clone()); - let source_subject = format!("{WADM_EVT_SUBJECT}.{}", lattice_id.clone()); - let destination_subject = format!("wasmcloud_operator_events.{}", lattice_id.clone()); // Should we also be doing this when we first create the ServiceWatcher? let stream = js @@ -321,10 +320,10 @@ impl ServiceWatcher { allow_rollup: false, num_replicas: self.stream_replicas as usize, mirror: Some(Source { - name: "wadm_events".to_string(), + name: WADM_EVENT_STREAM_NAME.to_string(), subject_transforms: vec![SubjectTransform { - source: source_subject, - destination: format!("wasmcloud_operator_events.{}", lattice_id.clone()), + source: wadm::DEFAULT_WADM_EVENTS_TOPIC.to_string(), + destination: OPERATOR_STREAM_SUBJECT.replacen('*', "{{wildcard(1)}}", 1), }], ..Default::default() }), @@ -343,7 +342,7 @@ impl ServiceWatcher { ack_wait: std::time::Duration::from_secs(2), max_deliver: 3, deliver_policy: async_nats::jetstream::consumer::DeliverPolicy::All, - filter_subject: destination_subject.clone(), + filter_subject: OPERATOR_STREAM_SUBJECT.replacen('*', &lattice_id, 1), ..Default::default() }, ) @@ -623,7 +622,7 @@ fn http_server_component(manifest: &Manifest) -> Option { for p in props.source_config.iter() { if let Some(config_props) = &p.properties { if let Some(addr) = config_props.get("address") { - details.address = addr.clone(); + details.address.clone_from(addr); should_create_service = true; }; }