Skip to content

Commit

Permalink
mavlink-core: AsyncMavConnection recv_raw support
Browse files Browse the repository at this point in the history
  • Loading branch information
roby2014 committed Sep 24, 2024
1 parent 137c77a commit 112070d
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 7 deletions.
18 changes: 15 additions & 3 deletions mavlink-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@ embedded-io-async = { version = "0.6.1", optional = true }
serde = { version = "1.0.115", optional = true, features = ["derive"] }
serde_arrays = { version = "0.1.0", optional = true }
serial = { version = "0.4", optional = true }
tokio = { version = "1.0", default-features = false, features = ["io-util", "net", "sync", "fs"], optional = true }
tokio = { version = "1.0", default-features = false, features = [
"io-util",
"net",
"sync",
"fs",
], optional = true }
sha2 = { version = "0.10", optional = true }
async-trait = { version = "0.1.18", optional = true }
tokio-serial = { version = "5.4.4", default-features = false, optional = true }
Expand All @@ -44,7 +49,14 @@ tokio-serial = { version = "5.4.4", default-features = false, optional = true }
"serde" = ["dep:serde", "dep:serde_arrays"]
"tokio-1" = ["dep:tokio", "dep:async-trait", "dep:tokio-serial"]
"signing" = ["dep:sha2"]
default = ["std", "tcp", "udp", "direct-serial", "serde"]
default = ["std", "tcp", "udp", "direct-serial", "serde", "tokio-1"]

[dev-dependencies]
tokio = { version = "1.0", default-features = false, features = ["io-util", "net", "sync", "fs", "macros", "rt"] }
tokio = { version = "1.0", default-features = false, features = [
"io-util",
"net",
"sync",
"fs",
"macros",
"rt",
] }
34 changes: 33 additions & 1 deletion mavlink-core/src/async_connection/direct_serial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ use std::io;
use tokio::sync::Mutex;
use tokio_serial::{SerialPort, SerialPortBuilderExt, SerialStream};

use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message};
use crate::{
async_peek_reader::AsyncPeekReader, read_v1_raw_message_async, read_v2_raw_message_async,
read_v2_raw_message_async_signed, MAVLinkRawMessage, MAVLinkV2MessageRaw, MavHeader,
MavlinkVersion, Message,
};

#[cfg(not(feature = "signing"))]
use crate::{read_versioned_msg_async, write_versioned_msg_async};
Expand Down Expand Up @@ -74,6 +78,34 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncSerialConnection {
result
}

async fn recv_raw(&self) -> Result<MAVLinkRawMessage, crate::error::MessageReadError> {
let mut port = self.port.lock().await;
#[cfg(not(feature = "signing"))]
let result = match self.protocol_version {
MavlinkVersion::V1 => {
MAVLinkRawMessage::V1(read_v1_raw_message_async::<M, _>(port.deref_mut()).await?)
}
MavlinkVersion::V2 => {
MAVLinkRawMessage::V2(read_v2_raw_message_async::<M, _>(port.deref_mut()).await?)
}
};
#[cfg(feature = "signing")]
let result = match self.protocol_version {
MavlinkVersion::V1 => {
MAVLinkRawMessage::V1(read_v1_raw_message_async::<M, _>(port.deref_mut()).await?)
}
MavlinkVersion::V2 => MAVLinkRawMessage::V2(
read_v2_raw_message_async_signed::<M, _>(
port.deref_mut(),
self.signing_data.as_ref(),
)
.await?,
),
};

Ok(result)
}

async fn send(
&self,
header: &MavHeader,
Expand Down
32 changes: 32 additions & 0 deletions mavlink-core/src/async_connection/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ use super::AsyncMavConnection;
use crate::error::{MessageReadError, MessageWriteError};

use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message};
use crate::{
read_v1_raw_message_async, read_v2_raw_message_async, read_v2_raw_message_async_signed,
MAVLinkRawMessage, MAVLinkV2MessageRaw,
};

use tokio::fs::File;
use tokio::io;
Expand Down Expand Up @@ -64,6 +68,34 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncFileConnection {
}
}

async fn recv_raw(&self) -> Result<MAVLinkRawMessage, crate::error::MessageReadError> {
let mut file = self.file.lock().await;
#[cfg(not(feature = "signing"))]
let result = match self.protocol_version {
MavlinkVersion::V1 => {
MAVLinkRawMessage::V1(read_v1_raw_message_async::<M, _>(file.deref_mut()).await?)
}
MavlinkVersion::V2 => {
MAVLinkRawMessage::V2(read_v2_raw_message_async::<M, _>(file.deref_mut()).await?)
}
};
#[cfg(feature = "signing")]
let result = match self.protocol_version {
MavlinkVersion::V1 => {
MAVLinkRawMessage::V1(read_v1_raw_message_async::<M, _>(file.deref_mut()).await?)
}
MavlinkVersion::V2 => MAVLinkRawMessage::V2(
read_v2_raw_message_async_signed::<M, _>(
file.deref_mut(),
self.signing_data.as_ref(),
)
.await?,
),
};

Ok(result)
}

