Skip to content

Commit

Permalink
Fix issues with checksum reports arriving out of order
Browse files Browse the repository at this point in the history
  • Loading branch information
johanhelsing committed Oct 23, 2023
1 parent 7444e3e commit 06f4f30
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 18 deletions.
24 changes: 17 additions & 7 deletions src/network/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use crate::network::messages::{
QualityReply, QualityReport, SyncReply, SyncRequest,
};
use crate::time_sync::TimeSync;
use crate::{Config, Frame, GGRSError, NonBlockingSocket, PlayerHandle, NULL_FRAME};
use crate::{
Config, DesyncDetection, Frame, GGRSError, NonBlockingSocket, PlayerHandle, NULL_FRAME,
};

use instant::{Duration, Instant};
use std::collections::vec_deque::Drain;
Expand Down Expand Up @@ -175,7 +177,8 @@ where
last_recv_time: Instant,

// debug desync
pub(crate) pending_checksums: VecDeque<(Frame, u128)>,
pub(crate) pending_checksums: HashMap<Frame, u128>,
desync_detection: DesyncDetection,
}

impl<T: Config> PartialEq for UdpProtocol<T> {
Expand All @@ -194,6 +197,7 @@ impl<T: Config> UdpProtocol<T> {
disconnect_timeout: Duration,
disconnect_notify_start: Duration,
fps: usize,
desync_detection: DesyncDetection,
) -> Self {
let mut magic = rand::random::<u16>();
while magic == 0 {
Expand Down Expand Up @@ -260,7 +264,8 @@ impl<T: Config> UdpProtocol<T> {
last_recv_time: Instant::now(),

// debug desync
pending_checksums: VecDeque::new(),
pending_checksums: HashMap::new(),
desync_detection,
}
}

Expand Down Expand Up @@ -708,10 +713,15 @@ impl<T: Config> UdpProtocol<T> {

/// Upon receiving a `ChecksumReport`, add it to the checksum history
fn on_checksum_report(&mut self, body: &ChecksumReport) {
self.pending_checksums
.truncate(MAX_CHECKSUM_HISTORY_SIZE - 1);
self.pending_checksums
.push_front((body.frame, body.checksum));
if self.pending_checksums.len() >= MAX_CHECKSUM_HISTORY_SIZE {
if let DesyncDetection::On { interval } = self.desync_detection {
let oldest_frame_to_keep =
body.frame - (MAX_CHECKSUM_HISTORY_SIZE as i32 - 1) * interval as i32;
self.pending_checksums
.retain(|&frame, _| frame >= oldest_frame_to_keep);
}
}
self.pending_checksums.insert(body.frame, body.checksum);
}

/// Returns the frame of the last received input
Expand Down
2 changes: 2 additions & 0 deletions src/sessions/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ impl<T: Config> SessionBuilder<T> {
self.disconnect_timeout,
self.disconnect_notify_start,
self.fps,
DesyncDetection::Off,
);
host.synchronize();
SpectatorSession::new(
Expand Down Expand Up @@ -369,6 +370,7 @@ impl<T: Config> SessionBuilder<T> {
self.disconnect_timeout,
self.disconnect_notify_start,
self.fps,
self.desync_detection,
);
// start the synchronization
endpoint.synchronize();
Expand Down
23 changes: 12 additions & 11 deletions src/sessions/p2p_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,18 +877,16 @@ impl<T: Config> P2PSession<T> {
match self.desync_detection {
DesyncDetection::On { .. } => {
for remote in self.player_reg.remotes.values_mut() {
while remote.pending_checksums.len() > 0 {
// todo: clean up when/if a drain filter variant is added for VecDeque
let (remote_frame, _) = remote.pending_checksums.back().unwrap();
if *remote_frame >= self.sync_layer.last_confirmed_frame() {
break;
let mut checked_frames = Vec::new();

for (&remote_frame, &remote_checksum) in &remote.pending_checksums {
if remote_frame >= self.sync_layer.last_confirmed_frame() {
// we're still waiting for inputs for this frame
continue;
}
if let Some(&local_checksum) =
self.local_checksum_history.get(&remote_frame)
{
let (remote_frame, remote_checksum) =
remote.pending_checksums.pop_back().unwrap();

if local_checksum != remote_checksum {
self.event_queue.push_back(GGRSEvent::DesyncDetected {
frame: remote_frame,
Expand All @@ -897,10 +895,13 @@ impl<T: Config> P2PSession<T> {
addr: remote.peer_addr(),
});
}
} else {
break;
checked_frames.push(remote_frame);
}
}

for frame in checked_frames {
remote.pending_checksums.remove_entry(&frame);
}
}
}
DesyncDetection::Off => (),
Expand Down Expand Up @@ -937,7 +938,7 @@ impl<T: Config> P2PSession<T> {
let oldest_frame_to_keep = frame_to_send
- (MAX_CHECKSUM_HISTORY_SIZE as i32 - 1) * interval as i32;
self.local_checksum_history
.retain(|&frame, _| frame >= oldest_frame_to_keep as i32);
.retain(|&frame, _| frame >= oldest_frame_to_keep);
}
}
}
Expand Down

0 comments on commit 06f4f30

Please sign in to comment.