Skip to content

Commit

Permalink
Added (back) buffering to UDP's Read implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
tjrmarques authored and patrickelectric committed Jun 20, 2024
1 parent 96079f4 commit cdd585f
Showing 1 changed file with 58 additions and 4 deletions.
62 changes: 58 additions & 4 deletions mavlink-core/src/connection/udp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::VecDeque;

use crate::connection::MavConnection;
use crate::peek_reader::PeekReader;
use crate::{read_versioned_msg, write_versioned_msg, MavHeader, MavlinkVersion, Message};
Expand Down Expand Up @@ -57,15 +59,24 @@ pub fn udpin<T: ToSocketAddrs>(address: T) -> io::Result<UdpConnection> {

struct UdpRead {
socket: UdpSocket,
buffer: VecDeque<u8>,
last_recv_address: Option<SocketAddr>,
}

const MTU_SIZE: usize = 1500;
impl Read for UdpRead {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.socket.recv_from(buf).map(|(n, addr)| {
self.last_recv_address = Some(addr);
n
})
if !self.buffer.is_empty() {
self.buffer.read(buf)
} else {
let mut read_buffer = [0u8; MTU_SIZE];
let (n_buffer, address) = self.socket.recv_from(&mut read_buffer)?;
let n = (&read_buffer[0..n_buffer]).read(buf)?;
self.buffer.extend(&read_buffer[n..n_buffer]);

self.last_recv_address = Some(address);
Ok(n)
}
}
}

Expand All @@ -88,6 +99,7 @@ impl UdpConnection {
server,
reader: Mutex::new(PeekReader::new(UdpRead {
socket: socket.try_clone()?,
buffer: VecDeque::new(),
last_recv_address: None,
})),
writer: Mutex::new(UdpWrite {
Expand Down Expand Up @@ -148,3 +160,45 @@ impl<M: Message> MavConnection<M> for UdpConnection {
self.protocol_version
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_datagram_buffering() {
let receiver_socket = UdpSocket::bind("127.0.0.1:5000").unwrap();
let mut udp_reader = UdpRead {
socket: receiver_socket.try_clone().unwrap(),
buffer: VecDeque::new(),
last_recv_address: None,
};
let sender_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
sender_socket.connect("127.0.0.1:5000").unwrap();

let datagram: Vec<u8> = (0..50).collect::<Vec<_>>();

let mut n_sent = sender_socket.send(&datagram).unwrap();
assert_eq!(n_sent, datagram.len());
n_sent = sender_socket.send(&datagram).unwrap();
assert_eq!(n_sent, datagram.len());

let mut buf = [0u8; 30];

let mut n_read = udp_reader.read(&mut buf).unwrap();
assert_eq!(n_read, 30);
assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());

n_read = udp_reader.read(&mut buf).unwrap();
assert_eq!(n_read, 20);
assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());

n_read = udp_reader.read(&mut buf).unwrap();
assert_eq!(n_read, 30);
assert_eq!(&buf[0..n_read], (0..30).collect::<Vec<_>>().as_slice());

n_read = udp_reader.read(&mut buf).unwrap();
assert_eq!(n_read, 20);
assert_eq!(&buf[0..n_read], (30..50).collect::<Vec<_>>().as_slice());
}
}

0 comments on commit cdd585f

Please sign in to comment.