diff --git a/Cargo.lock b/Cargo.lock index 34ff8c08..7fc834be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1341,9 +1341,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "foca" -version = "0.12.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c5dd570e620d5458c4ab78f0d7dec99e8ec9c203684f257f6ad94b35d7d210f" +checksum = "73879bd4f1b8e2a1a39886488ac24641b20ade95238726325d516ae98d1ed00a" dependencies = [ "anyhow", "bincode", diff --git a/Cargo.toml b/Cargo.toml index 4f4c6cc2..637f6937 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ crc32fast = "1.3.2" enquote = "1.1.0" eyre = "0.6.8" fallible-iterator = "0.2.0" -foca = { version = "0.12.0", features = ["std", "tracing", "bincode-codec", "serde"] } +foca = { version = "0.15.0", features = ["std", "tracing", "bincode-codec", "serde"] } futures = "0.3.28" futures-util = "0.3.28" hex = "0.4.3" diff --git a/crates/corro-agent/src/api/peer.rs b/crates/corro-agent/src/api/peer.rs index 8e1dd3ee..b7a59ae6 100644 --- a/crates/corro-agent/src/api/peer.rs +++ b/crates/corro-agent/src/api/peer.rs @@ -1,5 +1,5 @@ use std::cmp; -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use std::net::SocketAddr; use std::ops::RangeInclusive; use std::sync::Arc; @@ -337,6 +337,8 @@ async fn process_range( }; for (versions, known_version) in overlapping { + debug!("got overlapping range {versions:?} in {range:?}"); + // optimization, cleared versions can't be revived... sending a single batch! if let KnownDbVersion::Cleared = &known_version { sender @@ -345,9 +347,11 @@ async fn process_range( changeset: Changeset::Empty { versions }, }))) .await?; - return Ok(()); + continue; } + let mut processed = BTreeSet::new(); + for version in versions { let bw = booked.write().await; if let Some(known_version) = bw.get(&version) { @@ -361,8 +365,11 @@ async fn process_range( sender, ) .await?; + processed.insert(version); } } + + debug!("processed versions {processed:?}"); } Ok(())