Skip to content

Commit

Permalink
feat: fix clippy error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
vitorfdl committed Jun 17, 2024
1 parent 1cfe94e commit 092ea39
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 29 deletions.
7 changes: 4 additions & 3 deletions src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,11 @@ impl IntoResponse for JsonError {
}
}

type TaskMap = HashMap<String, (tokio::task::JoinHandle<()>, mpsc::Sender<PublishMessage>)>;
type SharedTaskMap = Arc<RwLock<TaskMap>>;

async fn handle_publish(
Extension(tasks): Extension<
Arc<RwLock<HashMap<String, (tokio::task::JoinHandle<()>, mpsc::Sender<PublishMessage>)>>>,
>,
Extension(tasks): Extension<SharedTaskMap>,
payload: Result<Json<PublishRequest>, JsonRejection>,
) -> Result<impl IntoResponse, JsonError> {
let payload = match payload {
Expand Down
20 changes: 7 additions & 13 deletions src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ pub struct ConfigFile {
pub authorization_token: String,
pub tagoio_url: Option<String>, // Default is "https://api.tago.io"
pub downlink_port: Option<String>, // Default is "3000"
pub mqtt: MQTT,
pub mqtt: Mqtt,
}
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone)]
pub struct MQTT {
pub struct Mqtt {
pub client_id: Option<String>, // Default is "tagoio-relay"
pub tls_enabled: bool,
pub address: String,
Expand Down Expand Up @@ -63,7 +63,7 @@ impl ConfigFile {
}
}

impl MQTT {
impl Mqtt {
pub fn with_defaults(mut self) -> anyhow::Result<Self> {
if self.client_id.is_none() {
self.client_id = Some("tagoio-relay".to_string());
Expand All @@ -89,12 +89,6 @@ pub enum InitiatedState {
Running,
}

impl Default for InitiatedState {
fn default() -> Self {
InitiatedState::Stopped
}
}

#[derive(serde::Deserialize)]
pub struct PublishRequest {
pub topic: String,
Expand All @@ -115,7 +109,7 @@ mod tests {
authorization_token: "authorization_token".to_string(),
tagoio_url: None,
downlink_port: None,
mqtt: MQTT {
mqtt: Mqtt {
client_id: None,
tls_enabled: false,
address: "localhost".to_string(),
Expand Down Expand Up @@ -150,7 +144,7 @@ mod tests {
authorization_token: "authorization_token".to_string(),
tagoio_url: None,
downlink_port: None,
mqtt: MQTT {
mqtt: Mqtt {
client_id: None,
tls_enabled: false,
address: "localhost".to_string(),
Expand All @@ -173,7 +167,7 @@ mod tests {

#[test]
fn test_mqtt_with_defaults() {
let mqtt = MQTT {
let mqtt = Mqtt {
client_id: None,
tls_enabled: false,
address: "localhost".to_string(),
Expand All @@ -194,7 +188,7 @@ mod tests {
#[test]
#[should_panic(expected = "Invalid MQTT address: invalid_address")]
fn test_invalid_mqtt_address() {
let mqtt = MQTT {
let mqtt = Mqtt {
client_id: None,
tls_enabled: false,
address: "invalid_address".to_string(),
Expand Down
16 changes: 5 additions & 11 deletions src/services/mqttrelay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn initialize_mqtt_options(relay_cfg: &RelayConfig) -> MqttOptions {
let crt_file = &relay_cfg.config.mqtt.broker_tls_cert;
let key_file = &relay_cfg.config.mqtt.broker_tls_key;

let mut mqttoptions = MqttOptions::new(&client_id, &relay_cfg.config.mqtt.address, relay_cfg.config.mqtt.port);
let mut mqttoptions = MqttOptions::new(client_id, &relay_cfg.config.mqtt.address, relay_cfg.config.mqtt.port);
mqttoptions.set_keep_alive(Duration::from_secs(30));
mqttoptions.set_max_packet_size(1024 * 1024, 1024 * 1024); // 1mb in/out

Expand Down Expand Up @@ -162,17 +162,11 @@ async fn publish_messages(

async fn process_incoming_messages(eventloop: &mut rumqttc::EventLoop, relay_cfg: &RelayConfig) {
while let Ok(notification) = eventloop.poll().await {
match notification {
rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish)) => {
log::info!(target: "mqtt", "[Broker] Received message on topic {}", publish.topic);
if let Err(e) = crate::services::tagoio::forward_buffer_messages(&relay_cfg, &publish).await {
log::error!(target: "mqtt", "Failed to forward message to TagoIO: {:?}", e.to_string());
}
if let rumqttc::Event::Incoming(rumqttc::Packet::Publish(publish)) = notification {
log::info!(target: "mqtt", "[Broker] Received message on topic {}", publish.topic);
if let Err(e) = crate::services::tagoio::forward_buffer_messages(relay_cfg, &publish).await {
log::error!(target: "mqtt", "Failed to forward message to TagoIO: {:?}", e.to_string());
}
// rumqttc::Event::Incoming(rumqttc::Packet::SubAck(suback)) => {
// println!("Subscription acknowledged: {:?}", suback);
// }
_ => {}
}
}
}
4 changes: 2 additions & 2 deletions src/services/tagoio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ pub async fn verify_network_token(relay_cfg: &RelayConfig) -> Result<(), CustomE
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::{ConfigFile, MQTT};
use crate::schema::{ConfigFile, Mqtt};
use mockito::Matcher;
use rumqttc::{Publish, QoS};
use tokio;
Expand All @@ -184,7 +184,7 @@ mod tests {
authorization_token: "test_authorization_token".to_string(),
tagoio_url: Some(server.url()),
downlink_port: Some("3000".to_string()),
mqtt: MQTT {
mqtt: Mqtt {
client_id: Some("test_client_id".to_string()),
tls_enabled: false,
address: "localhost".to_string(),
Expand Down

0 comments on commit 092ea39

Please sign in to comment.