Skip to content

Commit

Permalink
Merge pull request #95 from mwcproject/v5.3.3/sync
Browse files Browse the repository at this point in the history
V5.3.3/sync
  • Loading branch information
bayk authored Dec 24, 2024
2 parents bec0612 + f2a17e6 commit 0634581
Show file tree
Hide file tree
Showing 10 changed files with 380 additions and 71 deletions.
30 changes: 27 additions & 3 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use mwc_core::ser;
use mwc_store::Error::NotFoundErr;
use mwc_util::secp::Secp256k1;
use mwc_util::{secp, ToHex};
use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
use std::fs::{self, File};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -164,13 +164,32 @@ impl OrphanBlockPool {
return orphans.remove(&header_hash);
}

fn contains(&self, hash: &Hash) -> bool {
/// Get the set of hashes of all blocks currently held in the orphan pool.
///
/// Only the keys are cloned; the orphan blocks themselves are not copied.
pub fn get_orphan_list(&self) -> HashSet<Hash> {
    // `keys().cloned()` is the idiomatic (and clippy-clean) form of
    // `.iter().map(|(k, _v)| k.clone())`.
    self.orphans.read().keys().cloned().collect()
}

/// Check whether a block with the given hash is currently in the orphan pool.
pub fn contains(&self, hash: &Hash) -> bool {
self.orphans.read().contains_key(hash)
}

fn get_orphan(&self, hash: &Hash) -> Option<Orphan> {
/// Look up an orphan block by its hash, returning a clone of it if present.
///
/// Note: this clones the whole `Orphan` (including the block body). Callers
/// that only need the header's parent hash and height should prefer
/// `get_orphan_height_prev_hash`, which avoids the full copy.
pub fn get_orphan(&self, hash: &Hash) -> Option<Orphan> {
    // `.cloned()` is the idiomatic equivalent of `.map(|o| o.clone())`
    // (clippy: map_clone).
    self.orphans.read().get(hash).cloned()
}

/// Return the parent block hash and height of the orphan identified by
/// `hash`, or `None` if no such orphan is in the pool.
///
/// Cheaper alternative to `get_orphan` when only header metadata is needed,
/// since it avoids cloning the full orphan block.
pub fn get_orphan_height_prev_hash(&self, hash: &Hash) -> Option<(Hash, u64)> {
    let orphans = self.orphans.read();
    let orphan = orphans.get(hash)?;
    let header = &orphan.block.header;
    Some((header.prev_hash.clone(), header.height))
}
}

/// Facade to the blockchain block processing pipeline and storage. Provides
Expand Down Expand Up @@ -877,6 +896,11 @@ impl Chain {
})
}

/// Shared access to the chain's orphan block pool.
///
/// Returns a reference to the `Arc` so callers can either borrow the pool
/// or clone the `Arc` to hold their own handle.
pub fn get_orphans_pool(&self) -> &Arc<OrphanBlockPool> {
&self.orphans
}

