Skip to content

Commit

Permalink
fix: rename tcp/udp connection to AsyncTcpConnection/AsyncUdpConnecti…
Browse files Browse the repository at this point in the history
…on, remove unneeded tokio:: prefixes
  • Loading branch information
pv42 committed Sep 9, 2024
1 parent c338795 commit b2c3cb4
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 21 deletions.
12 changes: 6 additions & 6 deletions mavlink-core/src/async_connection/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ pub async fn select_protocol<M: Message + Sync + Send>(
Ok(Box::new(connection?))
}

pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
let addr = get_socket_addr(address)?;

let socket = TcpStream::connect(addr).await?;

let (reader, writer) = socket.into_split();

Ok(TcpConnection {
Ok(AsyncTcpConnection {
reader: Mutex::new(AsyncPeekReader::new(reader)),
writer: Mutex::new(TcpWrite {
socket: writer,
Expand All @@ -53,15 +53,15 @@ pub async fn tcpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<TcpCon
})
}

pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<TcpConnection> {
pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncTcpConnection> {
let addr = get_socket_addr(address)?;
let listener = TcpListener::bind(addr).await?;

//For now we only accept one incoming stream: this yields until we get one
match listener.accept().await {
Ok((socket, _)) => {
let (reader, writer) = socket.into_split();
return Ok(TcpConnection {
return Ok(AsyncTcpConnection {
reader: Mutex::new(AsyncPeekReader::new(reader)),
writer: Mutex::new(TcpWrite {
socket: writer,
Expand All @@ -83,7 +83,7 @@ pub async fn tcpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<TcpConn
))
}

pub struct TcpConnection {
pub struct AsyncTcpConnection {
reader: Mutex<AsyncPeekReader<OwnedReadHalf>>,
writer: Mutex<TcpWrite>,
protocol_version: MavlinkVersion,
Expand All @@ -97,7 +97,7 @@ struct TcpWrite {
}

#[async_trait::async_trait]
impl<M: Message + Sync + Send> AsyncMavConnection<M> for TcpConnection {
impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncTcpConnection {
async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
let mut reader = self.reader.lock().await;
#[cfg(not(feature = "signing"))]
Expand Down
30 changes: 15 additions & 15 deletions mavlink-core/src/async_connection/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,46 @@ use crate::{

pub async fn select_protocol<M: Message + Sync + Send>(
address: &str,
) -> tokio::io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> {
) -> io::Result<Box<dyn AsyncMavConnection<M> + Sync + Send>> {
let connection = if let Some(address) = address.strip_prefix("udpin:") {
udpin(address).await
} else if let Some(address) = address.strip_prefix("udpout:") {
udpout(address).await
} else if let Some(address) = address.strip_prefix("udpbcast:") {
udpbcast(address).await
} else {
Err(tokio::io::Error::new(
tokio::io::ErrorKind::AddrNotAvailable,
Err(io::Error::new(
io::ErrorKind::AddrNotAvailable,
"Protocol unsupported",
))
};

Ok(Box::new(connection?))
}

pub async fn udpbcast<T: std::net::ToSocketAddrs>(address: T) -> tokio::io::Result<UdpConnection> {
pub async fn udpbcast<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncUdpConnection> {
let addr = get_socket_addr(address)?;
let socket = UdpSocket::bind("0.0.0.0:0").await?;
socket
.set_broadcast(true)
.expect("Couldn't bind to broadcast address.");
UdpConnection::new(socket, false, Some(addr))
AsyncUdpConnection::new(socket, false, Some(addr))
}

pub async fn udpout<T: std::net::ToSocketAddrs>(address: T) -> tokio::io::Result<UdpConnection> {
pub async fn udpout<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncUdpConnection> {
let addr = get_socket_addr(address)?;
let socket = UdpSocket::bind("0.0.0.0:0").await?;
UdpConnection::new(socket, false, Some(addr))
AsyncUdpConnection::new(socket, false, Some(addr))
}

pub async fn udpin<T: std::net::ToSocketAddrs>(address: T) -> tokio::io::Result<UdpConnection> {
pub async fn udpin<T: std::net::ToSocketAddrs>(address: T) -> io::Result<AsyncUdpConnection> {
let addr = address
.to_socket_addrs()
.unwrap()
.next()
.expect("Invalid address");
let socket = UdpSocket::bind(addr).await?;
UdpConnection::new(socket, true, None)
AsyncUdpConnection::new(socket, true, None)
}

struct UdpRead {
Expand All @@ -75,7 +75,7 @@ impl AsyncRead for UdpRead {
fn poll_read(
mut self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
buf: &mut io::ReadBuf<'_>,
) -> Poll<io::Result<()>> {
if self.buffer.is_empty() {
let mut read_buffer = [0u8; MTU_SIZE];
Expand Down Expand Up @@ -115,7 +115,7 @@ struct UdpWrite {
sequence: u8,
}

pub struct UdpConnection {
pub struct AsyncUdpConnection {
reader: Mutex<AsyncPeekReader<UdpRead>>,
writer: Mutex<UdpWrite>,
protocol_version: MavlinkVersion,
Expand All @@ -124,12 +124,12 @@ pub struct UdpConnection {
signing_data: Option<SigningData>,
}

impl UdpConnection {
impl AsyncUdpConnection {
fn new(
socket: UdpSocket,
server: bool,
dest: Option<std::net::SocketAddr>,
) -> tokio::io::Result<Self> {
) -> io::Result<Self> {
let socket = Arc::new(socket);
Ok(Self {
server,
Expand All @@ -151,7 +151,7 @@ impl UdpConnection {
}

#[async_trait::async_trait]
impl<M: Message + Sync + Send> AsyncMavConnection<M> for UdpConnection {
impl<M: Message + Sync + Send> AsyncMavConnection<M> for AsyncUdpConnection {
async fn recv(&self) -> Result<(MavHeader, M), crate::error::MessageReadError> {
let mut reader = self.reader.lock().await;

Expand Down Expand Up @@ -238,7 +238,7 @@ impl<M: Message + Sync + Send> AsyncMavConnection<M> for UdpConnection {
#[cfg(test)]
mod tests {
use super::*;
use tokio::io::AsyncReadExt;
use io::AsyncReadExt;

#[tokio::test]
async fn test_datagram_buffering() {
Expand Down

0 comments on commit b2c3cb4

Please sign in to comment.