async fn send(&self, _header: &MavHeader, _data: &M) -> Result<usize, MessageWriteError> {
Ok(0)
}
Expand Down
4 changes: 3 additions & 1 deletion mavlink-core/src/async_connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use tokio::io;

use crate::{MavFrame, MavHeader, MavlinkVersion, Message};
use crate::{MAVLinkRawMessage, MAVLinkV2MessageRaw, MavFrame, MavHeader, MavlinkVersion, Message};

#[cfg(feature = "tcp")]
mod tcp;
Expand All @@ -24,6 +24,8 @@ pub trait AsyncMavConnection<M: Message + Sync + Send> {
/// Yield until a valid frame is received, ignoring invalid messages.
async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError>;

async fn recv_raw(&self) -> Result<MAVLinkRawMessage, crate::error::MessageReadError>;

/// Send a mavlink message
async fn send(
&self,
Expand Down
34 changes: 33 additions & 1 deletion mavlink-core/src/async_connection/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

use super::{get_socket_addr, AsyncMavConnection};
use crate::async_peek_reader::AsyncPeekReader;
use crate::{MavHeader, MavlinkVersion, Message};
use crate::{
read_v1_raw_message, read_v1_raw_message_async, read_v2_raw_message_async,
read_v2_raw_message_async_signed, MAVLinkRawMessage, MAVLinkV2MessageRaw, MavHeader,
MavlinkVersion, Message,
};

use core::ops::DerefMut;
use tokio::io;
Expand Down Expand Up @@ -112,6 +116,34 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
result
}

async fn recv_raw(&self) -> Result<MAVLinkRawMessage, crate::error::MessageReadError> {
let mut reader = self.reader.lock().await;
#[cfg(not(feature = "signing"))]
let result = match self.protocol_version {
MavlinkVersion::V1 => {
MAVLinkRawMessage::V1(read_v1_raw_message_async::<M, _>(reader.deref_mut()).await?)
}
MavlinkVersion::V2 => {
MAVLinkRawMessage::V2(read_v2_raw_message_async::<M, _>(reader.deref_mut()).await?)
}
};
#[cfg(feature = "signing")]
let result = match self.protocol_version {
MavlinkVersion::V1 => {
MAVLinkRawMessage::V1(read_v1_raw_message_async::<M, _>(reader.deref_mut()).await?)
}
MavlinkVersion::V2 => MAVLinkRawMessage::V2(
read_v2_raw_message_async_signed::<M, _>(
reader.deref_mut(),
self.signing_data.as_ref(),
)
.await?,
),
};

Ok(result)
}

async fn send(
&self,
header: &MavHeader,
Expand Down
40 changes: 39 additions & 1 deletion mavlink-core/src/async_connection/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ use tokio::{
sync::Mutex,
};

use crate::{async_peek_reader::AsyncPeekReader, MavHeader, MavlinkVersion, Message};
use crate::{
async_peek_reader::AsyncPeekReader, read_v1_raw_message_async, read_v2_raw_message_async,
read_v2_raw_message_async_signed, MAVLinkRawMessage, MAVLinkV2MessageRaw, MavHeader,
MavlinkVersion, Message,
};

use super::{get_socket_addr, AsyncMavConnection};

Expand Down Expand Up @@ -176,6 +180,40 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncUdpConnection {
}
}

async fn recv_raw(&self) -> Result<MAVLinkRawMessage, crate::error::MessageReadError> {
let mut reader = self.reader.lock().await;
loop {
#[cfg(not(feature = "signing"))]
let result = match self.protocol_version {
MavlinkVersion::V1 => MAVLinkRawMessage::V1(
read_v1_raw_message_async::<M, _>(reader.deref_mut()).await?,
),
MavlinkVersion::V2 => MAVLinkRawMessage::V2(
read_v2_raw_message_async::<M, _>(reader.deref_mut()).await?,
),
};
#[cfg(feature = "signing")]
let result = match self.protocol_version {
MavlinkVersion::V1 => MAVLinkRawMessage::V1(
read_v1_raw_message_async::<M, _>(reader.deref_mut()).await?,
),
MavlinkVersion::V2 => MAVLinkRawMessage::V2(
read_v2_raw_message_async_signed::<M, _>(
reader.deref_mut(),
self.signing_data.as_ref(),
)
.await?,
),
};
if self.server {
if let addr @ Some(_) = reader.reader_ref().last_recv_address {
self.writer.lock().await.dest = addr;
}
}
return Ok(result);
}
}

async fn send(
&self,
header: &MavHeader,
Expand Down
7 changes: 7 additions & 0 deletions mavlink-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ pub enum MavlinkVersion {
V2,
}

/// MAVLink raw message types wrapper
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum MAVLinkRawMessage {
V1(MAVLinkV1MessageRaw),
V2(MAVLinkV2MessageRaw),
}

/// Message framing marker for mavlink v1
pub const MAV_STX: u8 = 0xFE;

Expand Down

0 comments on commit 112070d

Please sign in to comment.