/// Check if hash is for a known orphan.
pub fn is_orphan(&self, hash: &Hash) -> bool {
self.orphans.contains(hash)
Expand Down
12 changes: 6 additions & 6 deletions p2p/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,13 @@ impl PeerStore {
let peers_num = peers.len();
if peers_num > 1 {
peers.sort_by_key(|p| -p.last_connected);
// Then shuffle most of them
let shuffle_steps = peers_num / 4;
// Then shuffle every second of them
let mut rng = thread_rng();
for _ in 0..shuffle_steps {
let i1 = rng.gen_range(0, peers_num);
let i2 = rng.gen_range(0, peers_num);
peers.swap(i1, i2);
for i1 in (1..peers_num).step_by(2) {
if i1 + 2 < peers_num {
let i2 = rng.gen_range(i1 + 1, peers_num);
peers.swap(i1, i2);
}
}
}
Ok(peers)
Expand Down
68 changes: 48 additions & 20 deletions servers/src/common/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::core::core::{
use crate::core::pow::Difficulty;
use crate::core::ser::ProtocolVersion;
use crate::core::{core, global};
use crate::mwc::sync::get_locator_heights;
use crate::mwc::sync::sync_manager::SyncManager;
use crate::p2p;
use crate::p2p::types::PeerInfo;
Expand Down Expand Up @@ -362,7 +363,8 @@ where
debug!("header_received, cache for {} OK", bh_hash);
}

if self.chain().block_exists(&bh.hash())? {
let chain = self.chain();
if chain.block_exists(&bh.hash())? {
return Ok(true);
}
if !self.sync_state.is_syncing() {
Expand All @@ -373,7 +375,7 @@ where

// pushing the new block header through the header chain pipeline
// we will go ask for the block if this is a new header
let res = self.chain().process_block_header(&bh, chain::Options::NONE);
let res = chain.process_block_header(&bh, chain::Options::NONE);

if let Err(e) = res {
debug!("Block header {} refused by chain: {:?}", bh.hash(), e);
Expand All @@ -382,6 +384,18 @@ where
} else {
// we got an error when trying to process the block header
// but nothing serious enough to need to ban the peer upstream
// Probably child block doesn't exist, let's request them
if let Some(peer) = self.peers().get_connected_peer(&peer_info.addr) {
let head = chain.head()?;
debug!(
"Got unknown header, requesting headers from the peer {} at height {}",
peer_info.addr, head.height
);
let heights = get_locator_heights(head.height);
let locator = chain.get_locator_hashes(head, &heights)?;
let _ = peer.send_header_request(locator);
let _ = peer.send_block_request(bh.hash(), chain::Options::NONE);
}
return Err(e);
}
}
Expand Down Expand Up @@ -771,28 +785,29 @@ where
opts: chain::Options,
) -> Result<bool, chain::Error> {
// We cannot process blocks earlier than the horizon so check for this here.
{
let head = self.chain().head()?;
let chain = self.chain();
let head = {
let head = chain.head()?;
let horizon = head
.height
.saturating_sub(global::cut_through_horizon() as u64);
if b.header.height < horizon {
debug!("Got block is below horizon from peer {}", peer_info.addr);
return Ok(true);
}
}
head
};

let bhash = b.hash();
let previous = self.chain().get_previous_header(&b.header);

match self.chain().process_block(b, opts) {
match chain.process_block(b.clone(), opts) {
Ok(_) => {
self.validate_chain(&bhash);
self.check_compact();
self.sync_manager.recieve_block_reporting(
true,
&peer_info.addr,
&bhash,
b,
opts,
&self.peers(),
);
Ok(true)
Expand All @@ -803,27 +818,40 @@ where
self.sync_manager.recieve_block_reporting(
false,
&peer_info.addr,
&bhash,
b,
opts,
&self.peers(),
);
Ok(false)
}
Err(e) => {
self.sync_manager.recieve_block_reporting(
let prev_block_hash = b.header.prev_hash.clone();
let previous = chain.get_previous_header(&b.header);
let need_request_prev_block = self.sync_manager.recieve_block_reporting(
!e.is_bad_data(),
&peer_info.addr,
&bhash,
b,
opts,
&self.peers(),
);
match e {
chain::Error::Orphan(orph_msg) => {
if let Ok(previous) = previous {
// make sure we did not miss the parent block
if !self.chain().is_orphan(&previous.hash())
&& !self.sync_state.is_syncing()
{
debug!("process_block: received an orphan block: {}, checking the parent: {:}", orph_msg, previous.hash());
self.request_block(&previous, peer_info, chain::Options::NONE)
chain::Error::StoreErr(_, _) | chain::Error::Orphan(_) => {
if previous.is_err() {
// requesting headers from that peer
if let Some(peer) = self.peers().get_connected_peer(&peer_info.addr) {
debug!("Got block with unknow headers, requesting headers from the peer {} at height {}", peer_info.addr, head.height);
let heights = get_locator_heights(head.height);
let locator = chain.get_locator_hashes(head, &heights)?;
let _ = peer.send_header_request(locator);
}
}
if need_request_prev_block {
// requesting headers from that peer
if let Some(peer) = self.peers().get_connected_peer(&peer_info.addr) {
// requesting prev block from that peer
debug!("Got block with unknow child, requesting prev block {} from the peer {}", prev_block_hash, peer_info.addr);
let _ =
peer.send_block_request(prev_block_hash, chain::Options::NONE);
}
}
Ok(true)
Expand Down
67 changes: 38 additions & 29 deletions servers/src/mwc/seed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,7 @@ pub fn connect_and_monitor(

let mut connecting_history: HashMap<PeerAddr, DateTime<Utc>> = HashMap::new();

connect_to_seeds_and_peers(
peers.clone(),
tx.clone(),
seed_list.clone(),
config.clone(),
);
connect_to_seeds_and_peers(peers.clone(), tx.clone(), &seed_list, config.clone());
seed_connect_time = Utc::now() + Duration::seconds(CONNECT_TO_SEED_INTERVAL);

libp2p_connection::set_seed_list(&seed_list, true);
Expand Down Expand Up @@ -108,7 +103,7 @@ pub fn connect_and_monitor(
connect_to_seeds_and_peers(
peers.clone(),
tx.clone(),
seed_list.clone(),
&seed_list,
config.clone(),
);
seed_connect_time = now + Duration::seconds(CONNECT_TO_SEED_INTERVAL);
Expand All @@ -129,6 +124,7 @@ pub fn connect_and_monitor(
p2p_server.config.clone(),
use_tor_connection,
tx.clone(),
listen_q_addrs.is_empty(),
);

if peers.is_sync_mode() {
Expand All @@ -154,6 +150,7 @@ pub fn connect_and_monitor(
use_tor_connection,
&mut connection_threads,
&mut listen_q_addrs,
&seed_list,
);
let duration = if is_boost || !listen_q_addrs.is_empty() {
PEERS_CHECK_TIME_BOOST
Expand Down Expand Up @@ -186,6 +183,7 @@ fn monitor_peers(
config: p2p::P2PConfig,
use_tor_connection: bool,
tx: mpsc::Sender<PeerAddr>,
load_peers_from_db: bool,
) {
// regularly check if we need to acquire more peers and if so, gets
// them from db
Expand Down Expand Up @@ -289,23 +287,25 @@ fn monitor_peers(
let _ = peers.update_state(&peer.addr, p2p::State::Healthy);
}

// find some peers from our db
// and queue them up for a connection attempt
// intentionally make too many attempts (2x) as some (most?) will fail
// as many nodes in our db are not publicly accessible
let new_peers = peers.find_peers(p2p::State::Healthy, boost_peers_capabilities);

// Only queue up connection attempts for candidate peers where we
// are confident we do not yet know about this peer.
// The call to is_known() may fail due to contention on the peers map.
// Do not attempt any connection where is_known() fails for any reason.
let mut max_addresses = 0;
for p in new_peers {
if let Ok(false) = peers.is_known(&p.addr) {
tx.send(p.addr.clone()).unwrap();
max_addresses += 1;
if max_addresses > 200 {
break;
if load_peers_from_db {
// find some peers from our db
// and queue them up for a connection attempt
// intentionally make too many attempts (2x) as some (most?) will fail
// as many nodes in our db are not publicly accessible
let new_peers = peers.find_peers(p2p::State::Healthy, boost_peers_capabilities);

// Only queue up connection attempts for candidate peers where we
// are confident we do not yet know about this peer.
// The call to is_known() may fail due to contention on the peers map.
// Do not attempt any connection where is_known() fails for any reason.
let mut max_addresses = 0;
for p in new_peers {
if let Ok(false) = peers.is_known(&p.addr) {
tx.send(p.addr.clone()).unwrap();
max_addresses += 1;
if max_addresses > 20 {
break;
}
}
}
}
Expand All @@ -316,7 +316,7 @@ fn monitor_peers(
fn connect_to_seeds_and_peers(
peers: Arc<p2p::Peers>,
tx: mpsc::Sender<PeerAddr>,
seed_list: Vec<PeerAddr>,
seed_list: &Vec<PeerAddr>,
config: P2PConfig,
) {
let peers_deny = config.peers_deny.unwrap_or(PeerAddrs::default());
Expand Down Expand Up @@ -349,12 +349,17 @@ fn connect_to_seeds_and_peers(

// if so, get their addresses, otherwise use our seeds
let peer_addrs = if found_peers.len() > 3 {
found_peers
let mut peer_addrs = found_peers
.iter()
.map(|p| p.addr.clone())
.collect::<Vec<_>>()
.collect::<Vec<_>>();

if let Some(seed_addr) = seed_list.choose(&mut thread_rng()) {
peer_addrs.push(seed_addr.clone());
}
peer_addrs
} else {
seed_list
seed_list.clone()
};

if peer_addrs.is_empty() {
Expand All @@ -367,7 +372,7 @@ fn connect_to_seeds_and_peers(
if !peers_deny.as_slice().contains(&addr) {
let _ = tx.send(addr);
max_addresses += 1;
if max_addresses > 200 {
if max_addresses > 20 {
break;
}
}
Expand All @@ -385,6 +390,7 @@ fn listen_for_addrs(
use_tor_connection: bool,
connection_threads: &mut Vec<thread::JoinHandle<()>>,
listen_q_addrs: &mut Vec<PeerAddr>,
seed_list: &Vec<PeerAddr>,
) {
// Pull everything currently on the queue off the queue.
// Does not block so addrs may be empty.
Expand All @@ -397,6 +403,9 @@ fn listen_for_addrs(
listen_q_addrs.drain(0..listen_q_addrs.len() - PEER_MAX_INITIATE_CONNECTIONS * 5);
}
listen_q_addrs.append(&mut addrs);
if let Some(seed_adr) = seed_list.choose(&mut thread_rng()) {
listen_q_addrs.push(seed_adr.clone());
}
}

let now = Utc::now();
Expand Down
2 changes: 2 additions & 0 deletions servers/src/mwc/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
mod body_sync;
mod header_hashes_sync;
mod header_sync;
mod orphans_sync;
mod state_sync;
pub mod sync_manager;
mod sync_peers;
mod sync_utils;
mod syncer;
pub use header_sync::get_locator_heights;

pub use self::syncer::run_sync;
3 changes: 1 addition & 2 deletions servers/src/mwc/sync/body_sync.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// Copyright 2019 The Grin Developers
// Copyright 2024 The MWC Developers
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -74,7 +73,7 @@ impl BodySync {
let max_avail_height = cmp::min(best_height, header_head.height);

// Last few blocks no need to sync, new mined blocks will be synced regular way
if head.height > max_avail_height.saturating_sub(3) {
if head.height > max_avail_height.saturating_sub(7) {
// Expected by QT wallet
info!(
"synchronized at {} @ {} [{}]",
Expand Down
Loading

0 comments on commit 0634581

Please sign in to comment.