diff --git a/Cargo.toml b/Cargo.toml index 88189b0..fd1f065 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,8 +42,12 @@ embedded-nal-async = "0.7" std-embedded-nal-async = "0.1" embassy-time = { version = "0.3", features = ["std", "generic-queue"] } embassy-sync = "0.5" +embassy-futures = "0.1" +embedded-svc = { version = "0.26", features = ["std"] } futures-lite = "1" rand = "0.8" +tokio = "1" # For the `mqtt_client` example +async-compat = "0.2" # For the `mqtt_client` example [[example]] name = "captive_portal" @@ -81,6 +85,10 @@ required-features = ["std"] name = "std_nal" required-features = ["std"] +[[example]] +name = "mqtt_client" +required-features = ["std", "embedded-svc"] + [workspace] members = [ ".", diff --git a/edge-mqtt/README.md b/edge-mqtt/README.md new file mode 100644 index 0000000..da82ef0 --- /dev/null +++ b/edge-mqtt/README.md @@ -0,0 +1,129 @@ +# edge-mqtt + +[![CI](https://github.com/ivmarkov/edge-net/actions/workflows/ci.yml/badge.svg)](https://github.com/ivmarkov/edge-net/actions/workflows/ci.yml) +![crates.io](https://img.shields.io/crates/v/edge-net.svg) +[![Documentation](https://docs.rs/edge-net/badge.svg)](https://docs.rs/edge-net) + +A wrapper for the [`rumqttc`]() crate that adapts it to async [MQTT traits]() of the `embedded-svc` crate. + +**NOTE**: Needs STD! + +The plan for the future is to retire this crate in favor of []() once the latter gets MQTT 3.1 compatibility, and implements a more ergonomic API where sending can be done independently from receiving MQTT messages. + +... or implement a true `no_std` no-alloc alternative - just like all other `edge-*` crates - if "" does not see further development. + +## Example + +```rust +use core::fmt::Debug; + +use async_compat::CompatExt; + +use embedded_svc::mqtt::client::asynch::{Client, Connection, Publish, QoS}; +use embedded_svc::mqtt::client::Event; + +use embassy_futures::select::{select, Either}; +use embassy_time::{Duration, Timer}; + +use edge_mqtt::io::{AsyncClient, MqttClient, MqttConnection, MqttOptions}; + +use log::*; + +const MQTT_HOST: &str = "broker.emqx.io"; +const MQTT_PORT: u16 = 1883; +const MQTT_CLIENT_ID: &str = "edge-mqtt-demo"; +const MQTT_TOPIC: &str = "edge-mqtt-demo"; + +fn main() { + env_logger::init_from_env( + env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), + ); + + let (client, conn) = mqtt_create(MQTT_CLIENT_ID, MQTT_HOST, MQTT_PORT).unwrap(); + + futures_lite::future::block_on( + run(client, conn, MQTT_TOPIC).compat(), /* necessary for tokio */ + ) + .unwrap() +} + +async fn run(mut client: M, mut connection: C, topic: &str) -> Result<(), M::Error> +where + M: Client + Publish + 'static, + C: Connection + 'static, + M::Error: Debug + 'static, +{ + info!("About to start the MQTT client"); + + info!("MQTT client started"); + + client.subscribe(topic, QoS::AtMostOnce).await?; + + info!("Subscribed to topic \"{topic}\""); + + let res = select( + async move { + info!("MQTT Listening for messages"); + + loop { + let msg = connection.next().await; + + match msg { + Err(err) => info!("[Queue] Error: {:?}", err), + Ok(event) => { + if let Some(event) = event { + info!("[Queue] Event: {}", event.payload()); + } else { + break; + } + } + } + } + + Ok(()) + }, + async move { + // Just to give a chance of our connection to get even the first published message + Timer::after(Duration::from_millis(500)).await; + + let payload = "Hello from edge-mqtt-demo!"; + + loop { + client + .publish(topic, QoS::AtMostOnce, false, payload.as_bytes()) + .await?; + + info!("Published \"{payload}\" to topic \"{topic}\""); + + let sleep_secs = 2; + + info!("Now sleeping for {sleep_secs}s..."); + Timer::after(Duration::from_secs(sleep_secs)).await; + } + }, + ) + .await; + + match res { + Either::First(res) => res, + Either::Second(res) => res, + } +} + +fn mqtt_create( + client_id: &str, + host: &str, + port: u16, +) -> Result<(MqttClient, MqttConnection), anyhow::Error> { + let mut mqtt_options = MqttOptions::new(client_id, host, port); + + mqtt_options.set_keep_alive(core::time::Duration::from_secs(10)); + + let (rumqttc_client, rumqttc_eventloop) = AsyncClient::new(mqtt_options, 10); + + let mqtt_client = MqttClient::new(rumqttc_client); + let mqtt_conn = MqttConnection::new(rumqttc_eventloop); + + Ok((mqtt_client, mqtt_conn)) +} +``` diff --git a/edge-mqtt/src/io.rs b/edge-mqtt/src/io.rs index 882d294..30f0812 100644 --- a/edge-mqtt/src/io.rs +++ b/edge-mqtt/src/io.rs @@ -6,12 +6,10 @@ pub use embedded_svc_compat::*; #[cfg(feature = "embedded-svc")] mod embedded_svc_compat { use core::fmt::{Debug, Display}; - use core::marker::PhantomData; use embedded_svc::mqtt::client::asynch::{ - Client, Connection, Details, ErrorType, Event, Message, MessageId, Publish, QoS, + Client, Connection, Details, ErrorType, Event, EventPayload, MessageId, Publish, QoS, }; - use embedded_svc::mqtt::client::MessageImpl; use log::trace; @@ -32,7 +30,6 @@ mod embedded_svc_compat { } } - #[cfg(feature = "std")] impl std::error::Error for MqttError {} impl From for MqttError { @@ -87,85 +84,72 @@ mod embedded_svc_compat { } } - pub struct MessageRef<'a>(&'a rumqttc::Publish); - - impl<'a> MessageRef<'a> { - pub fn into_message_impl(&self) -> Option { - Some(MessageImpl::new(self)) + pub struct MqttEvent(rumqttc::Event); + + impl MqttEvent { + fn payload(&self) -> Option> { + match &self.0 { + rumqttc::Event::Incoming(incoming) => match incoming { + rumqttc::Packet::Connect(_) => Some(EventPayload::BeforeConnect), + rumqttc::Packet::ConnAck(_) => Some(EventPayload::Connected(true)), + rumqttc::Packet::Disconnect => Some(EventPayload::Disconnected), + rumqttc::Packet::PubAck(PubAck { pkid, .. }) => { + Some(EventPayload::Published(*pkid as _)) + } + rumqttc::Packet::SubAck(SubAck { pkid, .. }) => { + Some(EventPayload::Subscribed(*pkid as _)) + } + rumqttc::Packet::UnsubAck(UnsubAck { pkid, .. }) => { + Some(EventPayload::Unsubscribed(*pkid as _)) + } + rumqttc::Packet::Publish(rumqttc::Publish { + pkid, + topic, + payload, + .. + }) => Some(EventPayload::Received { + id: *pkid as _, + topic: Some(topic.as_str()), + data: payload, + details: Details::Complete, + }), + _ => None, + }, + rumqttc::Event::Outgoing(_) => None, + } } } - impl<'a> Message for MessageRef<'a> { - fn id(&self) -> MessageId { - self.0.pkid as _ - } - - fn topic(&self) -> Option<&'_ str> { - Some(&self.0.topic) - } - - fn data(&self) -> &'_ [u8] { - &self.0.payload - } - - fn details(&self) -> &Details { - &Details::Complete + impl Event for MqttEvent { + fn payload(&self) -> EventPayload<'_> { + MqttEvent::payload(self).unwrap() } } - pub struct MqttConnection(EventLoop, F, PhantomData M>); + pub struct MqttConnection(EventLoop); - impl MqttConnection { - pub const fn new(event_loop: EventLoop, message_converter: F) -> Self { - Self(event_loop, message_converter, PhantomData) + impl MqttConnection { + pub const fn new(event_loop: EventLoop) -> Self { + Self(event_loop) } } - impl ErrorType for MqttConnection { + impl ErrorType for MqttConnection { type Error = MqttError; } - impl Connection for MqttConnection - where - F: FnMut(&MessageRef) -> Option + Send, - M: Send, - { - type Message<'a> = M where Self: 'a; + impl Connection for MqttConnection { + type Event<'a> = MqttEvent where Self: 'a; - async fn next(&mut self) -> Option>, Self::Error>> { + async fn next(&mut self) -> Result>, Self::Error> { loop { - let event = self.0.poll().await; + let event = self.0.poll().await?; trace!("Got event: {:?}", event); - match event { - Ok(event) => { - let event = match event { - rumqttc::Event::Incoming(incoming) => match incoming { - rumqttc::Packet::Connect(_) => Some(Event::BeforeConnect), - rumqttc::Packet::ConnAck(_) => Some(Event::Connected(true)), - rumqttc::Packet::Disconnect => Some(Event::Disconnected), - rumqttc::Packet::PubAck(PubAck { pkid, .. }) => { - Some(Event::Published(pkid as _)) - } - rumqttc::Packet::SubAck(SubAck { pkid, .. }) => { - Some(Event::Subscribed(pkid as _)) - } - rumqttc::Packet::UnsubAck(UnsubAck { pkid, .. }) => { - Some(Event::Unsubscribed(pkid as _)) - } - rumqttc::Packet::Publish(publish) => { - (self.1)(&MessageRef(&publish)).map(Event::Received) - } - _ => None, - }, - rumqttc::Event::Outgoing(_) => None, - }; - - if let Some(event) = event { - return Some(Ok(event)); - } - } - Err(err) => return Some(Err(MqttError::ConnectionError(err))), + let event = MqttEvent(event); + + if event.payload().is_some() { + break Ok(Some(event)); } } } diff --git a/examples/mqtt_client.rs b/examples/mqtt_client.rs new file mode 100644 index 0000000..dcba5c8 --- /dev/null +++ b/examples/mqtt_client.rs @@ -0,0 +1,111 @@ +use core::fmt::Debug; + +use async_compat::CompatExt; + +use embedded_svc::mqtt::client::asynch::{Client, Connection, Publish, QoS}; +use embedded_svc::mqtt::client::Event; + +use embassy_futures::select::{select, Either}; +use embassy_time::{Duration, Timer}; + +use edge_mqtt::io::{AsyncClient, MqttClient, MqttConnection, MqttOptions}; + +use log::*; + +const MQTT_HOST: &str = "broker.emqx.io"; +const MQTT_PORT: u16 = 1883; +const MQTT_CLIENT_ID: &str = "edge-mqtt-demo"; +const MQTT_TOPIC: &str = "edge-mqtt-demo"; + +fn main() { + env_logger::init_from_env( + env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), + ); + + let (client, conn) = mqtt_create(MQTT_CLIENT_ID, MQTT_HOST, MQTT_PORT).unwrap(); + + futures_lite::future::block_on( + run(client, conn, MQTT_TOPIC).compat(), /* necessary for tokio */ + ) + .unwrap() +} + +async fn run(mut client: M, mut connection: C, topic: &str) -> Result<(), M::Error> +where + M: Client + Publish + 'static, + C: Connection + 'static, + M::Error: Debug + 'static, +{ + info!("About to start the MQTT client"); + + info!("MQTT client started"); + + client.subscribe(topic, QoS::AtMostOnce).await?; + + info!("Subscribed to topic \"{topic}\""); + + let res = select( + async move { + info!("MQTT Listening for messages"); + + loop { + let msg = connection.next().await; + + match msg { + Err(err) => info!("[Queue] Error: {:?}", err), + Ok(event) => { + if let Some(event) = event { + info!("[Queue] Event: {}", event.payload()); + } else { + break; + } + } + } + } + + Ok(()) + }, + async move { + // Just to give a chance of our connection to get even the first published message + Timer::after(Duration::from_millis(500)).await; + + let payload = "Hello from edge-mqtt-demo!"; + + loop { + client + .publish(topic, QoS::AtMostOnce, false, payload.as_bytes()) + .await?; + + info!("Published \"{payload}\" to topic \"{topic}\""); + + let sleep_secs = 2; + + info!("Now sleeping for {sleep_secs}s..."); + Timer::after(Duration::from_secs(sleep_secs)).await; + } + }, + ) + .await; + + match res { + Either::First(res) => res, + Either::Second(res) => res, + } +} + +fn mqtt_create( + client_id: &str, + host: &str, + port: u16, +) -> Result<(MqttClient, MqttConnection), anyhow::Error> { + let mut mqtt_options = MqttOptions::new(client_id, host, port); + + mqtt_options.set_keep_alive(core::time::Duration::from_secs(10)); + + let (rumqttc_client, rumqttc_eventloop) = AsyncClient::new(mqtt_options, 10); + + let mqtt_client = MqttClient::new(rumqttc_client); + let mqtt_conn = MqttConnection::new(rumqttc_eventloop); + + Ok((mqtt_client, mqtt_conn)) +}