diff --git a/mavlink-core/Cargo.toml b/mavlink-core/Cargo.toml index 5bf97b903c..ac75eceb1f 100644 --- a/mavlink-core/Cargo.toml +++ b/mavlink-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mavlink-core" -version = "0.13.2" +version = "0.14.0" authors = [ "Todd Stellanova", "Michal Podhradsky", @@ -26,9 +26,15 @@ embedded-io-async = { version = "0.6.1", optional = true } serde = { version = "1.0.115", optional = true, features = ["derive"] } serde_arrays = { version = "0.1.0", optional = true } serial = { version = "0.4", optional = true } -tokio = { version = "1.0", default-features = false, features = ["io-util", "net", "sync", "fs"], optional = true } +tokio = { version = "1.0", default-features = false, features = [ + "io-util", + "net", + "sync", + "fs", +], optional = true } sha2 = { version = "0.10", optional = true } async-trait = { version = "0.1.18", optional = true } +tokio-serial = { version = "5.4.4", default-features = false, optional = true } [features] "std" = ["byteorder/std"] @@ -41,9 +47,16 @@ async-trait = { version = "0.1.18", optional = true } "embedded" = ["dep:embedded-io", "dep:embedded-io-async"] "embedded-hal-02" = ["dep:nb", "dep:embedded-hal-02"] "serde" = ["dep:serde", "dep:serde_arrays"] -"tokio-1" = ["dep:tokio", "dep:async-trait"] +"tokio-1" = ["dep:tokio", "dep:async-trait", "dep:tokio-serial"] "signing" = ["dep:sha2"] -default = ["std", "tcp", "udp", "direct-serial", "serde"] +default = ["std", "tcp", "udp", "direct-serial", "serde", "tokio-1"] [dev-dependencies] -tokio = { version = "1.0", default-features = false, features = ["io-util", "net", "sync", "fs", "macros", "rt"] } \ No newline at end of file +tokio = { version = "1.0", default-features = false, features = [ + "io-util", + "net", + "sync", + "fs", + "macros", + "rt", +] } diff --git a/mavlink-core/src/async_connection/direct_serial.rs b/mavlink-core/src/async_connection/direct_serial.rs new file mode 100644 index 0000000000..4263d4ea12 --- /dev/null +++ b/mavlink-core/src/async_connection/direct_serial.rs @@ -0,0 +1,152 @@ +//! Async Serial MAVLINK connection + +use core::ops::DerefMut; +use std::io; + +use tokio::sync::Mutex; +use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream}; + +use crate::{ + async_peek_reader::AsyncPeekReader, read_v1_raw_message_async, read_v2_raw_message_async, + read_v2_raw_message_async_signed, MAVLinkRawMessage, MAVLinkV2MessageRaw, MavHeader, + MavlinkVersion, Message, +}; + +#[cfg(not(feature = "signing"))] +use crate::{read_versioned_msg_async, write_versioned_msg_async}; +#[cfg(feature = "signing")] +use crate::{ + read_versioned_msg_async_signed, write_versioned_msg_async_signed, SigningConfig, SigningData, +}; + +use super::AsyncMavConnection; + +pub fn open(settings: &str) -> io::Result { + let settings_toks: Vec<&str> = settings.split(':').collect(); + if settings_toks.len() < 2 { + return Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Incomplete port settings", + )); + } + + let Ok(baud) = settings_toks[1].parse::() else { + return Err(io::Error::new( + io::ErrorKind::AddrNotAvailable, + "Invalid baud rate", + )); + }; + + let port_name = settings_toks[0]; + let mut port = tokio_serial::new(port_name, baud).open_native_async()?; + port.set_data_bits(tokio_serial::DataBits::Eight)?; + port.set_parity(tokio_serial::Parity::None)?; + port.set_stop_bits(tokio_serial::StopBits::One)?; + port.set_flow_control(tokio_serial::FlowControl::None)?; + + Ok(AsyncSerialConnection { + port: Mutex::new(AsyncPeekReader::new(port)), + sequence: Mutex::new(0), + protocol_version: MavlinkVersion::V2, + #[cfg(feature = "signing")] + signing_data: None, + }) +} + +pub struct AsyncSerialConnection { + port: Mutex>, + sequence: Mutex, + protocol_version: MavlinkVersion, + #[cfg(feature = "signing")] + signing_data: Option, +} + +#[async_trait::async_trait] +impl AsyncMavConnection for AsyncSerialConnection { + async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> { + let mut port = self.port.lock().await; + + #[cfg(not(feature = "signing"))] + let result = read_versioned_msg_async(port.deref_mut(), self.protocol_version).await; + #[cfg(feature = "signing")] + let result = read_versioned_msg_async_signed( + port.deref_mut(), + self.protocol_version, + self.signing_data.as_ref(), + ) + .await; + result + } + + async fn recv_raw(&self) -> Result { + let mut port = self.port.lock().await; + #[cfg(not(feature = "signing"))] + let result = match self.protocol_version { + MavlinkVersion::V1 => { + MAVLinkRawMessage::V1(read_v1_raw_message_async::(port.deref_mut()).await?) + } + MavlinkVersion::V2 => { + MAVLinkRawMessage::V2(read_v2_raw_message_async::(port.deref_mut()).await?) + } + }; + #[cfg(feature = "signing")] + let result = match self.protocol_version { + MavlinkVersion::V1 => { + MAVLinkRawMessage::V1(read_v1_raw_message_async::(port.deref_mut()).await?) + } + MavlinkVersion::V2 => MAVLinkRawMessage::V2( + read_v2_raw_message_async_signed::( + port.deref_mut(), + self.signing_data.as_ref(), + ) + .await?, + ), + }; + + Ok(result) + } + + async fn send( + &self, + header: &MavHeader, + data: &M, + ) -> Result { + let mut port = self.port.lock().await; + let mut sequence = self.sequence.lock().await; + + let header = MavHeader { + sequence: *sequence, + system_id: header.system_id, + component_id: header.component_id, + }; + + *sequence = sequence.wrapping_add(1); + + #[cfg(not(feature = "signing"))] + let result = + write_versioned_msg_async(port.reader_mut(), self.protocol_version, header, data).await; + #[cfg(feature = "signing")] + let result = write_versioned_msg_async_signed( + port.reader_mut(), + self.protocol_version, + header, + data, + self.signing_data.as_ref(), + ) + .await; + result + } + + fn set_protocol_version(&mut self, version: MavlinkVersion) { + self.protocol_version = version; + } + + fn get_protocol_version(&self) -> MavlinkVersion { + self.protocol_version + } + + #[cfg(feature = "signing")] + fn setup_signing(&mut self, signing_data: Option) { + self.signing_data = signing_data.map(SigningData::from_config) + } +} diff --git a/mavlink-core/src/async_connection/file.rs b/mavlink-core/src/async_connection/file.rs index 5dae56d99e..2feef1378f 100644 --- a/mavlink-core/src/async_connection/file.rs +++ b/mavlink-core/src/async_connection/file.rs @@ -6,6 +6,10 @@ use super::AsyncMavConnection; use crate::error::{MessageReadError, MessageWriteError}; use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message}; +use crate::{ + read_v1_raw_message_async, read_v2_raw_message_async, read_v2_raw_message_async_signed, + MAVLinkRawMessage, MAVLinkV2MessageRaw, +}; use tokio::fs::File; use tokio::io; @@ -64,6 +68,34 @@ impl AsyncMavConnection for AsyncFileConnection { } } + async fn recv_raw(&self) -> Result { + let mut file = self.file.lock().await; + #[cfg(not(feature = "signing"))] + let result = match self.protocol_version { + MavlinkVersion::V1 => { + MAVLinkRawMessage::V1(read_v1_raw_message_async::(file.deref_mut()).await?) + } + MavlinkVersion::V2 => { + MAVLinkRawMessage::V2(read_v2_raw_message_async::(file.deref_mut()).await?) + } + }; + #[cfg(feature = "signing")] + let result = match self.protocol_version { + MavlinkVersion::V1 => { + MAVLinkRawMessage::V1(read_v1_raw_message_async::(file.deref_mut()).await?) + } + MavlinkVersion::V2 => MAVLinkRawMessage::V2( + read_v2_raw_message_async_signed::( + file.deref_mut(), + self.signing_data.as_ref(), + ) + .await?, + ), + }; + + Ok(result) + } + async fn send(&self, _header: &MavHeader, _data: &M) -> Result { Ok(0) } diff --git a/mavlink-core/src/async_connection/mod.rs b/mavlink-core/src/async_connection/mod.rs index 35e2fa0e78..38880c14d6 100644 --- a/mavlink-core/src/async_connection/mod.rs +++ b/mavlink-core/src/async_connection/mod.rs @@ -1,6 +1,6 @@ use tokio::io; -use crate::{MavFrame, MavHeader, MavlinkVersion, Message}; +use crate::{MAVLinkRawMessage, MAVLinkV2MessageRaw, MavFrame, MavHeader, MavlinkVersion, Message}; #[cfg(feature = "tcp")] mod tcp; @@ -8,6 +8,9 @@ mod tcp; #[cfg(feature = "udp")] mod udp; +#[cfg(feature = "direct-serial")] +mod direct_serial; + mod file; #[cfg(feature = "signing")] @@ -21,6 +24,8 @@ pub trait AsyncMavConnection { /// Yield until a valid frame is received, ignoring invalid messages. async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError>; + async fn recv_raw(&self) -> Result; + /// Send a mavlink message async fn send( &self, @@ -70,9 +75,9 @@ pub trait AsyncMavConnection { /// * `udpin::` to create a UDP server, listening for incoming packets /// * `udpout::` to create a UDP client /// * `udpbcast::` to create a UDP broadcast +/// * `serial::` to create a serial connection /// * `file:` to extract file data /// -/// Serial is currently not supported for async connections, use [`crate::connect`] instead. /// The type of the connection is determined at runtime based on the address type, so the /// connection is returned as a trait object. pub async fn connect_async( @@ -101,6 +106,15 @@ pub async fn connect_async( { protocol_err } + } else if cfg!(feature = "direct-serial") && address.starts_with("serial") { + #[cfg(feature = "direct-serial")] + { + Ok(Box::new(direct_serial::open(&address["serial:".len()..])?)) + } + #[cfg(not(feature = "direct-serial"))] + { + protocol_err + } } else if address.starts_with("file") { Ok(Box::new(file::open(&address["file:".len()..]).await?)) } else { diff --git a/mavlink-core/src/async_connection/tcp.rs b/mavlink-core/src/async_connection/tcp.rs index 4cd1f9ac9a..3035353f56 100644 --- a/mavlink-core/src/async_connection/tcp.rs +++ b/mavlink-core/src/async_connection/tcp.rs @@ -2,7 +2,11 @@ use super::{get_socket_addr, AsyncMavConnection}; use crate::async_peek_reader::AsyncPeekReader; -use crate::{MavHeader, MavlinkVersion, Message}; +use crate::{ + read_v1_raw_message, read_v1_raw_message_async, read_v2_raw_message_async, + read_v2_raw_message_async_signed, MAVLinkRawMessage, MAVLinkV2MessageRaw, MavHeader, + MavlinkVersion, Message, +}; use core::ops::DerefMut; use tokio::io; @@ -112,6 +116,34 @@ impl AsyncMavConnection for AsyncTcpConnection { result } + async fn recv_raw(&self) -> Result { + let mut reader = self.reader.lock().await; + #[cfg(not(feature = "signing"))] + let result = match self.protocol_version { + MavlinkVersion::V1 => { + MAVLinkRawMessage::V1(read_v1_raw_message_async::(reader.deref_mut()).await?) + } + MavlinkVersion::V2 => { + MAVLinkRawMessage::V2(read_v2_raw_message_async::(reader.deref_mut()).await?) + } + }; + #[cfg(feature = "signing")] + let result = match self.protocol_version { + MavlinkVersion::V1 => { + MAVLinkRawMessage::V1(read_v1_raw_message_async::(reader.deref_mut()).await?) + } + MavlinkVersion::V2 => MAVLinkRawMessage::V2( + read_v2_raw_message_async_signed::( + reader.deref_mut(), + self.signing_data.as_ref(), + ) + .await?, + ), + }; + + Ok(result) + } + async fn send( &self, header: &MavHeader, diff --git a/mavlink-core/src/async_connection/udp.rs b/mavlink-core/src/async_connection/udp.rs index 3f06b83e1d..285e0ce99e 100644 --- a/mavlink-core/src/async_connection/udp.rs +++ b/mavlink-core/src/async_connection/udp.rs @@ -9,7 +9,11 @@ use tokio::{ sync::Mutex, }; -use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message}; +use crate::{ + async_peek_reader::AsyncPeekReader, read_v1_raw_message_async, read_v2_raw_message_async, + read_v2_raw_message_async_signed, MAVLinkRawMessage, MAVLinkV2MessageRaw, MavHeader, + MavlinkVersion, Message, +}; use super::{get_socket_addr, AsyncMavConnection}; @@ -176,6 +180,40 @@ impl AsyncMavConnection for AsyncUdpConnection { } } + async fn recv_raw(&self) -> Result { + let mut reader = self.reader.lock().await; + loop { + #[cfg(not(feature = "signing"))] + let result = match self.protocol_version { + MavlinkVersion::V1 => MAVLinkRawMessage::V1( + read_v1_raw_message_async::(reader.deref_mut()).await?, + ), + MavlinkVersion::V2 => MAVLinkRawMessage::V2( + read_v2_raw_message_async::(reader.deref_mut()).await?, + ), + }; + #[cfg(feature = "signing")] + let result = match self.protocol_version { + MavlinkVersion::V1 => MAVLinkRawMessage::V1( + read_v1_raw_message_async::(reader.deref_mut()).await?, + ), + MavlinkVersion::V2 => MAVLinkRawMessage::V2( + read_v2_raw_message_async_signed::( + reader.deref_mut(), + self.signing_data.as_ref(), + ) + .await?, + ), + }; + if self.server { + if let addr @ Some(_) = reader.reader_ref().last_recv_address { + self.writer.lock().await.dest = addr; + } + } + return Ok(result); + } + } + async fn send( &self, header: &MavHeader, diff --git a/mavlink-core/src/lib.rs b/mavlink-core/src/lib.rs index 8af70ad89b..5057d1ad36 100644 --- a/mavlink-core/src/lib.rs +++ b/mavlink-core/src/lib.rs @@ -127,6 +127,13 @@ pub enum MavlinkVersion { V2, } +/// MAVLink raw message types wrapper +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum MAVLinkRawMessage { + V1(MAVLinkV1MessageRaw), + V2(MAVLinkV2MessageRaw), +} + /// Message framing marker for mavlink v1 pub const MAV_STX: u8 = 0xFE; diff --git a/mavlink/Cargo.toml b/mavlink/Cargo.toml index 669d9b0752..5bae1e707b 100644 --- a/mavlink/Cargo.toml +++ b/mavlink/Cargo.toml @@ -26,6 +26,11 @@ name = "mavlink-dump" path = "examples/mavlink-dump/src/main.rs" required-features = ["ardupilotmega"] +[[example]] +name = "mavlink-dump-raw-async" +path = "examples/mavlink-dump-raw-async/src/main.rs" +required-features = ["tokio-1"] + [dependencies] mavlink-core = { path = "../mavlink-core", default-features = false } num-traits = { workspace = true, default-features = false } @@ -113,8 +118,12 @@ features = [ "emit-extensions", "format-generated-code", "tokio-1", - "signing" + "signing", ] [dev-dependencies] -tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "time" ] } +tokio = { version = "1.0", default-features = false, features = [ + "macros", + "rt", + "time", +] } diff --git a/mavlink/examples/mavlink-dump-raw-async/Cargo.toml b/mavlink/examples/mavlink-dump-raw-async/Cargo.toml new file mode 100644 index 0000000000..2cfb0cf81c --- /dev/null +++ b/mavlink/examples/mavlink-dump-raw-async/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "mavlink-dump" +authors = [ + "Patrick José Pereira ", + "Roberto Petrisoru ", +] +license = "MIT/Apache-2.0" +edition = "2018" +version = "0.1.0" + +[profile.release] +opt-level = 3 +lto = true # Performs "fat" LTO which attempts to perform optimizations across all crates within the dependency graph + +[dependencies.mavlink] # MAVLink library +path = "../../" + +[dependencies] +tokio = { version = "1.40.0", default-features = false } diff --git a/mavlink/examples/mavlink-dump-raw-async/src/main.rs b/mavlink/examples/mavlink-dump-raw-async/src/main.rs new file mode 100644 index 0000000000..33d406e9e1 --- /dev/null +++ b/mavlink/examples/mavlink-dump-raw-async/src/main.rs @@ -0,0 +1,99 @@ +use mavlink::error::MessageReadError; +use std::{env, sync::Arc, thread, time::Duration}; + +#[tokio::main(flavor = "current_thread")] +async fn main() { + let args: Vec<_> = env::args().collect(); + + if args.len() < 2 { + println!( + "Usage: mavlink-dump-raw (tcpout|tcpin|udpout|udpin|udpbcast|serial|file):(ip|dev|path):(port|baud)" + ); + return; + } + + // It's possible to change the mavlink dialect to be used in the connect call + let mut mavconn = mavlink::connect_async::(&args[1]) + .await + .unwrap(); + + let vehicle = Arc::new(mavconn); + vehicle + .send(&mavlink::MavHeader::default(), &request_parameters()) + .await + .unwrap(); + vehicle + .send(&mavlink::MavHeader::default(), &request_stream()) + .await + .unwrap(); + + let vehicle_clone = vehicle.clone(); + tokio::spawn(async move { + let res = vehicle_clone.send_default(&heartbeat_message()).await; + if res.is_ok() { + tokio::time::sleep(Duration::from_secs(1)).await; + } else { + println!("send failed: {res:?}"); + } + }); + + loop { + match vehicle.recv_raw().await { + Ok(raw_msg) => match raw_msg { + mavlink::MAVLinkRawMessage::V1(msg) => { + println!("received v1 raw message (id = {})", msg.message_id()); + } + mavlink::MAVLinkRawMessage::V2(msg) => { + println!("received v2 raw message (id = {})", msg.message_id()); + } + }, + Err(MessageReadError::Io(e)) => { + if e.kind() == std::io::ErrorKind::WouldBlock { + //no messages currently available to receive -- wait a while + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } else { + println!("recv error: {e:?}"); + break; + } + } + // messages that didn't get through due to parser errors are ignored + _ => {} + } + } +} + +/// Create a heartbeat message using 'ardupilotmega' dialect +pub fn heartbeat_message() -> mavlink::ardupilotmega::MavMessage { + mavlink::ardupilotmega::MavMessage::HEARTBEAT(mavlink::ardupilotmega::HEARTBEAT_DATA { + custom_mode: 0, + mavtype: mavlink::ardupilotmega::MavType::MAV_TYPE_QUADROTOR, + autopilot: mavlink::ardupilotmega::MavAutopilot::MAV_AUTOPILOT_ARDUPILOTMEGA, + base_mode: mavlink::ardupilotmega::MavModeFlag::empty(), + system_status: mavlink::ardupilotmega::MavState::MAV_STATE_STANDBY, + mavlink_version: 0x3, + }) +} + +/// Create a message requesting the parameters list +pub fn request_parameters() -> mavlink::ardupilotmega::MavMessage { + mavlink::ardupilotmega::MavMessage::PARAM_REQUEST_LIST( + mavlink::ardupilotmega::PARAM_REQUEST_LIST_DATA { + target_system: 0, + target_component: 0, + }, + ) +} + +/// Create a message enabling data streaming +pub fn request_stream() -> mavlink::ardupilotmega::MavMessage { + mavlink::ardupilotmega::MavMessage::REQUEST_DATA_STREAM( + mavlink::ardupilotmega::REQUEST_DATA_STREAM_DATA { + target_system: 0, + target_component: 0, + req_stream_id: 0, + req_message_rate: 10, + start_stop: 1, + }, + ) +}