Skip to content

Commit

Permalink
Mqtli 10 support websockts, including tls (#28)
Browse files Browse the repository at this point in the history
* feat: enable websocket feature for rumqttc

* feat(mqtt): add support for websocket (also via TLS)

The version of rumqttc used to enable websocket support is not officially released yet. There was a bug with creating the websocket transport layer which has now been fixed but not yet released. Whenever the bugfix is contained in the official releases, the version will be adjusted accordingly.

* docs(readme): update current config and websocket support
  • Loading branch information
kaans authored Jan 10, 2024
1 parent 864c925 commit af66192
Show file tree
Hide file tree
Showing 10 changed files with 455 additions and 117 deletions.
412 changes: 345 additions & 67 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ clap = { version = "4.4.11", features = ["derive", "env"] }
derive-getters = "0.3.0"
log = "0.4.20"
protofish = "0.5.2"
rumqttc = "0.23.0"
rumqttc = { git = "https://github.com/bytebeamio/rumqtt.git", rev = "431be1b", features = ["websocket"] }
serde = { version = "1.0.193", features = ["derive"] }
serde_yaml = "0.9.27"
simplelog = "0.12.1"
Expand Down
28 changes: 17 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The supported data formats and the conversion rules are listed under [supported
* Configuration via cli arguments and config file (yaml)
* MQTT v5 and v3.1.1
* TLS support (v1.2 and v1.3)
* Websocket support (unencrypted and TLS)
* Client authentication via username/password
* Client authentication via TLS certificates
* Last will
Expand All @@ -54,27 +55,29 @@ can only be specified in the config file because it would be too complex to spec

The following lists all possible command line arguments and environment variables (also available via `mqtli --help`):

```shell
```
Usage: mqtli.exe [OPTIONS]
Options:
--help Print help
--version Print version
-c, --config-file <CONFIG_FILE> Path to the config file (default: config.yaml) [env: CONFIG_FILE_PATH=]
-h, --help Print help
-V, --version Print version
Broker:
-o, --host <HOST> The ip address or hostname of the broker (default: localhost) [env: BROKER_HOST=]
-p, --port <PORT> The port the broker is listening on (default: 1883) [env: BROKER_PORT=]
-i, --client-id <CLIENT_ID> The client id for this mqtli instance (default: mqtli) [env: BROKER_CLIENT_ID=]
--keep-alive <KEEP_ALIVE> Keep alive time (default: 5 seconds) [env: BROKER_KEEP_ALIVE=]
-u, --username <USERNAME> (optional) Username used to authenticate against the broker; if used then username must be given too (default: empty) [env: BROKER_USERNAME=]
-w, --password <PASSWORD> (optional) Password used to authenticate against the broker; if used then password must be given too (default: empty) [env: BROKER_PASSWORD=]
-h, --host <HOST> The ip address or hostname of the broker (default: localhost) [env: BROKER_HOST=]
-p, --port <PORT> The port the broker is listening on (default: 1883) [env: BROKER_PORT=]
--protocol <PROTOCOL> The protocol to use to communicate with the broker (default: tcp) [env: BROKER_PROTOCOL=] [possible values: tcp, websocket]
-i, --client-id <CLIENT_ID> The client id for this mqtli instance (default: mqtli) [env: BROKER_CLIENT_ID=]
-v, --mqtt-version <MQTT_VERSION> The MQTT version to use (default: v5) [env: BROKER_MQTT_VERSION=] [possible values: v311, v5]
--keep-alive <KEEP_ALIVE> Keep alive time in seconds (default: 5 seconds) [env: BROKER_KEEP_ALIVE=]
-u, --username <USERNAME> (optional) Username used to authenticate against the broker; if used then username must be given too (default: empty) [env: BROKER_USERNAME=]
-w, --password <PASSWORD> (optional) Password used to authenticate against the broker; if used then password must be given too (default: empty) [env: BROKER_PASSWORD=]
TLS:
--use-tls <USE_TLS>
If specified, TLS is used to communicate with the broker (default: false) [env: BROKER_USE_TLS=] [possible values: true, false]
--ca-file <TLS_CA_FILE>
Path to a PEM encoded ca certificate to verify the broker`s certificate (default: empty) [env: BROKER_TLS_CA_FILE=]
Path to a PEM encoded ca certificate to verify the broker's certificate (default: empty) [env: BROKER_TLS_CA_FILE=]
--client-cert <TLS_CLIENT_CERTIFICATE>
(optional) Path to a PEM encoded client certificate for authenticating against the broker; must be specified with client-key (default: empty) [env: BROKER_TLS_CLIENT_CERTIFICATE_FILE=]
--client-key <TLS_CLIENT_KEY>
Expand Down Expand Up @@ -567,8 +570,11 @@ topics:

## Future plans

* Support websockets
* Single-topic clients for each subscribe and publish
* publish one message (or the same message repeatedly) to a single topic
* subscribe for one topic
* this mode is only configurable via cli args
* Support MQTT5 attributes
* user properties
* content-type (to automatically detect the format of a topic)
* other attributes
2 changes: 2 additions & 0 deletions config.default.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#broker:
# host: "localhost"
# port: 1883
# protocol: tcp # tcp or websocket
#
# client_id: "mqtcli"
# keep_alive: 5 # in seconds
# username: ""
Expand Down
14 changes: 11 additions & 3 deletions src/config/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use log::LevelFilter;
use serde::de::{Error, Unexpected};
use serde::{Deserialize, Deserializer};

use crate::config::mqtli_config::{MqttVersion, TlsVersion};
use crate::config::mqtli_config::{MqttProtocol, MqttVersion, TlsVersion};
use crate::config::{args, ConfigError, PayloadType, PublishInputType};
use crate::mqtt::QoS;

Expand All @@ -19,10 +19,10 @@ use crate::mqtt::QoS;
#[clap(disable_version_flag = true)]
#[clap(disable_help_flag = true)]
pub struct MqtliArgs {
#[clap(long, action = clap::ArgAction::HelpLong)]
#[clap(long, action = clap::ArgAction::HelpLong, help = "Print help")]
help: Option<bool>,

#[clap(long, action = clap::ArgAction::Version)]
#[clap(long, action = clap::ArgAction::Version, help = "Print version")]
version: Option<bool>,

#[command(flatten)]
Expand Down Expand Up @@ -73,6 +73,14 @@ pub struct MqttBrokerConnectArgs {
)]
pub port: Option<u16>,

#[arg(
long = "protocol",
env = "BROKER_PROTOCOL",
help_heading = "Broker",
help = "The protocol to use to communicate with the broker (default: tcp)"
)]
pub protocol: Option<MqttProtocol>,

#[arg(
short = 'i',
long = "client-id",
Expand Down
18 changes: 18 additions & 0 deletions src/config/mqtli_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,13 +316,27 @@ pub enum MqttVersion {
V5,
}

#[derive(Clone, Debug, Default, Deserialize, PartialEq, ValueEnum)]
pub enum MqttProtocol {
#[default]
#[serde(rename = "tcp")]
#[clap(name = "tcp")]
Tcp,

#[serde(rename = "websocket")]
#[clap(name = "websocket")]
Websocket,
}

#[derive(Clone, Debug, Getters, Validate)]
#[validate(schema(function = "validate_credentials", skip_on_field_errors = false))]
#[validate(schema(function = "validate_tls_client", skip_on_field_errors = false))]
pub struct MqttBrokerConnectArgs {
#[validate(length(min = 1, message = "Hostname must be given"))]
host: String,
port: u16,
protocol: MqttProtocol,

#[validate(length(min = 1, message = "Client id must be given"))]
client_id: String,
mqtt_version: MqttVersion,
Expand Down Expand Up @@ -352,6 +366,9 @@ impl MqttBrokerConnectArgs {
if let Some(port) = other.port {
self.port = port
}
if let Some(protocol) = &other.protocol {
self.protocol = protocol.clone()
}
if let Some(client_id) = &other.client_id {
self.client_id = client_id.to_string()
}
Expand Down Expand Up @@ -409,6 +426,7 @@ impl Default for MqttBrokerConnectArgs {
Self {
host: "localhost".to_string(),
port: 1883,
protocol: MqttProtocol::Tcp,
client_id: "mqtli".to_string(),
mqtt_version: MqttVersion::V5,
keep_alive: Duration::from_secs(5),
Expand Down
44 changes: 41 additions & 3 deletions src/mqtt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use std::io::BufReader;
use std::path::PathBuf;
use std::sync::Arc;

use crate::config::mqtli_config::{MqttBrokerConnectArgs, TlsVersion};
use crate::config::mqtli_config::{MqttBrokerConnectArgs, MqttProtocol, TlsVersion};
use async_trait::async_trait;
use log::{debug, info};
use rumqttc::tokio_rustls::rustls::version::{TLS12, TLS13};
use rumqttc::tokio_rustls::rustls::{Certificate, PrivateKey, SupportedProtocolVersion};
use rumqttc::TlsConfiguration;
use rumqttc::{TlsConfiguration, Transport};
use thiserror::Error;
use tokio::sync::broadcast;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -101,7 +101,7 @@ pub enum MqttEvent {
V311(rumqttc::Event),
}

pub fn configure_tls_rustls(
fn configure_tls_rustls(
config: Arc<MqttBrokerConnectArgs>,
) -> Result<TlsConfiguration, MqttServiceError> {
fn load_private_key_from_file(path: &PathBuf) -> Result<PrivateKey, MqttServiceError> {
Expand Down Expand Up @@ -222,3 +222,41 @@ pub fn configure_tls_rustls(

Ok(TlsConfiguration::Rustls(Arc::new(tls_config)))
}

fn get_transport_parameters(
config: Arc<MqttBrokerConnectArgs>,
) -> Result<(Transport, String), MqttServiceError> {
let (transport, hostname) = match config.protocol() {
MqttProtocol::Tcp => match *config.use_tls() {
false => {
info!("Using TCP");
(Transport::Tcp, config.host().to_string())
}
true => {
info!("Using TCP with TLS");
(
Transport::Tls(configure_tls_rustls(config.clone())?),
config.host().to_string(),
)
}
},
MqttProtocol::Websocket => match *config.use_tls() {
false => {
info!("Using websockets");

let hostname = format!("ws://{}:{}/mqtt", config.host(), config.port());
(Transport::Ws, hostname)
}
true => {
info!("Using websockets with TLS");

let hostname = format!("wss://{}:{}/mqtt", config.host(), config.port());
(
Transport::Wss(configure_tls_rustls(config.clone())?),
hostname,
)
}
},
};
Ok((transport, hostname))
}
9 changes: 5 additions & 4 deletions src/mqtt/mqtt_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,15 @@ mod v311 {
use std::str::from_utf8;

use log::info;
use rumqttc::{Event, Incoming};

use crate::config::mqtli_config::Topic;
use crate::mqtt::mqtt_handler::MqttHandler;

pub fn handle_event(event: rumqttc::Event, topics: &Vec<Topic>) {
pub fn handle_event(event: Event, topics: &Vec<Topic>) {
match event {
rumqttc::Event::Incoming(event) => {
if let rumqttc::Incoming::Publish(value) = event {
Event::Incoming(event) => {
if let Incoming::Publish(value) = event {
let incoming_topic = from_utf8(value.topic.as_ref()).unwrap();

info!(
Expand All @@ -150,7 +151,7 @@ mod v311 {
);
}
}
rumqttc::Event::Outgoing(_event) => {}
Event::Outgoing(_event) => {}
}
}
}
22 changes: 8 additions & 14 deletions src/mqtt/v311/mqtt_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use std::sync::Arc;
use async_trait::async_trait;
use log::{debug, error, info};
use rumqttc::{AsyncClient, ConnectionError, Event, EventLoop, Incoming, MqttOptions, StateError};
use rumqttc::{ConnectReturnCode, LastWill, Transport};
use rumqttc::{ConnectReturnCode, LastWill};
use tokio::sync::{broadcast, Mutex};
use tokio::task::JoinHandle;

use crate::config::mqtli_config::MqttBrokerConnectArgs;
use crate::mqtt::{configure_tls_rustls, MqttEvent, MqttService, MqttServiceError, QoS};
use crate::mqtt::{get_transport_parameters, MqttEvent, MqttService, MqttServiceError, QoS};

pub struct MqttServiceV311 {
client: Option<AsyncClient>,
Expand Down Expand Up @@ -90,23 +90,17 @@ impl MqttService for MqttServiceV311 {
&mut self,
channel: Option<broadcast::Sender<MqttEvent>>,
) -> Result<JoinHandle<()>, MqttServiceError> {
let (transport, hostname) = get_transport_parameters(self.config.clone())?;

info!(
"Connection to {}:{} with client id {}",
self.config.host(),
"Connecting to {} on port {} with client id {}",
hostname,
self.config.port(),
self.config.client_id()
);
let mut options = MqttOptions::new(
self.config.client_id(),
self.config.host(),
*self.config.port(),
);
let mut options = MqttOptions::new(self.config.client_id(), hostname, *self.config.port());

if *self.config.use_tls() {
info!("Using TLS");

options.set_transport(Transport::Tls(configure_tls_rustls(self.config.clone())?));
}
options.set_transport(transport);

debug!(
"Setting keep alive to {} seconds",
Expand Down
21 changes: 7 additions & 14 deletions src/mqtt/v5/mqtt_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ use rumqttc::v5::mqttbytes::v5::{ConnectReturnCode, LastWill};
use rumqttc::v5::{
AsyncClient, ConnectionError, Event, EventLoop, Incoming, MqttOptions, StateError,
};
use rumqttc::Transport;
use tokio::sync::{broadcast, Mutex};
use tokio::task::JoinHandle;

use crate::config::mqtli_config::MqttBrokerConnectArgs;
use crate::mqtt::{configure_tls_rustls, MqttEvent, MqttService, MqttServiceError, QoS};
use crate::mqtt::{get_transport_parameters, MqttEvent, MqttService, MqttServiceError, QoS};

pub struct MqttServiceV5 {
config: Arc<MqttBrokerConnectArgs>,
Expand Down Expand Up @@ -93,23 +92,17 @@ impl MqttService for MqttServiceV5 {
&mut self,
channel: Option<broadcast::Sender<MqttEvent>>,
) -> Result<JoinHandle<()>, MqttServiceError> {
let (transport, hostname) = get_transport_parameters(self.config.clone())?;

info!(
"Connection to {}:{} with client id {}",
self.config.host(),
"Connecting to {} on port {} with client id {}",
hostname,
self.config.port(),
self.config.client_id()
);
let mut options = MqttOptions::new(
self.config.client_id(),
self.config.host(),
*self.config.port(),
);
let mut options = MqttOptions::new(self.config.client_id(), hostname, *self.config.port());

if *self.config.use_tls() {
info!("Using TLS");

options.set_transport(Transport::Tls(configure_tls_rustls(self.config.clone())?));
}
options.set_transport(transport);

debug!(
"Setting keep alive to {} seconds",
Expand Down

0 comments on commit af66192

Please sign in to comment.