From fd25d7ba4ec2436f099e9af0b6226552ad0df20a Mon Sep 17 00:00:00 2001 From: bayk Date: Sat, 7 Dec 2024 01:34:27 -0800 Subject: [PATCH] Updated Peers discovery --- chain/src/txhashset/headers_desegmenter.rs | 63 ++++++++++++++++++++-- p2p/src/store.rs | 7 ++- 2 files changed, 65 insertions(+), 5 deletions(-) diff --git a/chain/src/txhashset/headers_desegmenter.rs b/chain/src/txhashset/headers_desegmenter.rs index affb30ed6..fb8872866 100644 --- a/chain/src/txhashset/headers_desegmenter.rs +++ b/chain/src/txhashset/headers_desegmenter.rs @@ -379,6 +379,8 @@ impl HeadersRecieveCache { .insert(first_header.height, (bhs, peer_info)); // Apply data from cache if possible + let mut headers_all: Vec = Vec::new(); + let mut headers_by_peer: Vec<(Vec, T)> = Vec::new(); let tip = self .chain .header_head() @@ -399,13 +401,66 @@ impl HeadersRecieveCache { if *height > tip_height + 1 { break; } - let (_, (bhs, peer)) = self.main_headers_cache.pop_first().unwrap(); + let (_, (mut bhs, peer)) = self.main_headers_cache.pop_first().unwrap(); tip_height = bhs.last().expect("bhs can't be empty").height; - // Adding headers into the blockchian. Adding by 512 is optimal, DB not design to add large number of headers - match self.chain.sync_block_headers(&bhs, tip, Options::NONE) { + headers_by_peer.push((bhs.clone(), peer)); + headers_all.append(&mut bhs); + + if headers_all.len() > 5000 { + match self + .chain + .sync_block_headers(&headers_all, tip, Options::NONE) + { + Ok(_) => {} + Err(e) => { + warn!( + "add_headers in bulk is failed, will add one by one. Error: {}", + e + ); + // apply one by one + for (hdr, peer) in headers_by_peer { + let tip = self + .chain + .header_head() + .expect("Header head must be always defined"); + + match self.chain.sync_block_headers(&hdr, tip, Options::NONE) { + Ok(_) => {} + Err(e) => return Err((peer, e)), + } + } + } + } + headers_all = Vec::new(); + headers_by_peer = Vec::new(); + } + } + + if !headers_all.is_empty() { + match self + .chain + .sync_block_headers(&headers_all, tip, Options::NONE) + { Ok(_) => {} - Err(e) => return Err((peer, e)), + Err(e) => { + warn!( + "add_headers in bulk is failed, will add one by one. Error: {}", + e + ); + // apply one by one + for (hdr, peer) in headers_by_peer { + let tip = self + .chain + .header_head() + .expect("Header head must be always defined"); + + match self.chain.sync_block_headers(&hdr, tip, Options::NONE) { + Ok(_) => {} + Err(e) => return Err((peer, e)), + } + } + } } } diff --git a/p2p/src/store.rs b/p2p/src/store.rs index 603e38f3f..c3494570f 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -174,9 +174,14 @@ impl PeerStore { cap: Capabilities, count: usize, ) -> Result, Error> { + // All new peers has flags Capabilities::UNKNOWN, that is why we better to return themn as well. + // Node will try to connect to them and find the capability. let mut peers = self .peers_iter()? - .filter(|p| p.flags == state && p.capabilities.contains(cap)) + .filter(|p| { + p.flags == state + && (p.capabilities == Capabilities::UNKNOWN || p.capabilities.contains(cap)) + }) .collect::>(); peers[..].shuffle(&mut thread_rng()); Ok(peers.iter().take(count).cloned().collect())