From cdd585f14dcadcc90d3655ca69e2c8cbc7ddd751 Mon Sep 17 00:00:00 2001 From: Tiago Marques Date: Wed, 24 Apr 2024 12:13:45 +0100 Subject: [PATCH] Added (back) buffering to UDP's Read implementation --- mavlink-core/src/connection/udp.rs | 62 ++++++++++++++++++++++++++++-- 1 file changed, 58 insertions(+), 4 deletions(-) diff --git a/mavlink-core/src/connection/udp.rs b/mavlink-core/src/connection/udp.rs index c0db512da7..b353ea1ba1 100644 --- a/mavlink-core/src/connection/udp.rs +++ b/mavlink-core/src/connection/udp.rs @@ -1,3 +1,5 @@ +use std::collections::VecDeque; + use crate::connection::MavConnection; use crate::peek_reader::PeekReader; use crate::{read_versioned_msg, write_versioned_msg, MavHeader, MavlinkVersion, Message}; @@ -57,15 +59,24 @@ pub fn udpin(address: T) -> io::Result { struct UdpRead { socket: UdpSocket, + buffer: VecDeque, last_recv_address: Option, } +const MTU_SIZE: usize = 1500; impl Read for UdpRead { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.socket.recv_from(buf).map(|(n, addr)| { - self.last_recv_address = Some(addr); - n - }) + if !self.buffer.is_empty() { + self.buffer.read(buf) + } else { + let mut read_buffer = [0u8; MTU_SIZE]; + let (n_buffer, address) = self.socket.recv_from(&mut read_buffer)?; + let n = (&read_buffer[0..n_buffer]).read(buf)?; + self.buffer.extend(&read_buffer[n..n_buffer]); + + self.last_recv_address = Some(address); + Ok(n) + } } } @@ -88,6 +99,7 @@ impl UdpConnection { server, reader: Mutex::new(PeekReader::new(UdpRead { socket: socket.try_clone()?, + buffer: VecDeque::new(), last_recv_address: None, })), writer: Mutex::new(UdpWrite { @@ -148,3 +160,45 @@ impl MavConnection for UdpConnection { self.protocol_version } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_datagram_buffering() { + let receiver_socket = UdpSocket::bind("127.0.0.1:5000").unwrap(); + let mut udp_reader = UdpRead { + socket: receiver_socket.try_clone().unwrap(), + buffer: VecDeque::new(), + last_recv_address: None, + }; + let sender_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + sender_socket.connect("127.0.0.1:5000").unwrap(); + + let datagram: Vec = (0..50).collect::>(); + + let mut n_sent = sender_socket.send(&datagram).unwrap(); + assert_eq!(n_sent, datagram.len()); + n_sent = sender_socket.send(&datagram).unwrap(); + assert_eq!(n_sent, datagram.len()); + + let mut buf = [0u8; 30]; + + let mut n_read = udp_reader.read(&mut buf).unwrap(); + assert_eq!(n_read, 30); + assert_eq!(&buf[0..n_read], (0..30).collect::>().as_slice()); + + n_read = udp_reader.read(&mut buf).unwrap(); + assert_eq!(n_read, 20); + assert_eq!(&buf[0..n_read], (30..50).collect::>().as_slice()); + + n_read = udp_reader.read(&mut buf).unwrap(); + assert_eq!(n_read, 30); + assert_eq!(&buf[0..n_read], (0..30).collect::>().as_slice()); + + n_read = udp_reader.read(&mut buf).unwrap(); + assert_eq!(n_read, 20); + assert_eq!(&buf[0..n_read], (30..50).collect::>().as_slice()); + } +}