Skip to content

Commit

Permalink
Latest embedded-svc; MQTT demo
Browse files Browse the repository at this point in the history
  • Loading branch information
ivmarkov committed Jan 22, 2024
1 parent b8043d6 commit 4c4f317
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 66 deletions.
8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@ embedded-nal-async = "0.7"
std-embedded-nal-async = "0.1"
embassy-time = { version = "0.3", features = ["std", "generic-queue"] }
embassy-sync = "0.5"
embassy-futures = "0.1"
embedded-svc = { version = "0.26", features = ["std"] }
futures-lite = "1"
rand = "0.8"
tokio = "1" # For the `mqtt_client` example
async-compat = "0.2" # For the `mqtt_client` example

[[example]]
name = "captive_portal"
Expand Down Expand Up @@ -81,6 +85,10 @@ required-features = ["std"]
name = "std_nal"
required-features = ["std"]

[[example]]
name = "mqtt_client"
required-features = ["std", "embedded-svc"]

[workspace]
members = [
".",
Expand Down
129 changes: 129 additions & 0 deletions edge-mqtt/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# edge-mqtt

[![CI](https://github.com/ivmarkov/edge-net/actions/workflows/ci.yml/badge.svg)](https://github.com/ivmarkov/edge-net/actions/workflows/ci.yml)
![crates.io](https://img.shields.io/crates/v/edge-net.svg)
[![Documentation](https://docs.rs/edge-net/badge.svg)](https://docs.rs/edge-net)

A wrapper for the [`rumqttc`]() crate that adapts it to async [MQTT traits]() of the `embedded-svc` crate.

**NOTE**: Needs STD!

The plan for the future is to retire this crate in favor of []() once the latter gets MQTT 3.1 compatibility, and implements a more ergonomic API where sending can be done independently from receiving MQTT messages.

... or implement a true `no_std` no-alloc alternative - just like all other `edge-*` crates - if "" does not see further development.

## Example

```rust
use core::fmt::Debug;

use async_compat::CompatExt;

use embedded_svc::mqtt::client::asynch::{Client, Connection, Publish, QoS};
use embedded_svc::mqtt::client::Event;

use embassy_futures::select::{select, Either};
use embassy_time::{Duration, Timer};

use edge_mqtt::io::{AsyncClient, MqttClient, MqttConnection, MqttOptions};

use log::*;

const MQTT_HOST: &str = "broker.emqx.io";
const MQTT_PORT: u16 = 1883;
const MQTT_CLIENT_ID: &str = "edge-mqtt-demo";
const MQTT_TOPIC: &str = "edge-mqtt-demo";

fn main() {
env_logger::init_from_env(
env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
);

let (client, conn) = mqtt_create(MQTT_CLIENT_ID, MQTT_HOST, MQTT_PORT).unwrap();

futures_lite::future::block_on(
run(client, conn, MQTT_TOPIC).compat(), /* necessary for tokio */
)
.unwrap()
}

async fn run<M, C>(mut client: M, mut connection: C, topic: &str) -> Result<(), M::Error>
where
M: Client + Publish + 'static,
C: Connection<Error = M::Error> + 'static,
M::Error: Debug + 'static,
{
info!("About to start the MQTT client");

info!("MQTT client started");

client.subscribe(topic, QoS::AtMostOnce).await?;

info!("Subscribed to topic \"{topic}\"");

let res = select(
async move {
info!("MQTT Listening for messages");

loop {
let msg = connection.next().await;

match msg {
Err(err) => info!("[Queue] Error: {:?}", err),
Ok(event) => {
if let Some(event) = event {
info!("[Queue] Event: {}", event.payload());
} else {
break;
}
}
}
}

Ok(())
},
async move {
// Just to give a chance of our connection to get even the first published message
Timer::after(Duration::from_millis(500)).await;

let payload = "Hello from edge-mqtt-demo!";

loop {
client
.publish(topic, QoS::AtMostOnce, false, payload.as_bytes())
.await?;

info!("Published \"{payload}\" to topic \"{topic}\"");

let sleep_secs = 2;

info!("Now sleeping for {sleep_secs}s...");
Timer::after(Duration::from_secs(sleep_secs)).await;
}
},
)
.await;

match res {
Either::First(res) => res,
Either::Second(res) => res,
}
}

fn mqtt_create(
client_id: &str,
host: &str,
port: u16,
) -> Result<(MqttClient, MqttConnection), anyhow::Error> {
let mut mqtt_options = MqttOptions::new(client_id, host, port);

mqtt_options.set_keep_alive(core::time::Duration::from_secs(10));

let (rumqttc_client, rumqttc_eventloop) = AsyncClient::new(mqtt_options, 10);

let mqtt_client = MqttClient::new(rumqttc_client);
let mqtt_conn = MqttConnection::new(rumqttc_eventloop);

Ok((mqtt_client, mqtt_conn))
}
```
116 changes: 50 additions & 66 deletions edge-mqtt/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ pub use embedded_svc_compat::*;
#[cfg(feature = "embedded-svc")]
mod embedded_svc_compat {
use core::fmt::{Debug, Display};
use core::marker::PhantomData;

use embedded_svc::mqtt::client::asynch::{
Client, Connection, Details, ErrorType, Event, Message, MessageId, Publish, QoS,
Client, Connection, Details, ErrorType, Event, EventPayload, MessageId, Publish, QoS,
};
use embedded_svc::mqtt::client::MessageImpl;

use log::trace;

Expand All @@ -32,7 +30,6 @@ mod embedded_svc_compat {
}
}

#[cfg(feature = "std")]
impl std::error::Error for MqttError {}

impl From<ClientError> for MqttError {
Expand Down Expand Up @@ -87,85 +84,72 @@ mod embedded_svc_compat {
}
}

pub struct MessageRef<'a>(&'a rumqttc::Publish);

impl<'a> MessageRef<'a> {
pub fn into_message_impl(&self) -> Option<MessageImpl> {
Some(MessageImpl::new(self))
pub struct MqttEvent(rumqttc::Event);

impl MqttEvent {
fn payload(&self) -> Option<EventPayload<'_>> {
match &self.0 {
rumqttc::Event::Incoming(incoming) => match incoming {
rumqttc::Packet::Connect(_) => Some(EventPayload::BeforeConnect),
rumqttc::Packet::ConnAck(_) => Some(EventPayload::Connected(true)),
rumqttc::Packet::Disconnect => Some(EventPayload::Disconnected),
rumqttc::Packet::PubAck(PubAck { pkid, .. }) => {
Some(EventPayload::Published(*pkid as _))
}
rumqttc::Packet::SubAck(SubAck { pkid, .. }) => {
Some(EventPayload::Subscribed(*pkid as _))
}
rumqttc::Packet::UnsubAck(UnsubAck { pkid, .. }) => {
Some(EventPayload::Unsubscribed(*pkid as _))
}
rumqttc::Packet::Publish(rumqttc::Publish {
pkid,
topic,
payload,
..
}) => Some(EventPayload::Received {
id: *pkid as _,
topic: Some(topic.as_str()),
data: payload,
details: Details::Complete,
}),
_ => None,
},
rumqttc::Event::Outgoing(_) => None,
}
}
}

impl<'a> Message for MessageRef<'a> {
fn id(&self) -> MessageId {
self.0.pkid as _
}

fn topic(&self) -> Option<&'_ str> {
Some(&self.0.topic)
}

fn data(&self) -> &'_ [u8] {
&self.0.payload
}

fn details(&self) -> &Details {
&Details::Complete
impl Event for MqttEvent {
fn payload(&self) -> EventPayload<'_> {
MqttEvent::payload(self).unwrap()
}
}

pub struct MqttConnection<F, M>(EventLoop, F, PhantomData<fn() -> M>);
pub struct MqttConnection(EventLoop);

impl<F, M> MqttConnection<F, M> {
pub const fn new(event_loop: EventLoop, message_converter: F) -> Self {
Self(event_loop, message_converter, PhantomData)
impl MqttConnection {
pub const fn new(event_loop: EventLoop) -> Self {
Self(event_loop)
}
}

impl<F, M> ErrorType for MqttConnection<F, M> {
impl ErrorType for MqttConnection {
type Error = MqttError;
}

impl<F, M> Connection for MqttConnection<F, M>
where
F: FnMut(&MessageRef) -> Option<M> + Send,
M: Send,
{
type Message<'a> = M where Self: 'a;
impl Connection for MqttConnection {
type Event<'a> = MqttEvent where Self: 'a;

async fn next(&mut self) -> Option<Result<Event<Self::Message<'_>>, Self::Error>> {
async fn next(&mut self) -> Result<Option<Self::Event<'_>>, Self::Error> {
loop {
let event = self.0.poll().await;
let event = self.0.poll().await?;
trace!("Got event: {:?}", event);

match event {
Ok(event) => {
let event = match event {
rumqttc::Event::Incoming(incoming) => match incoming {
rumqttc::Packet::Connect(_) => Some(Event::BeforeConnect),
rumqttc::Packet::ConnAck(_) => Some(Event::Connected(true)),
rumqttc::Packet::Disconnect => Some(Event::Disconnected),
rumqttc::Packet::PubAck(PubAck { pkid, .. }) => {
Some(Event::Published(pkid as _))
}
rumqttc::Packet::SubAck(SubAck { pkid, .. }) => {
Some(Event::Subscribed(pkid as _))
}
rumqttc::Packet::UnsubAck(UnsubAck { pkid, .. }) => {
Some(Event::Unsubscribed(pkid as _))
}
rumqttc::Packet::Publish(publish) => {
(self.1)(&MessageRef(&publish)).map(Event::Received)
}
_ => None,
},
rumqttc::Event::Outgoing(_) => None,
};

if let Some(event) = event {
return Some(Ok(event));
}
}
Err(err) => return Some(Err(MqttError::ConnectionError(err))),
let event = MqttEvent(event);

if event.payload().is_some() {
break Ok(Some(event));
}
}
}
Expand Down
Loading

0 comments on commit 4c4f317

Please sign in to comment.