diff --git a/chain/src/chain.rs b/chain/src/chain.rs
index f970e07b9..836ad8deb 100644
--- a/chain/src/chain.rs
+++ b/chain/src/chain.rs
@@ -1289,8 +1289,10 @@ impl Chain {
 			}
 
 			txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
-				ext.extension.rewind(header, batch)?;
-				Ok(ext.extension.build_bitmap_accumulator()?)
+				let extension = &mut ext.extension;
+				let header_extension = &mut ext.header_extension;
+				extension.rewind(header, batch, header_extension)?;
+				Ok(extension.build_bitmap_accumulator()?)
 			})?
 		};
 
@@ -2295,6 +2297,11 @@ fn setup_head(
 				// delete the "bad" block and try again.
 				let prev_header = batch.get_block_header(&head.prev_block_h)?;
 
+				warn!(
+					"Corrupted MMR. Trying to recover it by rewinding blocks to height {}",
+					prev_header.height
+				);
+
 				txhashset::extending(header_pmmr, txhashset, &mut batch, |ext, batch| {
 					pipe::rewind_and_apply_fork(&prev_header, ext, batch, &|_| Ok(()), secp)
 				})?;
diff --git a/chain/src/pipe.rs b/chain/src/pipe.rs
index 5978a461b..1eb14e94d 100644
--- a/chain/src/pipe.rs
+++ b/chain/src/pipe.rs
@@ -801,7 +801,7 @@ pub fn rewind_and_apply_fork(
 		current = batch.get_previous_header(&current)?;
 	}
 	let fork_point = current;
-	extension.rewind(&fork_point, batch)?;
+	extension.rewind(&fork_point, batch, header_extension)?;
 
 	// Then apply all full blocks since this common ancestor
 	// to put txhashet extension in a state to accept the new block.
diff --git a/chain/src/txhashset/desegmenter.rs b/chain/src/txhashset/desegmenter.rs
index 79e550d6e..2aad0f74e 100644
--- a/chain/src/txhashset/desegmenter.rs
+++ b/chain/src/txhashset/desegmenter.rs
@@ -358,7 +358,8 @@ impl Desegmenter {
 			&mut batch,
 			|ext, batch| {
 				let extension = &mut ext.extension;
-				extension.rewind(&self.archive_header, batch)?;
+				let header_extension = &mut ext.header_extension;
+				extension.rewind(&self.archive_header, batch, header_extension)?;
 
 				// Validate the extension, generating the utxo_sum and kernel_sum.
 				// Full validation, including rangeproofs and kernel signature verification.
diff --git a/chain/src/txhashset/txhashset.rs b/chain/src/txhashset/txhashset.rs
index 40a8b866e..1b30d27cf 100644
--- a/chain/src/txhashset/txhashset.rs
+++ b/chain/src/txhashset/txhashset.rs
@@ -1532,7 +1532,12 @@ impl<'a> Extension<'a> {
 	/// Rewinds the MMRs to the provided block, rewinding to the last output pos
 	/// and last kernel pos of that block. If `updated_bitmap` is supplied, the
 	/// bitmap accumulator will be replaced with its contents
-	pub fn rewind(&mut self, header: &BlockHeader, batch: &Batch<'_>) -> Result<(), Error> {
+	pub fn rewind(
+		&mut self,
+		header: &BlockHeader,
+		batch: &Batch<'_>,
+		header_ext: &HeaderExtension<'_>,
+	) -> Result<(), Error> {
 		debug!(
 			"Rewind extension to {} at {} from {} at {}",
 			header.hash(),
@@ -1556,7 +1561,7 @@ impl<'a> Extension<'a> {
 			let mut current = head_header;
 			while header.height < current.height {
 				let block = batch.get_block(&current.hash())?;
-				self.rewind_single_block(&block, batch)?;
+				self.rewind_single_block(&block, batch, header_ext)?;
 				current = batch.get_previous_header(&current)?;
 			}
 		}
@@ -1570,7 +1575,12 @@ impl<'a> Extension<'a> {
 	// Rewind the MMRs and the output_pos index.
 	// Returns a vec of "affected_pos" so we can apply the necessary updates to the bitmap
 	// accumulator in a single pass for all rewound blocks.
-	fn rewind_single_block(&mut self, block: &Block, batch: &Batch<'_>) -> Result<(), Error> {
+	fn rewind_single_block(
+		&mut self,
+		block: &Block,
+		batch: &Batch<'_>,
+		header_ext: &HeaderExtension<'_>,
+	) -> Result<(), Error> {
 		let header = &block.header;
 		let prev_header = batch.get_previous_header(&header)?;
 
@@ -1585,8 +1595,19 @@ impl<'a> Extension<'a> {
 				header.hash(),
 				header.height
 			);
-			let bitmap = batch.get_block_input_bitmap(&header.hash())?;
-			bitmap.iter().map(|x| x.into()).collect()
+			if let Ok(bitmap) = batch.get_block_input_bitmap(&header.hash()) {
+				bitmap.iter().map(|x| x.into()).collect()
+			} else {
+				warn!(
+					"rewind_single_block: fallback to calculating spent inputs for block {} at {}",
+					header.hash(),
+					header.height
+				);
+				let spent = self
+					.utxo_view(header_ext)
+					.validate_inputs(&block.inputs(), batch)?;
+				spent.into_iter().map(|(_, pos)| pos.pos).collect()
+			}
 		};
 
 		if header.height == 0 {
diff --git a/core/src/core/pmmr/pmmr.rs b/core/src/core/pmmr/pmmr.rs
index 199c0e8c9..f94021924 100644
--- a/core/src/core/pmmr/pmmr.rs
+++ b/core/src/core/pmmr/pmmr.rs
@@ -307,6 +307,10 @@ where
 		// position is a leaf. We traverse the MMR to include any parent(s) that
 		// need to be included for the MMR to be valid.
 		let leaf_pos = round_up_to_leaf_pos(position);
+		if leaf_pos > self.size {
+			warn!("Invalid attempt to rewind PMMR!!! Data might be corrupted!!!");
+			return Ok(());
+		}
 		self.backend.rewind(leaf_pos, rewind_rm_pos)?;
 		self.size = leaf_pos;
 		Ok(())
diff --git a/p2p/src/store.rs b/p2p/src/store.rs
index b49680e5b..13ade8834 100644
--- a/p2p/src/store.rs
+++ b/p2p/src/store.rs
@@ -222,7 +222,7 @@ impl PeerStore {
 		)?;
 
 		if peer.flags != new_state {
-			info!(
+			debug!(
 				"Changing peer {:?} state form {:?} to {:?}",
 				peer_addr, peer.flags, new_state
 			);
diff --git a/servers/src/mwc/seed.rs b/servers/src/mwc/seed.rs
index e4caf6eb6..56dad59a9 100644
--- a/servers/src/mwc/seed.rs
+++ b/servers/src/mwc/seed.rs
@@ -33,7 +33,6 @@ use mwc_p2p::{msg::PeerAddrs, Capabilities, P2PConfig};
 use rand::prelude::*;
 use std::collections::HashMap;
 use std::net::ToSocketAddrs;
-use std::sync::atomic::{AtomicI32, Ordering};
 use std::sync::{mpsc, Arc};
 use std::{thread, time};
 
@@ -85,9 +84,9 @@ pub fn connect_and_monitor(
 	libp2p_connection::set_seed_list(&seed_list, true);
 
 	let mut prev_ping = Utc::now();
-	let connections_in_action = Arc::new(AtomicI32::new(0));
 
 	let mut listen_q_addrs: Vec<PeerAddr> = Vec::new();
+	let mut connection_threads: Vec<thread::JoinHandle<()>> = Vec::new();
 
 	loop {
 		if stop_state.is_stopped() {
@@ -153,7 +152,7 @@ pub fn connect_and_monitor(
 				&rx,
 				&mut connecting_history,
 				use_tor_connection,
-				&connections_in_action,
+				&mut connection_threads,
 				&mut listen_q_addrs,
 			);
 			let duration = if is_boost || !listen_q_addrs.is_empty() {
@@ -384,7 +383,7 @@ fn listen_for_addrs(
 	rx: &mpsc::Receiver<PeerAddr>,
 	connecting_history: &mut HashMap<PeerAddr, DateTime<Utc>>,
 	use_tor_connection: bool,
-	connections_in_action: &Arc<AtomicI32>,
+	connection_threads: &mut Vec<thread::JoinHandle<()>>,
 	listen_q_addrs: &mut Vec<PeerAddr>,
 ) {
 	// Pull everything currently on the queue off the queue.
@@ -410,9 +409,10 @@
 	// If we have a healthy number of outbound peers then we are done here.
 	debug_assert!(!peers.enough_outbound_peers());
 
+	connection_threads.retain(|h| !h.is_finished());
+
 	while !listen_q_addrs.is_empty() {
-		debug_assert!(connections_in_action.load(Ordering::Relaxed) >= 0);
-		if connections_in_action.load(Ordering::Relaxed) > PEER_MAX_INITIATE_CONNECTIONS as i32 {
+		if connection_threads.len() > PEER_MAX_INITIATE_CONNECTIONS {
 			break;
 		}
 
@@ -425,81 +425,74 @@
 		connecting_history.insert(addr.clone(), now);
 
+		if p2p.socks_port == 0 {
+			match &addr {
+				Onion(_) => {
+					continue;
+				}
+				_ => {}
+			}
+		}
+
 		let addr_c = addr.clone();
 		let peers_c = peers.clone();
 		let p2p_c = p2p.clone();
-		let connections_in_action = connections_in_action.clone();
-		thread::Builder::new()
+		let thr = thread::Builder::new()
 			.name("peer_connect".to_string())
 			.spawn(move || {
 				// if we don't have a socks port, and it's onion, don't set as defunct because
 				// we don't know.
-				let update_possible = if p2p_c.socks_port == 0 {
-					match addr_c.clone() {
-						Onion(_) => false,
-						_ => true,
-					}
-				} else {
-					true
-				};
-
-				if update_possible {
-					let _ = connections_in_action.fetch_add(1, Ordering::Relaxed);
-					match p2p_c.connect(&addr_c) {
-						Ok(p) => {
-							debug!(
-								"New peer {} is connected as outbound! Capability: {:b}",
-								p.info.addr, p.info.capabilities
-							);
-							// If peer advertizes PEER_LIST then ask it for more peers that support PEER_LIST.
-							// We want to build a local db of possible peers to connect to.
-							// We do not necessarily care (at this point in time) what other capabilities these peers support.
-							if p.info.capabilities.contains(Capabilities::PEER_LIST) {
-								debug!("Sending peer request to {}", addr_c);
-								match p.send_peer_request(
-									Capabilities::PEER_LIST
-										| peers_c.get_boost_peers_capabilities(),
-									use_tor_connection,
-								) {
-									Ok(_) => {
-										match addr_c {
-											PeerAddr::Onion(_) => {
-												if let Err(_) =
-													libp2p_connection::add_new_peer(&addr_c)
-												{
-													error!("Unable to add libp2p peer {}", addr_c);
-												}
+				match p2p_c.connect(&addr_c) {
+					Ok(p) => {
+						debug!(
+							"New peer {} is connected as outbound! Capability: {:b}",
+							p.info.addr, p.info.capabilities
+						);
+						// If peer advertizes PEER_LIST then ask it for more peers that support PEER_LIST.
+						// We want to build a local db of possible peers to connect to.
+						// We do not necessarily care (at this point in time) what other capabilities these peers support.
+						if p.info.capabilities.contains(Capabilities::PEER_LIST) {
+							debug!("Sending peer request to {}", addr_c);
+							match p.send_peer_request(
+								Capabilities::PEER_LIST | peers_c.get_boost_peers_capabilities(),
+								use_tor_connection,
+							) {
+								Ok(_) => {
+									match addr_c {
+										PeerAddr::Onion(_) => {
+											if let Err(_) = libp2p_connection::add_new_peer(&addr_c)
+											{
+												error!("Unable to add libp2p peer {}", addr_c);
+											}
 										}
-											_ => (),
-										};
-									}
-									Err(e) => {
-										error!(
-											"Failed send_peer_request to {}, Error: {}",
-											p.info.addr, e
-										);
-									}
 								}
 							}
-							// Requesting ping as well, need to know the height asap
-							let total_diff =
-								peers_c.total_difficulty().unwrap_or(Difficulty::zero());
-							let total_height = peers_c.total_height().unwrap_or(0);
-							if let Err(e) = p.send_ping(total_diff, total_height) {
-								error!("Failed send_ping to {}, Error: {}", p.info.addr, e);
-							}
-
-							let _ = peers_c.update_state(&addr_c, p2p::State::Healthy);
 						}
-						Err(e) => {
-							info!("Connection to the peer {} was rejected, {}", addr_c, e);
-							let _ = peers_c.update_state(&addr_c, p2p::State::Defunct);
+						// Requesting ping as well, need to know the height asap
+						let total_diff = peers_c.total_difficulty().unwrap_or(Difficulty::zero());
+						let total_height = peers_c.total_height().unwrap_or(0);
+						if let Err(e) = p.send_ping(total_diff, total_height) {
+							error!("Failed send_ping to {}, Error: {}", p.info.addr, e);
+						}
+
+						let _ = peers_c.update_state(&addr_c, p2p::State::Healthy);
+					}
+					Err(e) => {
+						debug!("Connection to the peer {} was rejected, {}", addr_c, e);
+						let _ = peers_c.update_state(&addr_c, p2p::State::Defunct);
 					}
-					let _ = connections_in_action.fetch_sub(1, Ordering::Relaxed);
 				}
 			})
 			.expect("failed to launch peer_connect thread");
+		connection_threads.push(thr);
 	}
 }
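
Note on the connection-thread bookkeeping in seed.rs above: the change replaces the shared AtomicI32 counter with a Vec of JoinHandles that is pruned with retain(|h| !h.is_finished()) before the cap is checked. The following is a minimal, self-contained sketch of that pattern, assuming only std; MAX_IN_FLIGHT and do_work are illustrative stand-ins for PEER_MAX_INITIATE_CONNECTIONS and the real p2p connect call, not names from this codebase.

use std::thread;
use std::time::Duration;

// Stand-in for PEER_MAX_INITIATE_CONNECTIONS.
const MAX_IN_FLIGHT: usize = 15;

// Placeholder for the real work done per peer (connect + handshake).
fn do_work(i: usize) {
	thread::sleep(Duration::from_millis(10 * (i as u64 % 5)));
}

fn main() {
	let mut connection_threads: Vec<thread::JoinHandle<()>> = Vec::new();
	for i in 0..100 {
		// Drop handles of threads that have already exited, then check the cap,
		// mirroring the retain()/len() check in listen_for_addrs.
		connection_threads.retain(|h| !h.is_finished());
		if connection_threads.len() > MAX_IN_FLIGHT {
			// Same back-off as the loop above: stop draining the queue for now.
			break;
		}
		let thr = thread::Builder::new()
			.name("peer_connect".to_string())
			.spawn(move || do_work(i))
			.expect("failed to launch peer_connect thread");
		connection_threads.push(thr);
	}
	// Unlike the counter this replaces, the handles cannot drift (no missed
	// decrement on an early return) and remain available for joining on shutdown.
	for h in connection_threads {
		let _ = h.join();
	}
}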