From af6619235d37992223be9856e283b9135e0d6fc2 Mon Sep 17 00:00:00 2001 From: Jens Date: Wed, 10 Jan 2024 16:14:35 +0100 Subject: [PATCH] Mqtli 10 support websockts, including tls (#28) * feat: enable websocket feature for rumqttc * feat(mqtt): add support for websocket (also via TLS) The version of rumqttc used to enable websocket support is not officially released yet. There was a bug with creating the websocket transport layer which has now been fixed but not yet released. Whenever the bugfix is contained in the official releases, the version will be adjusted accordingly. * docs(readme): update current config and websocket support --- Cargo.lock | 412 ++++++++++++++++++++++++++++------ Cargo.toml | 2 +- README.md | 28 ++- config.default.yaml | 2 + src/config/args.rs | 14 +- src/config/mqtli_config.rs | 18 ++ src/mqtt/mod.rs | 44 +++- src/mqtt/mqtt_handler.rs | 9 +- src/mqtt/v311/mqtt_service.rs | 22 +- src/mqtt/v5/mqtt_service.rs | 21 +- 10 files changed, 455 insertions(+), 117 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 169fd13..ad36e80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -91,9 +91,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.75" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" [[package]] name = "async-trait" @@ -106,6 +106,34 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "async-tungstenite" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1e9efbe14612da0a19fb983059a0b621e9cf6225d7018ecab4f9988215540dc" +dependencies = [ + "futures-io", + "futures-util", + "log", + "pin-project-lite", + "rustls-native-certs", + "tokio", + "tokio-rustls", + "tungstenite", +] + +[[package]] +name = "async_io_stream" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c" +dependencies = [ + "futures", + "pharos", + "rustc_version", + "tokio", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -129,9 +157,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.21.5" +version = "0.21.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +checksum = "c79fed4cdb43e993fcdadc7e58a09fd0e3e649c4436fa11da71c9f1f3ee7feb9" [[package]] name = "bitflags" @@ -139,6 +167,12 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "bitflags" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" + [[package]] name = "block-buffer" version = "0.10.4" @@ -154,6 +188,12 @@ version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.5.0" @@ -189,9 +229,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.11" +version = "4.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfaff671f6b22ca62406885ece523383b9b64022e341e53e009a62ebc47a45f2" +checksum = "33e92c5c1a78c62968ec57dbc2440366a2d6e5a23faf829970ff1585dc6b18e2" dependencies = [ "clap_builder", "clap_derive", @@ -199,9 +239,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.4.11" +version = "4.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a216b506622bb1d316cd51328dce24e07bdff4a6128a47c7e7fad11878d5adbb" +checksum = "f4323769dc8a61e2c39ad7dc26f6f2800524691a44d74fe3d1071a5c24db6370" dependencies = [ "anstream", "anstyle", @@ -260,9 +300,9 @@ checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" [[package]] name = "cpufeatures" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce420fe07aecd3e67c5f910618fe65e94158f6dcc0adf44e00d69ce2bdfe0fd0" +checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" dependencies = [ "libc", ] @@ -277,11 +317,17 @@ dependencies = [ "typenum", ] +[[package]] +name = "data-encoding" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e962a19be5cfc3f3bf6dd8f61eb50107f356ad6270fbb3ed41476571db78be5" + [[package]] name = "deranged" -version = "0.3.10" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eb30d70a07a3b04884d2677f06bec33509dc67ca60d92949e5535352d3191dc" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", ] @@ -330,6 +376,12 @@ dependencies = [ "spin", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -339,32 +391,90 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -382,9 +492,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" +checksum = "190092ea657667030ac6a35e305e62fc4dd69fd98ac98631e5d3a2b1575a12b5" dependencies = [ "cfg-if", "libc", @@ -421,11 +531,28 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "http" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8947b1a6fad4393052c7ba1f4cd97bed3e953a95c79c92ad9b051a04611d9fbb" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + [[package]] name = "iana-time-zone" -version = "0.1.58" +version = "0.1.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20" +checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -503,9 +630,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.151" +version = "0.2.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" [[package]] name = "lock_api" @@ -525,9 +652,9 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "memchr" -version = "2.6.4" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" [[package]] name = "miniz_oxide" @@ -566,7 +693,7 @@ dependencies = [ "protofish", "regex", "rumqttc", - "rustls 0.22.1", + "rustls 0.22.2", "rustls-pemfile", "serde", "serde_json", @@ -607,9 +734,9 @@ dependencies = [ [[package]] name = "object" -version = "0.32.1" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" +checksum = "a6a622008b6e321afc04970976f62ee297fdbaa6f95318ca343e3eebb9648441" dependencies = [ "memchr", ] @@ -634,9 +761,9 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pest" -version = "2.7.5" +version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae9cee2a55a544be8b89dc6848072af97a20f2422603c10865be2a42b580fff5" +checksum = "1f200d8d83c44a45b21764d1916299752ca035d15ecd46faca3e9a2a2bf6ad06" dependencies = [ "memchr", "thiserror", @@ -645,9 +772,9 @@ dependencies = [ [[package]] name = "pest_derive" -version = "2.7.5" +version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81d78524685f5ef2a3b3bd1cafbc9fcabb036253d9b1463e726a91cd16e2dfc2" +checksum = "bcd6ab1236bbdb3a49027e920e693192ebfe8913f6d60e294de57463a493cfde" dependencies = [ "pest", "pest_generator", @@ -655,9 +782,9 @@ dependencies = [ [[package]] name = "pest_generator" -version = "2.7.5" +version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68bd1206e71118b5356dae5ddc61c8b11e28b09ef6a31acbd15ea48a28e0c227" +checksum = "2a31940305ffc96863a735bef7c7994a00b325a7138fdbc5bda0f1a0476d3275" dependencies = [ "pest", "pest_meta", @@ -668,15 +795,25 @@ dependencies = [ [[package]] name = "pest_meta" -version = "2.7.5" +version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c747191d4ad9e4a4ab9c8798f1e82a39affe7ef9648390b7e5548d18e099de6" +checksum = "a7ff62f5259e53b78d1af898941cdcdccfae7385cf7d793a6e55de5d05bb4b7d" dependencies = [ "once_cell", "pest", "sha2", ] +[[package]] +name = "pharos" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414" +dependencies = [ + "futures", + "rustc_version", +] + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -695,6 +832,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -749,6 +892,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "regex" version = "1.10.2" @@ -795,12 +968,13 @@ dependencies = [ [[package]] name = "rumqttc" version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d8941c6791801b667d52bfe9ff4fc7c968d4f3f9ae8ae7abdaaa1c966feafc8" +source = "git+https://github.com/bytebeamio/rumqtt.git?rev=431be1b#431be1b45b2a57b188db5e0b77ba77c9f47baa63" dependencies = [ + "async-tungstenite", "bytes", "flume", "futures-util", + "http", "log", "rustls-native-certs", "rustls-pemfile", @@ -808,6 +982,7 @@ dependencies = [ "thiserror", "tokio", "tokio-rustls", + "ws_stream_tungstenite", ] [[package]] @@ -816,6 +991,15 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustls" version = "0.21.10" @@ -830,14 +1014,14 @@ dependencies = [ [[package]] name = "rustls" -version = "0.22.1" +version = "0.22.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe6b63262c9fcac8659abfaa96cac103d28166d3ff3eaf8f412e19f3ae9e5a48" +checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41" dependencies = [ "log", "ring", "rustls-pki-types", - "rustls-webpki 0.102.0", + "rustls-webpki 0.102.1", "subtle", "zeroize", ] @@ -865,9 +1049,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7673e0aa20ee4937c6aacfc12bb8341cfbf054cdd21df6bec5fd0629fe9339b" +checksum = "9e9d979b3ce68192e42760c7810125eb6cf2ea10efae545a156063e61f314e2a" [[package]] name = "rustls-webpki" @@ -881,9 +1065,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.102.0" +version = "0.102.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de2635c8bc2b88d367767c5de8ea1d8db9af3f6219eba28442242d9ab81d1b89" +checksum = "ef4ca26037c909dedb327b48c3327d0ba91d3dd3c4e05dad328f210ffb68e95b" dependencies = [ "ring", "rustls-pki-types", @@ -898,11 +1082,11 @@ checksum = "f98d2aa92eebf49b69786be48e4477826b256916e84a57ff2a4f21923b48eb4c" [[package]] name = "schannel" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c3733bf4cf7ea0880754e19cb5a462007c4a8c1914bff372ccc95b464f1df88" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -927,7 +1111,7 @@ version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05b64fb303737d99b81884b2c63433e9ae28abebe5eb5045dcdd175dc2ecf4de" dependencies = [ - "bitflags", + "bitflags 1.3.2", "core-foundation", "core-foundation-sys", "libc", @@ -944,20 +1128,26 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" + [[package]] name = "serde" -version = "1.0.193" +version = "1.0.195" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25dd9975e68d0cb5aa1120c288333fc98731bd1dd12f561e468ea4728c042b89" +checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.193" +version = "1.0.195" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" +checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c" dependencies = [ "proc-macro2", "quote", @@ -966,9 +1156,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.108" +version = "1.0.111" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" +checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" dependencies = [ "itoa", "ryu", @@ -977,9 +1167,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.27" +version = "0.9.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cc7a1570e38322cfe4154732e5110f887ea57e22b76f4bfd32b5bdd3368666c" +checksum = "b1bf28c79a99f70ee1f1d83d10c875d2e70618417fda01ad1785e027579d9d38" dependencies = [ "indexmap", "itoa", @@ -988,6 +1178,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sha2" version = "0.10.8" @@ -1113,18 +1314,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.51" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f11c217e1416d6f036b870f14e0413d480dbf28edbee1f877abaf0206af43bb7" +checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.51" +version = "1.0.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01742297787513b79cf8e29d1056ede1313e2420b7b3b15d0a768b4921f549df" +checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" dependencies = [ "proc-macro2", "quote", @@ -1216,6 +1417,57 @@ dependencies = [ "tokio", ] +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", +] + +[[package]] +name = "tungstenite" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "rustls 0.21.10", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -1272,6 +1524,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.1" @@ -1419,11 +1677,11 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows-core" -version = "0.51.1" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -1558,6 +1816,26 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" +[[package]] +name = "ws_stream_tungstenite" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e283cc794a890f5bdc01e358ad7c34535025f79ba83c1b5c7e01e5d6c60b336d" +dependencies = [ + "async-tungstenite", + "async_io_stream", + "bitflags 2.4.1", + "futures-core", + "futures-io", + "futures-sink", + "futures-util", + "pharos", + "rustc_version", + "tokio", + "tracing", + "tungstenite", +] + [[package]] name = "zeroize" version = "1.7.0" diff --git a/Cargo.toml b/Cargo.toml index c8869e8..b517ade 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ clap = { version = "4.4.11", features = ["derive", "env"] } derive-getters = "0.3.0" log = "0.4.20" protofish = "0.5.2" -rumqttc = "0.23.0" +rumqttc = { git = "https://github.com/bytebeamio/rumqtt.git", rev = "431be1b", features = ["websocket"] } serde = { version = "1.0.193", features = ["derive"] } serde_yaml = "0.9.27" simplelog = "0.12.1" diff --git a/README.md b/README.md index c7d2b3a..78f26d7 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ The supported data formats and the conversion rules are listed under [supported * Configuration via cli arguments and config file (yaml) * MQTT v5 and v3.1.1 * TLS support (v1.2 and v1.3) +* Websocket support (unencrypted and TLS) * Client authentication via username/password * Client authentication via TLS certificates * Last will @@ -54,27 +55,29 @@ can only be specified in the config file because it would be too complex to spec The following lists all possible command line arguments and environment variables (also available via `mqtli --help`): -```shell +``` Usage: mqtli.exe [OPTIONS] Options: + --help Print help + --version Print version -c, --config-file Path to the config file (default: config.yaml) [env: CONFIG_FILE_PATH=] - -h, --help Print help - -V, --version Print version Broker: - -o, --host The ip address or hostname of the broker (default: localhost) [env: BROKER_HOST=] - -p, --port The port the broker is listening on (default: 1883) [env: BROKER_PORT=] - -i, --client-id The client id for this mqtli instance (default: mqtli) [env: BROKER_CLIENT_ID=] - --keep-alive Keep alive time (default: 5 seconds) [env: BROKER_KEEP_ALIVE=] - -u, --username (optional) Username used to authenticate against the broker; if used then username must be given too (default: empty) [env: BROKER_USERNAME=] - -w, --password (optional) Password used to authenticate against the broker; if used then password must be given too (default: empty) [env: BROKER_PASSWORD=] + -h, --host The ip address or hostname of the broker (default: localhost) [env: BROKER_HOST=] + -p, --port The port the broker is listening on (default: 1883) [env: BROKER_PORT=] + --protocol The protocol to use to communicate with the broker (default: tcp) [env: BROKER_PROTOCOL=] [possible values: tcp, websocket] + -i, --client-id The client id for this mqtli instance (default: mqtli) [env: BROKER_CLIENT_ID=] + -v, --mqtt-version The MQTT version to use (default: v5) [env: BROKER_MQTT_VERSION=] [possible values: v311, v5] + --keep-alive Keep alive time in seconds (default: 5 seconds) [env: BROKER_KEEP_ALIVE=] + -u, --username (optional) Username used to authenticate against the broker; if used then username must be given too (default: empty) [env: BROKER_USERNAME=] + -w, --password (optional) Password used to authenticate against the broker; if used then password must be given too (default: empty) [env: BROKER_PASSWORD=] TLS: --use-tls If specified, TLS is used to communicate with the broker (default: false) [env: BROKER_USE_TLS=] [possible values: true, false] --ca-file - Path to a PEM encoded ca certificate to verify the broker`s certificate (default: empty) [env: BROKER_TLS_CA_FILE=] + Path to a PEM encoded ca certificate to verify the broker's certificate (default: empty) [env: BROKER_TLS_CA_FILE=] --client-cert (optional) Path to a PEM encoded client certificate for authenticating against the broker; must be specified with client-key (default: empty) [env: BROKER_TLS_CLIENT_CERTIFICATE_FILE=] --client-key @@ -567,8 +570,11 @@ topics: ## Future plans -* Support websockets * Single-topic clients for each subscribe and publish * publish one message (or the same message repeatedly) to a single topic * subscribe for one topic * this mode is only configurable via cli args +* Support MQTT5 attributes + * user properties + * content-type (to automatically detect the format of a topic) + * other attributes diff --git a/config.default.yaml b/config.default.yaml index 8622d7d..b31edba 100644 --- a/config.default.yaml +++ b/config.default.yaml @@ -1,6 +1,8 @@ #broker: # host: "localhost" # port: 1883 +# protocol: tcp # tcp or websocket +# # client_id: "mqtcli" # keep_alive: 5 # in seconds # username: "" diff --git a/src/config/args.rs b/src/config/args.rs index a1b5197..39cc6c3 100644 --- a/src/config/args.rs +++ b/src/config/args.rs @@ -10,7 +10,7 @@ use log::LevelFilter; use serde::de::{Error, Unexpected}; use serde::{Deserialize, Deserializer}; -use crate::config::mqtli_config::{MqttVersion, TlsVersion}; +use crate::config::mqtli_config::{MqttProtocol, MqttVersion, TlsVersion}; use crate::config::{args, ConfigError, PayloadType, PublishInputType}; use crate::mqtt::QoS; @@ -19,10 +19,10 @@ use crate::mqtt::QoS; #[clap(disable_version_flag = true)] #[clap(disable_help_flag = true)] pub struct MqtliArgs { - #[clap(long, action = clap::ArgAction::HelpLong)] + #[clap(long, action = clap::ArgAction::HelpLong, help = "Print help")] help: Option, - #[clap(long, action = clap::ArgAction::Version)] + #[clap(long, action = clap::ArgAction::Version, help = "Print version")] version: Option, #[command(flatten)] @@ -73,6 +73,14 @@ pub struct MqttBrokerConnectArgs { )] pub port: Option, + #[arg( + long = "protocol", + env = "BROKER_PROTOCOL", + help_heading = "Broker", + help = "The protocol to use to communicate with the broker (default: tcp)" + )] + pub protocol: Option, + #[arg( short = 'i', long = "client-id", diff --git a/src/config/mqtli_config.rs b/src/config/mqtli_config.rs index 02590c0..7494a6b 100644 --- a/src/config/mqtli_config.rs +++ b/src/config/mqtli_config.rs @@ -316,6 +316,18 @@ pub enum MqttVersion { V5, } +#[derive(Clone, Debug, Default, Deserialize, PartialEq, ValueEnum)] +pub enum MqttProtocol { + #[default] + #[serde(rename = "tcp")] + #[clap(name = "tcp")] + Tcp, + + #[serde(rename = "websocket")] + #[clap(name = "websocket")] + Websocket, +} + #[derive(Clone, Debug, Getters, Validate)] #[validate(schema(function = "validate_credentials", skip_on_field_errors = false))] #[validate(schema(function = "validate_tls_client", skip_on_field_errors = false))] @@ -323,6 +335,8 @@ pub struct MqttBrokerConnectArgs { #[validate(length(min = 1, message = "Hostname must be given"))] host: String, port: u16, + protocol: MqttProtocol, + #[validate(length(min = 1, message = "Client id must be given"))] client_id: String, mqtt_version: MqttVersion, @@ -352,6 +366,9 @@ impl MqttBrokerConnectArgs { if let Some(port) = other.port { self.port = port } + if let Some(protocol) = &other.protocol { + self.protocol = protocol.clone() + } if let Some(client_id) = &other.client_id { self.client_id = client_id.to_string() } @@ -409,6 +426,7 @@ impl Default for MqttBrokerConnectArgs { Self { host: "localhost".to_string(), port: 1883, + protocol: MqttProtocol::Tcp, client_id: "mqtli".to_string(), mqtt_version: MqttVersion::V5, keep_alive: Duration::from_secs(5), diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs index ff9ef4a..11e4d1f 100644 --- a/src/mqtt/mod.rs +++ b/src/mqtt/mod.rs @@ -4,12 +4,12 @@ use std::io::BufReader; use std::path::PathBuf; use std::sync::Arc; -use crate::config::mqtli_config::{MqttBrokerConnectArgs, TlsVersion}; +use crate::config::mqtli_config::{MqttBrokerConnectArgs, MqttProtocol, TlsVersion}; use async_trait::async_trait; use log::{debug, info}; use rumqttc::tokio_rustls::rustls::version::{TLS12, TLS13}; use rumqttc::tokio_rustls::rustls::{Certificate, PrivateKey, SupportedProtocolVersion}; -use rumqttc::TlsConfiguration; +use rumqttc::{TlsConfiguration, Transport}; use thiserror::Error; use tokio::sync::broadcast; use tokio::task::JoinHandle; @@ -101,7 +101,7 @@ pub enum MqttEvent { V311(rumqttc::Event), } -pub fn configure_tls_rustls( +fn configure_tls_rustls( config: Arc, ) -> Result { fn load_private_key_from_file(path: &PathBuf) -> Result { @@ -222,3 +222,41 @@ pub fn configure_tls_rustls( Ok(TlsConfiguration::Rustls(Arc::new(tls_config))) } + +fn get_transport_parameters( + config: Arc, +) -> Result<(Transport, String), MqttServiceError> { + let (transport, hostname) = match config.protocol() { + MqttProtocol::Tcp => match *config.use_tls() { + false => { + info!("Using TCP"); + (Transport::Tcp, config.host().to_string()) + } + true => { + info!("Using TCP with TLS"); + ( + Transport::Tls(configure_tls_rustls(config.clone())?), + config.host().to_string(), + ) + } + }, + MqttProtocol::Websocket => match *config.use_tls() { + false => { + info!("Using websockets"); + + let hostname = format!("ws://{}:{}/mqtt", config.host(), config.port()); + (Transport::Ws, hostname) + } + true => { + info!("Using websockets with TLS"); + + let hostname = format!("wss://{}:{}/mqtt", config.host(), config.port()); + ( + Transport::Wss(configure_tls_rustls(config.clone())?), + hostname, + ) + } + }, + }; + Ok((transport, hostname)) +} diff --git a/src/mqtt/mqtt_handler.rs b/src/mqtt/mqtt_handler.rs index 7072929..aceba3c 100644 --- a/src/mqtt/mqtt_handler.rs +++ b/src/mqtt/mqtt_handler.rs @@ -128,14 +128,15 @@ mod v311 { use std::str::from_utf8; use log::info; + use rumqttc::{Event, Incoming}; use crate::config::mqtli_config::Topic; use crate::mqtt::mqtt_handler::MqttHandler; - pub fn handle_event(event: rumqttc::Event, topics: &Vec) { + pub fn handle_event(event: Event, topics: &Vec) { match event { - rumqttc::Event::Incoming(event) => { - if let rumqttc::Incoming::Publish(value) = event { + Event::Incoming(event) => { + if let Incoming::Publish(value) = event { let incoming_topic = from_utf8(value.topic.as_ref()).unwrap(); info!( @@ -150,7 +151,7 @@ mod v311 { ); } } - rumqttc::Event::Outgoing(_event) => {} + Event::Outgoing(_event) => {} } } } diff --git a/src/mqtt/v311/mqtt_service.rs b/src/mqtt/v311/mqtt_service.rs index 2c7aae1..287834f 100644 --- a/src/mqtt/v311/mqtt_service.rs +++ b/src/mqtt/v311/mqtt_service.rs @@ -4,12 +4,12 @@ use std::sync::Arc; use async_trait::async_trait; use log::{debug, error, info}; use rumqttc::{AsyncClient, ConnectionError, Event, EventLoop, Incoming, MqttOptions, StateError}; -use rumqttc::{ConnectReturnCode, LastWill, Transport}; +use rumqttc::{ConnectReturnCode, LastWill}; use tokio::sync::{broadcast, Mutex}; use tokio::task::JoinHandle; use crate::config::mqtli_config::MqttBrokerConnectArgs; -use crate::mqtt::{configure_tls_rustls, MqttEvent, MqttService, MqttServiceError, QoS}; +use crate::mqtt::{get_transport_parameters, MqttEvent, MqttService, MqttServiceError, QoS}; pub struct MqttServiceV311 { client: Option, @@ -90,23 +90,17 @@ impl MqttService for MqttServiceV311 { &mut self, channel: Option>, ) -> Result, MqttServiceError> { + let (transport, hostname) = get_transport_parameters(self.config.clone())?; + info!( - "Connection to {}:{} with client id {}", - self.config.host(), + "Connecting to {} on port {} with client id {}", + hostname, self.config.port(), self.config.client_id() ); - let mut options = MqttOptions::new( - self.config.client_id(), - self.config.host(), - *self.config.port(), - ); + let mut options = MqttOptions::new(self.config.client_id(), hostname, *self.config.port()); - if *self.config.use_tls() { - info!("Using TLS"); - - options.set_transport(Transport::Tls(configure_tls_rustls(self.config.clone())?)); - } + options.set_transport(transport); debug!( "Setting keep alive to {} seconds", diff --git a/src/mqtt/v5/mqtt_service.rs b/src/mqtt/v5/mqtt_service.rs index 9aeb5ca..11f6ee0 100644 --- a/src/mqtt/v5/mqtt_service.rs +++ b/src/mqtt/v5/mqtt_service.rs @@ -7,12 +7,11 @@ use rumqttc::v5::mqttbytes::v5::{ConnectReturnCode, LastWill}; use rumqttc::v5::{ AsyncClient, ConnectionError, Event, EventLoop, Incoming, MqttOptions, StateError, }; -use rumqttc::Transport; use tokio::sync::{broadcast, Mutex}; use tokio::task::JoinHandle; use crate::config::mqtli_config::MqttBrokerConnectArgs; -use crate::mqtt::{configure_tls_rustls, MqttEvent, MqttService, MqttServiceError, QoS}; +use crate::mqtt::{get_transport_parameters, MqttEvent, MqttService, MqttServiceError, QoS}; pub struct MqttServiceV5 { config: Arc, @@ -93,23 +92,17 @@ impl MqttService for MqttServiceV5 { &mut self, channel: Option>, ) -> Result, MqttServiceError> { + let (transport, hostname) = get_transport_parameters(self.config.clone())?; + info!( - "Connection to {}:{} with client id {}", - self.config.host(), + "Connecting to {} on port {} with client id {}", + hostname, self.config.port(), self.config.client_id() ); - let mut options = MqttOptions::new( - self.config.client_id(), - self.config.host(), - *self.config.port(), - ); + let mut options = MqttOptions::new(self.config.client_id(), hostname, *self.config.port()); - if *self.config.use_tls() { - info!("Using TLS"); - - options.set_transport(Transport::Tls(configure_tls_rustls(self.config.clone())?)); - } + options.set_transport(transport); debug!( "Setting keep alive to {} seconds",