diff --git a/src/relay.rs b/src/relay.rs index a490775..4f0e6fe 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -174,10 +174,11 @@ impl IntoResponse for JsonError { } } +type TaskMap = HashMap, mpsc::Sender)>; +type SharedTaskMap = Arc>; + async fn handle_publish( - Extension(tasks): Extension< - Arc, mpsc::Sender)>>>, - >, + Extension(tasks): Extension, payload: Result, JsonRejection>, ) -> Result { let payload = match payload { diff --git a/src/schema/mod.rs b/src/schema/mod.rs index 3f06b74..38ad2d3 100644 --- a/src/schema/mod.rs +++ b/src/schema/mod.rs @@ -14,10 +14,10 @@ pub struct ConfigFile { pub authorization_token: String, pub tagoio_url: Option, // Default is "https://api.tago.io" pub downlink_port: Option, // Default is "3000" - pub mqtt: MQTT, + pub mqtt: Mqtt, } #[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)] -pub struct MQTT { +pub struct Mqtt { pub client_id: Option, // Default is "tagoio-relay" pub tls_enabled: bool, pub address: String, @@ -63,7 +63,7 @@ impl ConfigFile { } } -impl MQTT { +impl Mqtt { pub fn with_defaults(mut self) -> anyhow::Result { if self.client_id.is_none() { self.client_id = Some("tagoio-relay".to_string()); @@ -89,12 +89,6 @@ pub enum InitiatedState { Running, } -impl Default for InitiatedState { - fn default() -> Self { - InitiatedState::Stopped - } -} - #[derive(serde::Deserialize)] pub struct PublishRequest { pub topic: String, @@ -115,7 +109,7 @@ mod tests { authorization_token: "authorization_token".to_string(), tagoio_url: None, downlink_port: None, - mqtt: MQTT { + mqtt: Mqtt { client_id: None, tls_enabled: false, address: "localhost".to_string(), @@ -150,7 +144,7 @@ mod tests { authorization_token: "authorization_token".to_string(), tagoio_url: None, downlink_port: None, - mqtt: MQTT { + mqtt: Mqtt { client_id: None, tls_enabled: false, address: "localhost".to_string(), @@ -173,7 +167,7 @@ mod tests { #[test] fn test_mqtt_with_defaults() { - let mqtt = MQTT { + let mqtt = Mqtt { client_id: None, tls_enabled: false, address: "localhost".to_string(), @@ -194,7 +188,7 @@ mod tests { #[test] #[should_panic(expected = "Invalid MQTT address: invalid_address")] fn test_invalid_mqtt_address() { - let mqtt = MQTT { + let mqtt = Mqtt { client_id: None, tls_enabled: false, address: "invalid_address".to_string(), diff --git a/src/services/mqttrelay.rs b/src/services/mqttrelay.rs index 3fd9726..ec0cd06 100644 --- a/src/services/mqttrelay.rs +++ b/src/services/mqttrelay.rs @@ -74,7 +74,7 @@ fn initialize_mqtt_options(relay_cfg: &RelayConfig) -> MqttOptions { let crt_file = &relay_cfg.config.mqtt.broker_tls_cert; let key_file = &relay_cfg.config.mqtt.broker_tls_key; - let mut mqttoptions = MqttOptions::new(&client_id, &relay_cfg.config.mqtt.address, relay_cfg.config.mqtt.port); + let mut mqttoptions = MqttOptions::new(client_id, &relay_cfg.config.mqtt.address, relay_cfg.config.mqtt.port); mqttoptions.set_keep_alive(Duration::from_secs(30)); mqttoptions.set_max_packet_size(1024 * 1024, 1024 * 1024); // 1mb in/out @@ -162,17 +162,11 @@ async fn publish_messages( async fn process_incoming_messages(eventloop: &mut rumqttc::EventLoop, relay_cfg: &RelayConfig) { while let Ok(notification) = eventloop.poll().await { - match notification { - rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish)) => { - log::info!(target: "mqtt", "[Broker] Received message on topic {}", publish.topic); - if let Err(e) = crate::services::tagoio::forward_buffer_messages(&relay_cfg, &publish).await { - log::error!(target: "mqtt", "Failed to forward message to TagoIO: {:?}", e.to_string()); - } + if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish)) = notification { + log::info!(target: "mqtt", "[Broker] Received message on topic {}", publish.topic); + if let Err(e) = crate::services::tagoio::forward_buffer_messages(relay_cfg, &publish).await { + log::error!(target: "mqtt", "Failed to forward message to TagoIO: {:?}", e.to_string()); } - // rumqttc::Event::Incoming(rumqttc::Packet::SubAck(suback)) => { - // println!("Subscription acknowledged: {:?}", suback); - // } - _ => {} } } } diff --git a/src/services/tagoio.rs b/src/services/tagoio.rs index b176c1c..e5bb775 100644 --- a/src/services/tagoio.rs +++ b/src/services/tagoio.rs @@ -171,7 +171,7 @@ pub async fn verify_network_token(relay_cfg: &RelayConfig) -> Result<(), CustomE #[cfg(test)] mod tests { use super::*; - use crate::schema::{ConfigFile, MQTT}; + use crate::schema::{ConfigFile, Mqtt}; use mockito::Matcher; use rumqttc::{Publish, QoS}; use tokio; @@ -184,7 +184,7 @@ mod tests { authorization_token: "test_authorization_token".to_string(), tagoio_url: Some(server.url()), downlink_port: Some("3000".to_string()), - mqtt: MQTT { + mqtt: Mqtt { client_id: Some("test_client_id".to_string()), tls_enabled: false, address: "localhost".to_string(),