PIBD fix that broke mmr root calculation
bayk committed Dec 16, 2024
1 parent 71370eb commit 5e6fbb4
Showing 9 changed files with 130 additions and 78 deletions.
10 changes: 10 additions & 0 deletions chain/src/chain.rs
@@ -1334,6 +1334,16 @@ impl Chain {
assert!(header.kernel_root == kernel_pmmr_root);
}

/*{
use mwc_core::core::pmmr::ReadablePMMR;
let txhashset = self.txhashset.read();
let rangeproof_pmmr = txhashset.rangeproof_pmmr_at(&header);
let rangeproof_pmmr_root = rangeproof_pmmr.root().unwrap();
error!("rangeproof_pmmr_root: {} at height: {}, mmr size: {}", rangeproof_pmmr_root, header.height, header.output_mmr_size);
txhashset.dump_rproof_mmrs()
}*/

Ok(Segmenter::new(
Arc::new(RwLock::new(segm_header_pmmr_backend)),
self.txhashset.clone(),
14 changes: 12 additions & 2 deletions chain/src/txhashset/desegmenter.rs
@@ -228,6 +228,16 @@ impl Desegmenter {
{
let txhashset = self.txhashset.read();
txhashset.roots()?.validate(&self.archive_header)?;
/*match txhashset.roots()?.validate(&self.archive_header) {
Ok(_) => {}
Err(e) => {
error!("validate error: {}", e);
txhashset.dump_rproof_mmrs();
error!("Dump is done. There was Validate error: {}", e);
panic!("Exiting...");
return Err(e);
}
}*/
}

status.update(SyncStatus::ValidatingKernelsHistory);
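
For context, the roots check above compares the MMR roots rebuilt from PIBD segments against the archive header. A minimal, hedged sketch of what that validation conceptually does (hypothetical field names and toy u64 hashes; the real check lives in the txhashset roots type):

struct Roots { output_root: u64, rproof_root: u64, kernel_root: u64 }
struct Header { output_root: u64, rproof_root: u64, kernel_root: u64 }

impl Roots {
    // Compare each rebuilt root against the header commitment.
    fn validate(&self, h: &Header) -> Result<(), String> {
        if self.output_root != h.output_root { return Err("output root mismatch".into()); }
        if self.rproof_root != h.rproof_root { return Err("range proof root mismatch".into()); }
        if self.kernel_root != h.kernel_root { return Err("kernel root mismatch".into()); }
        Ok(())
    }
}

fn main() {
    let r = Roots { output_root: 1, rproof_root: 2, kernel_root: 3 };
    let h = Header { output_root: 1, rproof_root: 2, kernel_root: 3 };
    assert!(r.validate(&h).is_ok());
}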
@@ -667,7 +677,7 @@ impl Desegmenter {
&mut batch,
|ext, _batch| {
let extension = &mut ext.extension;
extension.apply_output_segments(segm, outputs_bitmap)?;
extension.apply_output_segments(segm)?;
Ok(())
},
)?;
Expand Down Expand Up @@ -720,7 +730,7 @@ impl Desegmenter {
&mut batch,
|ext, _batch| {
let extension = &mut ext.extension;
extension.apply_rangeproof_segments(seg, outputs_bitmap)?;
extension.apply_rangeproof_segments(seg)?;
Ok(())
},
)?;
58 changes: 36 additions & 22 deletions chain/src/txhashset/txhashset.rs
@@ -462,6 +462,32 @@ impl TxHashSet {
})
}

/// For debug only: dump the range proof MMR data
pub fn dump_rproof_mmrs(&self) {
info!(
"Generating dump with MMR roots at sizes: Outputs: {} Rangeproofs: {} Kernels: {}",
self.output_pmmr_h.size, self.rproof_pmmr_h.size, self.kernel_pmmr_h.size
);

for i in 0..self.rproof_pmmr_h.size {
let mut s = format!("{} ", i);
if let Some(hash) = self.rproof_pmmr_h.backend.get_hash(i) {
s.push_str(&format!("Hash: {}", hash));
}

if let Some(rp) = self.rproof_pmmr_h.backend.get_data(i) {
s.push_str(&format!(" RP: {:?}", rp));
}

let root = ReadonlyPMMR::at(&self.rproof_pmmr_h.backend, i + 1);
if let Ok(root) = root.root() {
s.push_str(&format!(" ROOT: {}", root));
}

info!("{}", s);
}
}

/// Return Commit's MMR position
pub fn get_output_pos(&self, commit: &Commitment) -> Result<u64, Error> {
Ok(self.commit_index.get_output_pos(&commit)?)
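
One way such a per-size root dump is typically used: capture dumps from a good node and a broken node, then find the first MMR size at which the running roots diverge. A hedged helper sketch (hypothetical utility, not part of this commit):

// Given two sequences of per-size roots, return the first index where they differ.
fn first_divergence(a: &[u64], b: &[u64]) -> Option<usize> {
    a.iter().zip(b.iter()).position(|(x, y)| x != y)
}

fn main() {
    let good = [10, 20, 30, 40];
    let bad = [10, 20, 31, 41];
    assert_eq!(first_divergence(&good, &bad), Some(2)); // roots diverge at size 3
}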
@@ -1307,8 +1333,10 @@ impl<'a> Extension<'a> {
let flipped = bitmap.flip(0u32..bitmap.maximum().unwrap() + 1);
for spent_pmmr_index in flipped.iter() {
let pos0 = pmmr::insertion_to_pmmr_index(spent_pmmr_index.into());
self.output_pmmr.remove_from_leaf_set(pos0);
self.rproof_pmmr.remove_from_leaf_set(pos0);
// Note: remove_from_leaf_set can't be used here, because the root would be affected.
// Some segments might not be pruned; that is expected.
let _ = self.output_pmmr.prune(pos0);
let _ = self.rproof_pmmr.prune(pos0);
}
Ok(())
}
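
Why prune instead of a bare leaf-set removal: the MMR root is bagged from the peak hashes, and a trailing leaf can itself be a peak, so its hash must stay retrievable after the leaf is spent. A self-contained toy illustrating the invariant the note above refers to (std hasher and an assumed leaf-set rule for illustration only, not the mwc_core PMMR API):

use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

fn h<T: Hash>(v: &T) -> u64 {
    let mut s = DefaultHasher::new();
    v.hash(&mut s);
    s.finish()
}

// Toy 3-leaf MMR, 0-indexed: leaves at 0, 1, 3; their parent at 2.
// Position 3 is a single-leaf peak; root = hash(peak 2, peak 3).
struct ToyMmr {
    hashes: Vec<u64>,
    leaf_set: HashSet<u64>, // unspent leaf positions
}

impl ToyMmr {
    // Toy rule (assumption for illustration): a leaf hash is readable only
    // while the leaf is in the leaf set; pruned subtrees keep parent hashes.
    fn get_hash(&self, pos: u64) -> Option<u64> {
        let is_leaf = pos != 2;
        if is_leaf && !self.leaf_set.contains(&pos) {
            return None;
        }
        self.hashes.get(pos as usize).copied()
    }

    fn root(&self) -> Option<u64> {
        Some(h(&(self.get_hash(2)?, self.get_hash(3)?)))
    }
}

fn main() {
    let leaves = [h(&"a"), h(&"b"), h(&"c")];
    let mut mmr = ToyMmr {
        hashes: vec![leaves[0], leaves[1], h(&(leaves[0], leaves[1])), leaves[2]],
        leaf_set: [0u64, 1, 3].into_iter().collect(),
    };
    let root = mmr.root().expect("root");

    // A bare leaf-set removal of the trailing leaf (itself a peak) makes its
    // hash unreadable, so the root can no longer be computed correctly.
    mmr.leaf_set.remove(&3);
    assert_eq!(mmr.root(), None);

    // A proper prune marks the position pruned but keeps its hash readable,
    // so the root stays equal to the original.
    mmr.leaf_set.insert(3);
    assert_eq!(mmr.root(), Some(root));
}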
@@ -1322,7 +1350,6 @@ impl<'a> Extension<'a> {
pub fn apply_output_segments(
&mut self,
segments: Vec<Segment<OutputIdentifier>>,
bitmap: &Bitmap,
) -> Result<(), Error> {
for segm in segments {
let (_sid, hash_pos, hashes, leaf_pos, leaf_data, _proof) = segm.parts();
Expand Down Expand Up @@ -1350,15 +1377,8 @@ impl<'a> Extension<'a> {
.push(&leaf_data[idx])
.map_err(&Error::TxHashSetErr)?;
}
let pmmr_index = pmmr::pmmr_leaf_to_insertion_index(pos0);
match pmmr_index {
Some(i) => {
if !bitmap.contains(i as u32) {
self.output_pmmr.remove_from_leaf_set(pos0);
}
}
None => {}
};
// Note: extra unpruned segments will be updated later.
// Pruning will be done then.
}
}
}
@@ -1372,11 +1392,12 @@ impl<'a> Extension<'a> {
pub fn apply_rangeproof_segments(
&mut self,
segments: Vec<Segment<RangeProof>>,
bitmap: &Bitmap,
) -> Result<(), Error> {
for segm in segments {
let (_sid, hash_pos, hashes, leaf_pos, leaf_data, _proof) = segm.parts();

//info!("Adding proof segment {}, from mmr pos: {} hashes sz: {} leaf_data sz: {} hash_pos: {:?} hashes: {:?} leaf_pos: {:?} leaf_data: {:?}", sid.idx, self.rproof_pmmr.size, hashes.len(), leaf_data.len(), hash_pos, hashes, leaf_pos, leaf_data );

// insert either leaves or pruned subtrees as we go
for insert in sort_pmmr_hashes_and_leaves(hash_pos, leaf_pos, Some(0)) {
match insert {
@@ -1400,15 +1421,8 @@ impl<'a> Extension<'a> {
.push(&leaf_data[idx])
.map_err(&Error::TxHashSetErr)?;
}
let pmmr_index = pmmr::pmmr_leaf_to_insertion_index(pos0);
match pmmr_index {
Some(i) => {
if !bitmap.contains(i as u32) {
self.rproof_pmmr.remove_from_leaf_set(pos0);
}
}
None => {}
};
// Note: extra unpruned segments will be updated later.
// Pruning will be done then.
}
}
}
5 changes: 3 additions & 2 deletions core/src/core/pmmr/backend.rs
@@ -82,8 +82,9 @@ pub trait Backend<T: PMMRable> {
/// triggered removal).
fn remove(&mut self, position: u64) -> Result<(), String>;

/// Remove a leaf from the leaf set
fn remove_from_leaf_set(&mut self, pos0: u64);
// Remove a leaf from the leaf set.
// DON'T USE IT, use prune instead
//fn remove_from_leaf_set(&mut self, pos0: u64);

/// Release underlying datafiles and locks
fn release_files(&mut self);
7 changes: 4 additions & 3 deletions core/src/core/pmmr/pmmr.rs
@@ -284,10 +284,11 @@
self.backend.reset_prune_list();
}

/// Remove the specified position from the leaf set
pub fn remove_from_leaf_set(&mut self, pos0: u64) {
// Remove the specified position from the leaf set
// DON'T USE IT, use prune instead
/*pub fn remove_from_leaf_set(&mut self, pos0: u64) {
self.backend.remove_from_leaf_set(pos0);
}
}*/

/// Saves a snapshot of the MMR tagged with the block hash.
/// Specifically - snapshots the utxo file as we need this rewound before
2 changes: 1 addition & 1 deletion core/src/core/pmmr/segment.rs
@@ -384,7 +384,7 @@
T: PMMRIndexHashable,
{
/// Calculate root hash of this segment
/// Returns `None` iff the segment is full and completely pruned
/// Returns `None` if the segment is full and completely pruned
pub fn root(
&self,
mmr_size: u64,
6 changes: 3 additions & 3 deletions core/src/core/pmmr/vec_backend.rs
@@ -120,9 +120,9 @@ impl<T: PMMRable> Backend<T> for VecBackend<T> {
Ok(())
}

fn remove_from_leaf_set(&mut self, _pos0: u64) {
unimplemented!()
}
//fn remove_from_leaf_set(&mut self, _pos0: u64) {
// unimplemented!()
//}

fn reset_prune_list(&mut self) {
unimplemented!()
99 changes: 57 additions & 42 deletions servers/src/mwc/sync/body_sync.rs
@@ -195,7 +195,7 @@ impl BodySync {
);

if need_request > 0 {
self.send_requests(&mut need_request, &peers, sync_peers)?;
let mut waiting_requests = self.send_requests(&mut need_request, &peers, sync_peers)?;

// We can send more requests, let's check if we need to update request_series
if need_request > 0 {
@@ -241,7 +241,11 @@
}

// Now we can try to submit more requests...
self.send_requests(&mut need_request, &peers, sync_peers)?;
waiting_requests = self.send_requests(&mut need_request, &peers, sync_peers)?;
}

if need_request > 0 && !waiting_requests.is_empty() {
self.send_waiting_requests(waiting_requests, need_request, &peers, sync_peers)?;
}
}
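
The refactor above splits duplicate requests out of send_requests: the first pass sends fresh block requests and returns the heights still waiting on a response; only if capacity remains does send_waiting_requests duplicate a few of them to other peers. A hedged, self-contained sketch of that control flow (toy types; the real methods also take peers and sync_peers):

type BlockHash = u64;

struct SyncSketch {
    waiting: Vec<(u64, BlockHash)>, // (height, hash) already requested, awaiting reply
}

impl SyncSketch {
    // Phase 1: send fresh requests, decrementing need_request for each one,
    // and return the requests that are still in flight.
    fn send_requests(&mut self, need_request: &mut usize) -> Vec<(u64, BlockHash)> {
        *need_request = need_request.saturating_sub(1); // pretend we sent one
        self.waiting.clone()
    }

    // Phase 2: spend leftover capacity duplicating waiting requests to other
    // peers, hedging against slow or stalled responders.
    fn send_waiting_requests(&self, waiting: Vec<(u64, BlockHash)>, need_request: usize) {
        for (height, hash) in waiting.into_iter().take(need_request) {
            println!("duplicate request for block {:x} at height {}", hash, height);
        }
    }
}

fn main() {
    let mut sync = SyncSketch { waiting: vec![(100, 0xabc), (101, 0xdef)] };
    let mut need_request = 3;
    let waiting = sync.send_requests(&mut need_request);
    if need_request > 0 && !waiting.is_empty() {
        sync.send_waiting_requests(waiting, need_request);
    }
}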

@@ -325,26 +329,27 @@ impl BodySync {
retry_expiration_times.len()
}

// Returns the waiting requests
fn send_requests(
&self,
need_request: &mut usize,
peers: &Vec<Arc<Peer>>,
sync_peers: &SyncPeers,
) -> Result<(), chain::Error> {
) -> Result<Vec<(u64, Hash)>, chain::Error> {
// request_series is naturally ordered from head to tail, but it is better to send requests from tail to head.
let mut peers = peers.clone();
let mut waiting_heights: Vec<(u64, Hash)> = Vec::new();
// Request with try_write: if the lock is busy, somebody else is already sending, which means we are good...
if let Some(request_series) = self.request_series.try_write() {
*need_request = need_request.saturating_sub(self.calc_retry_running_requests());
if *need_request == 0 {
return Ok(());
return Ok(waiting_heights);
}

let mut rng = rand::thread_rng();
let now = Utc::now();

let mut new_requests: Vec<(u64, Hash)> = Vec::new();
let mut waiting_heights: Vec<(u64, Hash)> = Vec::new();

let mut first_in_cache = 0;
let mut last_in_cache = 0;
@@ -459,7 +464,7 @@
peers.retain(|p| p.info.live_info.read().height >= height);
if peers.is_empty() {
*need_request = 0;
return Ok(());
return Ok(waiting_heights);
}

// sending request
@@ -486,47 +491,57 @@
);
}
}
}
Ok(waiting_heights)
}

if *need_request > 0 {
// Free request slots: let's duplicate some random requests from the waiting buffer
let duplicate_reqs: Vec<(u64, Hash)> = waiting_heights
.choose_multiple(&mut rng, *need_request)
.cloned()
.collect();
*need_request = 0;
fn send_waiting_requests(
&self,
waiting_heights: Vec<(u64, Hash)>,
need_request: usize,
peers: &Vec<Arc<Peer>>,
sync_peers: &SyncPeers,
) -> Result<(), chain::Error> {
debug_assert!(need_request > 0);

for (height, hash) in duplicate_reqs {
// We don't want to send a retry to the peer we already requested the data from
if let Some(requested_peer) = self.request_tracker.get_expected_peer(&hash) {
let dup_peer = peers
.iter()
.filter(|p| p.info.addr != requested_peer)
.choose(&mut rng);
let mut rng = rand::thread_rng();
let now = Utc::now();

if dup_peer.is_none() {
break;
}
let dup_peer = dup_peer.unwrap();
// Free request slots: let's duplicate some random requests from the waiting buffer
let duplicate_reqs: Vec<(u64, Hash)> = waiting_heights
.into_iter()
.choose_multiple(&mut rng, need_request);

debug!(
"Processing duplicated request for the block {} at {}, peer {:?}",
hash, height, dup_peer.info.addr
for (height, hash) in duplicate_reqs {
// We don't want to send a retry to the peer we already requested the data from
if let Some(requested_peer) = self.request_tracker.get_expected_peer(&hash) {
let dup_peer = peers
.iter()
.filter(|p| p.info.addr != requested_peer)
.choose(&mut rng);

if dup_peer.is_none() {
break;
}
let dup_peer = dup_peer.unwrap();
debug!(
"Processing duplicated request for the block {} at {}, peer {:?}",
hash, height, dup_peer.info.addr
);

match dup_peer.send_block_request(hash, chain::Options::SYNC) {
Ok(_) => self
.retry_expiration_times
.write()
.push_back(now + self.request_tracker.get_average_latency()),
Err(e) => {
let msg = format!(
"Failed to send duplicate block request to peer {}, {}",
dup_peer.info.addr, e
);
match dup_peer.send_block_request(hash, chain::Options::SYNC) {
Ok(_) => self
.retry_expiration_times
.write()
.push_back(now + self.request_tracker.get_average_latency()),
Err(e) => {
let msg = format!(
"Failed to send duplicate block request to peer {}, {}",
dup_peer.info.addr, e
);
warn!("{}", msg);
sync_peers.report_no_response(&dup_peer.info.addr, msg);
break;
}
}
warn!("{}", msg);
sync_peers.report_no_response(&dup_peer.info.addr, msg);
break;
}
}
}
7 changes: 4 additions & 3 deletions store/src/pmmr.rs
@@ -157,10 +157,11 @@ impl<T: PMMRable> Backend<T> for PMMRBackend<T> {
self.get_data_from_file(pos0)
}

/// Remove leaf from leaf set
fn remove_from_leaf_set(&mut self, pos0: u64) {
// Remove leaf from leaf set
// DON'T USE IT, use prune instead
/*fn remove_from_leaf_set(&mut self, pos0: u64) {
self.leaf_set.remove(pos0);
}
}*/

/// Returns an iterator over all the leaf positions.
/// for a prunable PMMR this is an iterator over the leaf_set bitmap.
