Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated blockchain data recovery. #92

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1289,8 +1289,10 @@ impl Chain {
}

txhashset::extending_readonly(&mut header_pmmr, &mut txhashset, |ext, batch| {
ext.extension.rewind(header, batch)?;
Ok(ext.extension.build_bitmap_accumulator()?)
let extension = &mut ext.extension;
let header_extension = &mut ext.header_extension;
extension.rewind(header, batch, header_extension)?;
Ok(extension.build_bitmap_accumulator()?)
})?
};

Expand Down Expand Up @@ -2295,6 +2297,11 @@ fn setup_head(
// delete the "bad" block and try again.
let prev_header = batch.get_block_header(&head.prev_block_h)?;

warn!(
"Corrupted MMR. Tryin to recover it by rewinding blocks to height {}",
prev_header.height
);

txhashset::extending(header_pmmr, txhashset, &mut batch, |ext, batch| {
pipe::rewind_and_apply_fork(&prev_header, ext, batch, &|_| Ok(()), secp)
})?;
Expand Down
2 changes: 1 addition & 1 deletion chain/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ pub fn rewind_and_apply_fork(
current = batch.get_previous_header(&current)?;
}
let fork_point = current;
extension.rewind(&fork_point, batch)?;
extension.rewind(&fork_point, batch, header_extension)?;

// Then apply all full blocks since this common ancestor
// to put txhashet extension in a state to accept the new block.
Expand Down
3 changes: 2 additions & 1 deletion chain/src/txhashset/desegmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ impl Desegmenter {
&mut batch,
|ext, batch| {
let extension = &mut ext.extension;
extension.rewind(&self.archive_header, batch)?;
let header_extension = &mut ext.header_extension;
extension.rewind(&self.archive_header, batch, header_extension)?;

// Validate the extension, generating the utxo_sum and kernel_sum.
// Full validation, including rangeproofs and kernel signature verification.
Expand Down
31 changes: 26 additions & 5 deletions chain/src/txhashset/txhashset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1532,7 +1532,12 @@ impl<'a> Extension<'a> {
/// Rewinds the MMRs to the provided block, rewinding to the last output pos
/// and last kernel pos of that block. If `updated_bitmap` is supplied, the
/// bitmap accumulator will be replaced with its contents
pub fn rewind(&mut self, header: &BlockHeader, batch: &Batch<'_>) -> Result<(), Error> {
pub fn rewind(
&mut self,
header: &BlockHeader,
batch: &Batch<'_>,
header_ext: &HeaderExtension<'_>,
) -> Result<(), Error> {
debug!(
"Rewind extension to {} at {} from {} at {}",
header.hash(),
Expand All @@ -1556,7 +1561,7 @@ impl<'a> Extension<'a> {
let mut current = head_header;
while header.height < current.height {
let block = batch.get_block(&current.hash())?;
self.rewind_single_block(&block, batch)?;
self.rewind_single_block(&block, batch, header_ext)?;
current = batch.get_previous_header(&current)?;
}
}
Expand All @@ -1570,7 +1575,12 @@ impl<'a> Extension<'a> {
// Rewind the MMRs and the output_pos index.
// Returns a vec of "affected_pos" so we can apply the necessary updates to the bitmap
// accumulator in a single pass for all rewound blocks.
fn rewind_single_block(&mut self, block: &Block, batch: &Batch<'_>) -> Result<(), Error> {
fn rewind_single_block(
&mut self,
block: &Block,
batch: &Batch<'_>,
header_ext: &HeaderExtension<'_>,
) -> Result<(), Error> {
let header = &block.header;
let prev_header = batch.get_previous_header(&header)?;

Expand All @@ -1585,8 +1595,19 @@ impl<'a> Extension<'a> {
header.hash(),
header.height
);
let bitmap = batch.get_block_input_bitmap(&header.hash())?;
bitmap.iter().map(|x| x.into()).collect()
if let Ok(bitmap) = batch.get_block_input_bitmap(&header.hash()) {
bitmap.iter().map(|x| x.into()).collect()
} else {
warn!(
"rewind_single_block: fallback to calculating spent inputs for block {} at {}",
header.hash(),
header.height
);
let spent = self
.utxo_view(header_ext)
.validate_inputs(&block.inputs(), batch)?;
spent.into_iter().map(|(_, pos)| pos.pos).collect()
}
};

if header.height == 0 {
Expand Down
4 changes: 4 additions & 0 deletions core/src/core/pmmr/pmmr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ where
// position is a leaf. We traverse the MMR to include any parent(s) that
// need to be included for the MMR to be valid.
let leaf_pos = round_up_to_leaf_pos(position);
if leaf_pos > self.size {
warn!("Invalid attempt tp rewind PMMR!!! Data might be corrupted!!!");
return Ok(());
}
self.backend.rewind(leaf_pos, rewind_rm_pos)?;
self.size = leaf_pos;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion p2p/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl PeerStore {
)?;

if peer.flags != new_state {
info!(
debug!(
"Changing peer {:?} state form {:?} to {:?}",
peer_addr, peer.flags, new_state
);
Expand Down
123 changes: 58 additions & 65 deletions servers/src/mwc/seed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use mwc_p2p::{msg::PeerAddrs, Capabilities, P2PConfig};
use rand::prelude::*;
use std::collections::HashMap;
use std::net::ToSocketAddrs;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{mpsc, Arc};
use std::{thread, time};

Expand Down Expand Up @@ -85,9 +84,9 @@ pub fn connect_and_monitor(
libp2p_connection::set_seed_list(&seed_list, true);

let mut prev_ping = Utc::now();
let connections_in_action = Arc::new(AtomicI32::new(0));

let mut listen_q_addrs: Vec<PeerAddr> = Vec::new();
let mut connection_threads: Vec<thread::JoinHandle<()>> = Vec::new();

loop {
if stop_state.is_stopped() {
Expand Down Expand Up @@ -153,7 +152,7 @@ pub fn connect_and_monitor(
&rx,
&mut connecting_history,
use_tor_connection,
&connections_in_action,
&mut connection_threads,
&mut listen_q_addrs,
);
let duration = if is_boost || !listen_q_addrs.is_empty() {
Expand Down Expand Up @@ -384,7 +383,7 @@ fn listen_for_addrs(
rx: &mpsc::Receiver<PeerAddr>,
connecting_history: &mut HashMap<PeerAddr, DateTime<Utc>>,
use_tor_connection: bool,
connections_in_action: &Arc<AtomicI32>,
connection_threads: &mut Vec<thread::JoinHandle<()>>,
listen_q_addrs: &mut Vec<PeerAddr>,
) {
// Pull everything currently on the queue off the queue.
Expand All @@ -410,9 +409,10 @@ fn listen_for_addrs(
// If we have a healthy number of outbound peers then we are done here.
debug_assert!(!peers.enough_outbound_peers());

connection_threads.retain(|h| !h.is_finished());

while !listen_q_addrs.is_empty() {
debug_assert!(connections_in_action.load(Ordering::Relaxed) >= 0);
if connections_in_action.load(Ordering::Relaxed) > PEER_MAX_INITIATE_CONNECTIONS as i32 {
if connection_threads.len() > PEER_MAX_INITIATE_CONNECTIONS {
break;
}

Expand All @@ -425,81 +425,74 @@ fn listen_for_addrs(

connecting_history.insert(addr.clone(), now);

if p2p.socks_port == 0 {
match &addr {
Onion(_) => {
continue;
}
_ => {}
}
}

let addr_c = addr.clone();
let peers_c = peers.clone();
let p2p_c = p2p.clone();
let connections_in_action = connections_in_action.clone();
thread::Builder::new()
let thr = thread::Builder::new()
.name("peer_connect".to_string())
.spawn(move || {
// if we don't have a socks port, and it's onion, don't set as defunct because
// we don't know.
let update_possible = if p2p_c.socks_port == 0 {
match addr_c.clone() {
Onion(_) => false,
_ => true,
}
} else {
true
};

if update_possible {
let _ = connections_in_action.fetch_add(1, Ordering::Relaxed);
match p2p_c.connect(&addr_c) {
Ok(p) => {
debug!(
"New peer {} is connected as outbound! Capability: {:b}",
p.info.addr, p.info.capabilities
);
// If peer advertizes PEER_LIST then ask it for more peers that support PEER_LIST.
// We want to build a local db of possible peers to connect to.
// We do not necessarily care (at this point in time) what other capabilities these peers support.
if p.info.capabilities.contains(Capabilities::PEER_LIST) {
debug!("Sending peer request to {}", addr_c);
match p.send_peer_request(
Capabilities::PEER_LIST
| peers_c.get_boost_peers_capabilities(),
use_tor_connection,
) {
Ok(_) => {
match addr_c {
PeerAddr::Onion(_) => {
if let Err(_) =
libp2p_connection::add_new_peer(&addr_c)
{
error!("Unable to add libp2p peer {}", addr_c);
}
match p2p_c.connect(&addr_c) {
Ok(p) => {
debug!(
"New peer {} is connected as outbound! Capability: {:b}",
p.info.addr, p.info.capabilities
);
// If peer advertizes PEER_LIST then ask it for more peers that support PEER_LIST.
// We want to build a local db of possible peers to connect to.
// We do not necessarily care (at this point in time) what other capabilities these peers support.
if p.info.capabilities.contains(Capabilities::PEER_LIST) {
debug!("Sending peer request to {}", addr_c);
match p.send_peer_request(
Capabilities::PEER_LIST | peers_c.get_boost_peers_capabilities(),
use_tor_connection,
) {
Ok(_) => {
match addr_c {
PeerAddr::Onion(_) => {
if let Err(_) = libp2p_connection::add_new_peer(&addr_c)
{
error!("Unable to add libp2p peer {}", addr_c);
}
_ => (),
};
}
Err(e) => {
error!(
"Failed send_peer_request to {}, Error: {}",
p.info.addr, e
);
}
}
_ => (),
};
}
Err(e) => {
error!(
"Failed send_peer_request to {}, Error: {}",
p.info.addr, e
);
}
}
// Requesting ping as well, need to know the height asap
let total_diff =
peers_c.total_difficulty().unwrap_or(Difficulty::zero());
let total_height = peers_c.total_height().unwrap_or(0);
if let Err(e) = p.send_ping(total_diff, total_height) {
error!("Failed send_ping to {}, Error: {}", p.info.addr, e);
}

let _ = peers_c.update_state(&addr_c, p2p::State::Healthy);
}
Err(e) => {
info!("Connection to the peer {} was rejected, {}", addr_c, e);
let _ = peers_c.update_state(&addr_c, p2p::State::Defunct);
// Requesting ping as well, need to know the height asap
let total_diff = peers_c.total_difficulty().unwrap_or(Difficulty::zero());
let total_height = peers_c.total_height().unwrap_or(0);
if let Err(e) = p.send_ping(total_diff, total_height) {
error!("Failed send_ping to {}, Error: {}", p.info.addr, e);
}

let _ = peers_c.update_state(&addr_c, p2p::State::Healthy);
}
Err(e) => {
debug!("Connection to the peer {} was rejected, {}", addr_c, e);
let _ = peers_c.update_state(&addr_c, p2p::State::Defunct);
}
let _ = connections_in_action.fetch_sub(1, Ordering::Relaxed);
}
})
.expect("failed to launch peer_connect thread");
connection_threads.push(thr);
}
}

Expand Down
Loading