From e22f3a47a8b78a7b2b61b5c852ae1165df7499a4 Mon Sep 17 00:00:00 2001 From: Hugo Tunius Date: Thu, 10 Nov 2022 11:05:03 +0000 Subject: [PATCH] Add test for NACK responder max age --- interceptor/src/mock/mock_stream.rs | 11 +- interceptor/src/nack/responder/mod.rs | 23 ++-- .../src/nack/responder/responder_stream.rs | 7 +- .../src/nack/responder/responder_test.rs | 106 +++++++++++++++--- 4 files changed, 120 insertions(+), 27 deletions(-) diff --git a/interceptor/src/mock/mock_stream.rs b/interceptor/src/mock/mock_stream.rs index bd1cec881..0ec2f4641 100644 --- a/interceptor/src/mock/mock_stream.rs +++ b/interceptor/src/mock/mock_stream.rs @@ -210,12 +210,21 @@ impl MockStream { last } - /// written_rtp returns a channel containing rtp packets written, modified by the interceptor + /// Wait for a written RTP packet to appear after traversing interceptor chains. pub async fn written_rtp(&self) -> Option { let mut rtp_out_modified_rx = self.rtp_out_modified_rx.lock().await; rtp_out_modified_rx.recv().await } + /// Assert that a RTP packet has traversed interceptor chains. + /// + /// Like [`writte_rtp`] but does not wait. + pub async fn written_rtp_expected(&self) -> Option { + let mut rtp_out_modified_rx = self.rtp_out_modified_rx.lock().await; + + rtp_out_modified_rx.try_recv().ok() + } + /// read_rtcp returns a channel containing the rtcp batched read, modified by the interceptor pub async fn read_rtcp( &self, diff --git a/interceptor/src/nack/responder/mod.rs b/interceptor/src/nack/responder/mod.rs index 7498985fb..73ef9670f 100644 --- a/interceptor/src/nack/responder/mod.rs +++ b/interceptor/src/nack/responder/mod.rs @@ -89,19 +89,24 @@ impl ResponderInternal { let stream3 = Arc::clone(&stream2); Box::pin(async move { - if let Some(p) = stream3.get(seq).await { - let should_send = max_packet_age - .map(|max_age| p.age() < max_age) - .unwrap_or(true); - + let p = match stream3.get(seq).await { + None => return true, + Some(p) => p, + }; + + if let Some(max_packet_age) = max_packet_age { + let packet_age = p.age(); + let should_send = packet_age < max_packet_age; if !should_send { + log::debug!("Not resending packet {} as it's older than the configured max age {}s. Packet was initially sent {}s ago", p.packet.header.sequence_number, max_packet_age.as_secs_f64(), packet_age.as_secs_f64()); return true; } + } - let a = Attributes::new(); - if let Err(err) = stream3.next_rtp_writer.write(&p.packet, &a).await { - log::warn!("failed resending nacked packet: {}", err); - } + + let a = Attributes::new(); + if let Err(err) = stream3.next_rtp_writer.write(&p.packet, &a).await { + log::warn!("failed resending nacked packet: {}", err); } true diff --git a/interceptor/src/nack/responder/responder_stream.rs b/interceptor/src/nack/responder/responder_stream.rs index f036c1a4a..6619e1ad6 100644 --- a/interceptor/src/nack/responder/responder_stream.rs +++ b/interceptor/src/nack/responder/responder_stream.rs @@ -2,10 +2,12 @@ use crate::error::Result; use crate::nack::UINT16SIZE_HALF; use crate::{Attributes, RTPWriter}; -use async_trait::async_trait; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; + +use async_trait::async_trait; use tokio::sync::Mutex; +use tokio::time::Instant; struct ResponderStreamInternal { packets: Vec>, @@ -101,6 +103,7 @@ impl RTPWriter for ResponderStream { /// A packet that has been sent, or at least been queued to send. pub struct SentPacket { pub(super) packet: rtp::packet::Packet, + // We use tokio's instant because it's mockable. sent_at: Instant, } diff --git a/interceptor/src/nack/responder/responder_test.rs b/interceptor/src/nack/responder/responder_test.rs index b8f308cf7..e93d3ce73 100644 --- a/interceptor/src/nack/responder/responder_test.rs +++ b/interceptor/src/nack/responder/responder_test.rs @@ -1,12 +1,11 @@ use super::*; use crate::mock::mock_stream::MockStream; use crate::stream_info::RTCPFeedback; -use crate::test::timeout_or_fail; use tokio::time::Duration; use rtcp::transport_feedbacks::transport_layer_nack::{NackPair, TransportLayerNack}; -#[tokio::test] +#[tokio::test(start_paused = true)] async fn test_responder_interceptor() -> Result<()> { let icpr: Arc = Responder::builder().with_log2_size(3).build("")?; @@ -35,9 +34,13 @@ async fn test_responder_interceptor() -> Result<()> { }) .await?; - let p = timeout_or_fail(Duration::from_millis(10), stream.written_rtp()) + // Let the packet be pulled through interceptor chains + tokio::task::yield_now().await; + + let p = stream + .written_rtp_expected() .await - .expect("A packet"); + .expect("Packet should have been written"); assert_eq!(seq_num, p.header.sequence_number); } @@ -53,24 +56,97 @@ async fn test_responder_interceptor() -> Result<()> { ], })]) .await; + tokio::time::advance(Duration::from_millis(50)).await; + // Let the NACK task do its thing + tokio::task::yield_now().await; // seq number 13 was never sent, so it can't be resent for seq_num in [11, 12, 15] { - if let Ok(r) = tokio::time::timeout(Duration::from_millis(50), stream.written_rtp()).await { - if let Some(p) = r { - assert_eq!(seq_num, p.header.sequence_number); - } else { - assert!( - false, - "seq_num {} is not sent due to channel closed", - seq_num - ); - } + let p = stream + .written_rtp_expected() + .await + .expect("Packet should have been written"); + assert_eq!(seq_num, p.header.sequence_number); + } + + let result = stream.written_rtp_expected().await; + assert!(result.is_none(), "no more rtp packets expected"); + + stream.close().await?; + + Ok(()) +} + +#[tokio::test(start_paused = true)] +async fn test_responder_interceptor_with_max_age() -> Result<()> { + let icpr: Arc = Responder::builder() + .with_log2_size(3) + .with_max_packet_age(Duration::from_millis(400)) + .build("")?; + + let stream = MockStream::new( + &StreamInfo { + ssrc: 1, + rtcp_feedback: vec![RTCPFeedback { + typ: "nack".to_owned(), + ..Default::default() + }], + ..Default::default() + }, + icpr, + ) + .await; + + for seq_num in [10, 11, 12, 14, 15] { + stream + .write_rtp(&rtp::packet::Packet { + header: rtp::header::Header { + sequence_number: seq_num, + ..Default::default() + }, + ..Default::default() + }) + .await?; + tokio::time::advance(Duration::from_millis(30)).await; + tokio::task::yield_now().await; + + let p = stream.written_rtp().await.expect("A packet"); + assert_eq!(seq_num, p.header.sequence_number); + } + + // Advance time 300ms. Packets 10 and 11 will now have been sent 450ms and 420ms ago + // respectively. + tokio::time::advance(Duration::from_millis(300)).await; + + stream + .receive_rtcp(vec![Box::new(TransportLayerNack { + media_ssrc: 1, + sender_ssrc: 2, + nacks: vec![ + NackPair { + packet_id: 10, + lost_packets: 0b10111, + }, // sequence numbers: 11, 12, 13, 15 + ], + })]) + .await; + tokio::task::yield_now().await; + + // seq number 13 was never sent and seq number 10 and 11 is too late to resend now. + for seq_num in [12, 15] { + if let Some(p) = stream.written_rtp().await { + assert_eq!(seq_num, p.header.sequence_number); } else { - assert!(false, "seq_num {} is not sent yet", seq_num); + assert!( + false, + "seq_num {} is not sent due to channel closed", + seq_num + ); } } + // Resume time + tokio::time::resume(); let result = tokio::time::timeout(Duration::from_millis(10), stream.written_rtp()).await; assert!(result.is_err(), "no more rtp packets expected");