Merge pull request #88 from mwcproject/v5.3.3/sync
Improved sync workflow
bayk authored Dec 16, 2024
2 parents cf8c606 + 5e6fbb4 commit 607bd57
Showing 35 changed files with 2,315 additions and 1,399 deletions.
5 changes: 4 additions & 1 deletion Cargo.toml
@@ -63,4 +63,7 @@ debug = true
 #debug = false # Disable debug symbols (if not needed)
 #lto = true # Enable Link-Time Optimization
 #codegen-units = 1 # Optimize for size/speed
-#overflow-checks = false
+#overflow-checks = false
+
+#[profile.release]
+#debug = true
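
For reference, the commented-out keys above are standard Cargo `[profile.release]` settings. A minimal sketch of the profile with those options switched on; the values are illustrative only and not part of this commit:

    # Illustrative only -- not part of this diff.
    [profile.release]
    debug = true             # keep debug symbols in release builds
    lto = true               # enable link-time optimization
    codegen-units = 1        # fewer codegen units: slower compile, better optimization
    overflow-checks = false  # disable integer overflow checks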
170 changes: 87 additions & 83 deletions chain/src/chain.rs
@@ -184,8 +184,6 @@ pub struct Chain {
     txhashset: Arc<RwLock<txhashset::TxHashSet>>, // Lock order (with children): 2
     header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>, // Lock order (with children): 1
     pibd_segmenter: Arc<RwLock<Option<Segmenter>>>,
-    pibd_desegmenter: Arc<RwLock<Option<Desegmenter>>>,
-    reset_pibd_desegmenter: Arc<RwLock<bool>>,
     // POW verification function
     pow_verifier: fn(&BlockHeader) -> Result<(), pow::Error>,
     denylist: Arc<RwLock<Vec<Hash>>>,
@@ -247,8 +245,6 @@ impl Chain {
             txhashset: Arc::new(RwLock::new(txhashset)),
             header_pmmr: Arc::new(RwLock::new(header_pmmr)),
             pibd_segmenter: Arc::new(RwLock::new(None)),
-            pibd_desegmenter: Arc::new(RwLock::new(None)),
-            reset_pibd_desegmenter: Arc::new(RwLock::new(false)),
             pow_verifier,
             denylist: Arc::new(RwLock::new(vec![])),
             archive_mode,
@@ -553,33 +549,44 @@ impl Chain {
             }
             break;
         }
-        if blocks.len() > 1 {
-            // good, we can process multiple blocks, it should be faster than one by one
-            let block_hashes: Vec<(u64, Hash)> =
-                blocks.iter().map(|b| (b.header.height, b.hash())).collect();
-            match self.process_block_multiple(blocks, opts) {
-                Ok(tip) => {
-                    // We are good, let's clean up the orphans
-                    for (height, hash) in block_hashes {
-                        let _ = self.orphans.remove_by_height_header_hash(height, &hash);
-                    }
-                    return Ok(tip); // Done with success
-                }
-                Err(e) => {
-                    debug!("Failed process_block_multiple with error {}", e);
-                } // Continue processing one by one
-            }
-        }
+        // good, we can process multiple blocks, it should be faster than one by one
+        let block_hashes: Vec<(u64, Hash)> =
+            blocks.iter().map(|b| (b.header.height, b.hash())).collect();
+        match self.process_block_multiple(&blocks, opts) {
+            Ok(tip) => {
+                // We are good, let's clean up the orphans
+                for (height, hash) in block_hashes {
+                    let _ = self.orphans.remove_by_height_header_hash(height, &hash);
+                }
+                return Ok(tip); // Done with success
+            }
+            Err(e) => {
+                if e.is_bad_data() {
+                    info!("Failed to process multiple blocks, will try process one by one. {}", e);
+                } else {
+                    debug!("Failed to process multiple blocks, will try process one by one. {}", e);
+                }
+            } // Continue processing one by one
+        }

-        // Processing blocks one by one. It is slower, by eny possible error will be caught on block level.
+        // Processing blocks one by one. It is slower, but any possible error will be caught on block level.
         let height = b.header.height;
-        let res = self.process_block_single(b, opts);
-        if res.is_ok() {
-            self.check_orphans(height + 1);
-        }
-        res
+        match self.process_block_single(b, opts) {
+            Ok(tip) => {
+                self.check_orphans(height + 1);
+                return Ok(tip);
+            }
+            Err(e) => {
+                if e.is_bad_data() {
+                    error!("process_block_single failed with error: {}", e);
+                } else {
+                    debug!("process_block_single failed with error: {}", e);
+                }
+                return Err(e);
+            }
+        }
     }

/// We plan to support receiving blocks with CommitOnly inputs.
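
The new control flow above is batch-first with a per-block fallback: try `process_block_multiple` for speed, and only on failure reprocess one by one so the offending block is identified. A condensed, self-contained sketch of the pattern (all names below are hypothetical stand-ins, not the node's API):

    // Illustrative sketch of the batch-first fallback pattern; not part of this diff.
    #[derive(Debug)]
    struct BadDataError(String);

    // Stand-in for per-block validation inside process_block_single().
    fn check(item: u64) -> Result<u64, BadDataError> {
        if item % 2 == 0 {
            Ok(item)
        } else {
            Err(BadDataError(format!("bad item {}", item)))
        }
    }

    // Stand-in for process_block_multiple(): all-or-nothing over the batch.
    fn process_batch(items: &[u64]) -> Result<u64, BadDataError> {
        items.iter().copied().map(check).sum()
    }

    fn process_with_fallback(items: &[u64]) -> Result<u64, BadDataError> {
        // Fast path: one batch, one commit.
        if let Ok(total) = process_batch(items) {
            return Ok(total);
        }
        // Slow path: per item, so the first bad item is pinpointed.
        let mut total = 0;
        for &item in items {
            total += check(item)?;
        }
        Ok(total)
    }

For example, `process_with_fallback(&[2, 4, 5])` fails the batch, falls back to the per-item path, and surfaces `bad item 5` precisely.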
@@ -717,20 +724,16 @@ impl Chain {
     /// Returns true if it has been added to the longest chain
     /// or false if it has added to a fork (or orphan?).
     fn process_block_single(&self, b: Block, opts: Options) -> Result<Option<Tip>, Error> {
-        // We can only reliably convert to "v2" if not an orphan (may spend output from previous block).
-        // We convert from "v3" to "v2" by looking up outputs to be spent.
-        // This conversion also ensures a block received in "v2" has valid input features (prevents malleability).
-        let b = self.convert_block_v2(b)?;
-
-        let (head, fork_point, prev_head) = {
+        let (head, fork_point, prev_head, b) = {
             let mut header_pmmr = self.header_pmmr.write();
             let mut txhashset = self.txhashset.write();
             let batch = self.store.batch_write()?;
             let prev_head = batch.head()?;
             let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;

-            let (head, fork_point) = pipe::process_block(
-                &b,
+            let mut bv = vec![b.clone()];
+            let (head, fork_point) = pipe::process_blocks_series(
+                &bv,
                 &mut ctx,
                 &mut *self.cache_header_difficulty.write(),
                 self.secp(),
@@ -739,7 +742,7 @@
             ctx.batch.commit()?;

             // release the lock and let the batch go before post-processing
-            (head, fork_point, prev_head)
+            (head, fork_point, prev_head, bv.remove(0))
         };

         let prev = self.get_previous_header(&b.header)?;
@@ -750,6 +753,11 @@
             Tip::from_header(&fork_point),
         );

+        info!(
+            "Accepted single block {} for height {}",
+            b.hash(),
+            b.header.height
+        );
         // notifying other parts of the system of the update
         self.adapter.block_accepted(&b, status, opts);

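In `process_block_single` above, the former `pipe::process_block` call is gone: the single block is wrapped in a one-element vector (`let mut bv = vec![b.clone()];`) and routed through the same `pipe::process_blocks_series` used by the multi-block path. A minimal sketch of that unification idiom, with hypothetical names rather than the node's API:

    // Hypothetical sketch: one series routine serves both call sites.
    fn process_series(blocks: &[Vec<u8>]) -> usize {
        blocks.len() // stand-in for real series processing
    }

    fn process_single(block: Vec<u8>) -> usize {
        // Wrap the single block so the series path is the only
        // processing path that has to be kept correct.
        let bv = vec![block];
        process_series(&bv)
    }

One processing path means one place to maintain validation and difficulty-cache handling, which matches the shared `cache_header_difficulty` argument both call sites now pass.
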
@@ -761,35 +769,30 @@
     // Since they are orphans - check_block was called for them when they were added to orphan pool.
     fn process_block_multiple(
         &self,
-        blocks: Vec<Block>,
+        blocks: &Vec<Block>,
         opts: Options,
     ) -> Result<Option<Tip>, Error> {
-        // We can only reliably convert to "v2" if not an orphan (may spend output from previous block).
-        // We convert from "v3" to "v2" by looking up outputs to be spent.
-        // This conversion also ensures a block received in "v2" has valid input features (prevents malleability).
-        let mut blocks_v2 = Vec::new();
-        for b in blocks {
-            blocks_v2.push(self.convert_block_v2(b)?);
-        }
-        debug_assert!(blocks_v2.len() > 1);

         let (head, fork_point, prev_head) = {
             let mut header_pmmr = self.header_pmmr.write();
             let mut txhashset = self.txhashset.write();
             let batch = self.store.batch_write()?;
             let prev_head = batch.head()?;
             let mut ctx = self.new_ctx(opts, batch, &mut header_pmmr, &mut txhashset)?;

-            let (head, fork_point) =
-                pipe::process_blocks_series(&blocks_v2, &mut ctx, self.secp())?;
+            let (head, fork_point) = pipe::process_blocks_series(
+                &blocks,
+                &mut ctx,
+                &mut *self.cache_header_difficulty.write(),
+                self.secp(),
+            )?;

             ctx.batch.commit()?;

             // release the lock and let the batch go before post-processing
             (head, fork_point, prev_head)
         };

-        let last_block = blocks_v2.last().unwrap();
+        let last_block = blocks.last().unwrap();
         let prev = self.get_previous_header(&last_block.header)?;
         let status = self.determine_status(
             head,
@@ -798,8 +801,15 @@
             Tip::from_header(&fork_point),
         );

+        debug!(
+            "Accepted multiple {} blocks from height {} to {}",
+            blocks.len(),
+            blocks.first().unwrap().header.height,
+            blocks.last().unwrap().header.height
+        );
+
         // notifying other parts of the system of the update
-        for b in &blocks_v2 {
+        for b in blocks {
             self.adapter.block_accepted(b, status, opts);
         }
@@ -1304,6 +1314,36 @@ impl Chain {
             now.elapsed().as_millis()
         );

+        // Let's check that the MMR roots match the header
+        #[cfg(debug_assertions)]
+        {
+            use mwc_core::core::pmmr::ReadablePMMR;
+
+            let txhashset = self.txhashset.read();
+
+            let output_pmmr = txhashset.output_pmmr_at(&header);
+            let output_pmmr_root = output_pmmr.root().unwrap();
+            assert!(header.output_root == output_pmmr_root);
+
+            let rangeproof_pmmr = txhashset.rangeproof_pmmr_at(&header);
+            let rangeproof_pmmr_root = rangeproof_pmmr.root().unwrap();
+            assert!(header.range_proof_root == rangeproof_pmmr_root);
+
+            let kernel_pmmr = txhashset.kernel_pmmr_at(&header);
+            let kernel_pmmr_root = kernel_pmmr.root().unwrap();
+            assert!(header.kernel_root == kernel_pmmr_root);
+        }
+
+        /*{
+            use mwc_core::core::pmmr::ReadablePMMR;
+            let txhashset = self.txhashset.read();
+            let rangeproof_pmmr = txhashset.rangeproof_pmmr_at(&header);
+            let rangeproof_pmmr_root = rangeproof_pmmr.root().unwrap();
+            error!("rangeproof_pmmr_root: {} at height: {}, mmr size: {}", rangeproof_pmmr_root, header.height, header.output_mmr_size);
+            txhashset.dump_rproof_mmrs()
+        }*/
+
         Ok(Segmenter::new(
             Arc::new(RwLock::new(segm_header_pmmr_backend)),
             self.txhashset.clone(),
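
The `#[cfg(debug_assertions)]` block added above asserts the invariant behind this sync workflow: a block header commits to the root of each PMMR (outputs, range proofs, kernels), so any recomputed root must equal the committed one exactly. A toy sketch of that check, with a stand-in digest instead of the real PMMR root computation:

    // Illustrative sketch, not the node's API.
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // Stand-in for a PMMR root: fold all leaves into one digest.
    fn compute_root(leaves: &[u64]) -> u64 {
        leaves.iter().fold(0u64, |acc, leaf| {
            let mut h = DefaultHasher::new();
            (acc, leaf).hash(&mut h);
            h.finish()
        })
    }

    // Same shape as `assert!(header.output_root == output_pmmr_root)`.
    fn check_commitment(committed_root: u64, leaves: &[u64]) {
        assert!(committed_root == compute_root(leaves));
    }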
@@ -1312,46 +1352,10 @@
         ))
     }

-    /// instantiate desegmenter for this header. Expected that handshake is done and as a result, header with bitmap_root_hash is known
-    pub fn create_desegmenter(
-        &self,
-        archive_header_height: u64,
-        bitmap_root_hash: Hash,
-    ) -> Result<(), Error> {
-        let uploaded_height = self.head()?.height;
-        if uploaded_height >= archive_header_height {
-            return Err(Error::DesegmenterCreationError(format!("No need to create desegmenter, data is uploaded until height {}, archive height is {}", uploaded_height, archive_header_height)));
-        }
-        self.reset_pibd_chain()?;
-        let desegmenter = self.init_desegmenter(archive_header_height, bitmap_root_hash)?;
-        *self.pibd_desegmenter.write() = Some(desegmenter);
-        *self.reset_pibd_desegmenter.write() = false;
-        Ok(())
-    }
-
-    /// instantiate desegmenter (in same lazy fashion as segmenter, though this should not be as
-    /// expensive an operation)
-    pub fn get_desegmenter(&self) -> Arc<RwLock<Option<Desegmenter>>> {
-        // Use our cached desegmenter if we have one and the associated header matches.
-        let mut reset_pibd_desegmenter = self.reset_pibd_desegmenter.write();
-        if *reset_pibd_desegmenter {
-            *self.pibd_desegmenter.write() = None;
-            *reset_pibd_desegmenter = false;
-        }
-        return self.pibd_desegmenter.clone();
-    }
-
-    /// Reset desegmenter associated with this session
-    pub fn reset_desegmenter(&self) {
-        // We can't modify desegmenter here, it is already locked.
-        //*self.pibd_desegmenter.write() = None
-        *self.reset_pibd_desegmenter.write() = true;
-    }
-
     /// initialize a desegmenter, which is capable of extending the hashset by appending
     /// PIBD segments of the three PMMR trees + Bitmap PMMR
     /// header should be the same header as selected for the txhashset.zip archive
-    fn init_desegmenter(
+    pub fn init_desegmenter(
         &self,
         archive_header_hegiht: u64,
         bitmap_root_hash: Hash,
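
The deleted `create_desegmenter`/`get_desegmenter`/`reset_desegmenter` trio relied on a deferred-reset flag: `reset_desegmenter` could be called while the desegmenter was already locked, so it only set `reset_pibd_desegmenter`, and the next `get_desegmenter` call applied the reset safely. A minimal sketch of that idiom (hypothetical names; assumes the parking_lot crate for the lock types, which the unwrapped `.write()` calls suggest):

    // Sketch of the deferred-reset pattern the removed code used; illustrative only.
    use parking_lot::RwLock;
    use std::sync::Arc;

    struct Worker; // stand-in for Desegmenter

    struct Holder {
        worker: Arc<RwLock<Option<Worker>>>,
        reset_requested: Arc<RwLock<bool>>,
    }

    impl Holder {
        // May run while `worker` is locked elsewhere: flip a flag
        // instead of taking the write lock, which could deadlock.
        fn request_reset(&self) {
            *self.reset_requested.write() = true;
        }

        // Runs where `worker` is known to be unlocked: apply the reset.
        fn get(&self) -> Arc<RwLock<Option<Worker>>> {
            let mut reset = self.reset_requested.write();
            if *reset {
                *self.worker.write() = None;
                *reset = false;
            }
            self.worker.clone()
        }
    }

With this commit the cached desegmenter and its flag leave `Chain` entirely; `init_desegmenter` is now public, so callers construct and own a desegmenter directly.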