Skip to content

Commit

Permalink
Removed Sync MMR. Warnings are not cleaned
Browse files Browse the repository at this point in the history
  • Loading branch information
bayk committed Jul 2, 2024
1 parent ea21be6 commit abe8ce0
Show file tree
Hide file tree
Showing 11 changed files with 25 additions and 376 deletions.
66 changes: 2 additions & 64 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ pub struct Chain {
orphans: Arc<OrphanBlockPool>,
txhashset: Arc<RwLock<txhashset::TxHashSet>>,
header_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
sync_pmmr: Arc<RwLock<txhashset::PMMRHandle<BlockHeader>>>,
pibd_segmenter: Arc<RwLock<Option<Segmenter>>>,
pibd_desegmenter: Arc<RwLock<Option<Desegmenter>>>,
// POW verification function
Expand Down Expand Up @@ -194,21 +193,8 @@ impl Chain {
ProtocolVersion(1),
None,
)?;
let mut sync_pmmr = PMMRHandle::new(
Path::new(&db_root).join("header").join("sync_head"),
false,
ProtocolVersion(1),
None,
)?;

setup_head(
&genesis,
&store,
&mut header_pmmr,
&mut sync_pmmr,
&mut txhashset,
false,
)?;
setup_head(&genesis, &store, &mut header_pmmr, &mut txhashset, false)?;

// Initialize the output_pos index based on UTXO set
// and NRD kernel_pos index based recent kernel history.
Expand All @@ -226,7 +212,6 @@ impl Chain {
orphans: Arc::new(OrphanBlockPool::new()),
txhashset: Arc::new(RwLock::new(txhashset)),
header_pmmr: Arc::new(RwLock::new(header_pmmr)),
sync_pmmr: Arc::new(RwLock::new(sync_pmmr)),
pibd_segmenter: Arc::new(RwLock::new(None)),
pibd_desegmenter: Arc::new(RwLock::new(None)),
pow_verifier,
Expand All @@ -239,9 +224,6 @@ impl Chain {
// Suppress any errors here in case we cannot find
chain.rewind_bad_block()?;

let header_head = chain.header_head()?;
chain.rebuild_sync_mmr(&header_head)?;

chain.log_heads()?;

Ok(chain)
Expand Down Expand Up @@ -306,7 +288,6 @@ impl Chain {
/// restart the output PMMRs from scratch
pub fn reset_chain_head_to_genesis(&self) -> Result<(), Error> {
let mut header_pmmr = self.header_pmmr.write();
let mut sync_pmmr = self.sync_pmmr.write();
let mut txhashset = self.txhashset.write();
let batch = self.store.batch()?;

Expand All @@ -322,7 +303,6 @@ impl Chain {
&self.genesis,
&self.store,
&mut header_pmmr,
&mut sync_pmmr,
&mut txhashset,
true,
)?;
Expand Down Expand Up @@ -482,17 +462,6 @@ impl Chain {
};
log_head("head", self.head()?);
log_head("header_head", self.header_head()?);
log_head("sync_head", self.get_sync_head()?);

// Needed for Node State tracking...
let sync_head = self.get_sync_head()?;
info!(
"init: sync_head: {} @ {} [{}]",
sync_head.total_difficulty.to_num(),
sync_head.height,
sync_head.last_block_h,
);

Ok(())
}

Expand Down Expand Up @@ -1231,21 +1200,6 @@ impl Chain {
Ok(())
}

/// Rebuild the sync MMR based on current header_head.
/// We rebuild the sync MMR when first entering sync mode so ensure we
/// have an MMR we can safely rewind based on the headers received from a peer.
pub fn rebuild_sync_mmr(&self, head: &Tip) -> Result<(), Error> {
let mut sync_pmmr = self.sync_pmmr.write();
let mut batch = self.store.batch()?;
let header = batch.get_block_header(&head.hash())?;
txhashset::header_extending(&mut sync_pmmr, &mut batch, |ext, batch| {
self.rewind_and_apply_header_fork(&header, ext, batch)?;
Ok(())
})?;
batch.commit()?;
Ok(())
}

/// Finds the "fork point" where header chain diverges from full block chain.
/// If we are syncing this will correspond to the last full block where
/// the next header is known but we do not yet have the full block.
Expand Down Expand Up @@ -1914,16 +1868,7 @@ impl Chain {
}
}

/// Get the tip of the current "sync" header chain.
/// This may be significantly different to current header chain.
pub fn get_sync_head(&self) -> Result<Tip, Error> {
let hash = self.sync_pmmr.read().head_hash()?;
let header = self.store.get_block_header(&hash)?;
Ok(Tip::from_header(&header))
}

/// Gets multiple headers at the provided heights.
/// Note: Uses the sync pmmr, not the header pmmr.
/// Note: This is based on the provided sync_head to support syncing against a fork.
pub fn get_locator_hashes(&self, sync_head: Tip, heights: &[u64]) -> Result<Vec<Hash>, Error> {
let mut header_pmmr = self.header_pmmr.write();
Expand Down Expand Up @@ -1961,13 +1906,12 @@ fn setup_head(
genesis: &Block,
store: &store::ChainStore,
header_pmmr: &mut txhashset::PMMRHandle<BlockHeader>,
sync_pmmr: &mut txhashset::PMMRHandle<BlockHeader>,
txhashset: &mut txhashset::TxHashSet,
resetting_pibd: bool,
) -> Result<(), Error> {
let mut batch = store.batch()?;

// Apply the genesis header to header and sync MMRs.
// Apply the genesis header to header MMR.
{
if batch.get_block_header(&genesis.hash()).is_err() {
batch.save_block_header(&genesis.header)?;
Expand All @@ -1978,12 +1922,6 @@ fn setup_head(
ext.apply_header(&genesis.header)
})?;
}

if sync_pmmr.size == 0 {
txhashset::header_extending(sync_pmmr, &mut batch, |ext, _| {
ext.apply_header(&genesis.header)
})?;
}
}

// Make sure our header PMMR is consistent with header_head from db if it exists.
Expand Down
21 changes: 3 additions & 18 deletions p2p/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ impl Peer {
info: PeerInfo,
conn: TcpStream,
adapter: Arc<dyn NetAdapter>,
header_cache_size: u64,
server: Server,
) -> std::io::Result<Peer> {
let state = Arc::new(RwLock::new(State::Connected));
Expand All @@ -93,7 +92,6 @@ impl Peer {
Arc::new(tracking_adapter.clone()),
info.clone(),
state_sync_requested.clone(),
header_cache_size,
server,
);
let tracker = Arc::new(conn::Tracker::new());
Expand All @@ -117,13 +115,12 @@ impl Peer {
total_difficulty: Difficulty,
hs: &Handshake,
adapter: Arc<dyn NetAdapter>,
header_cache_size: u64,
server: Server,
) -> Result<Peer, Error> {
debug!("accept: handshaking from {:?}", conn.peer_addr());
let info = hs.accept(capab, total_difficulty, &mut conn);
match info {
Ok(info) => Ok(Peer::new(info, conn, adapter, header_cache_size, server)?),
Ok(info) => Ok(Peer::new(info, conn, adapter, server)?),
Err(e) => {
debug!(
"accept: handshaking from {:?} failed with error: {:?}",
Expand All @@ -145,7 +142,6 @@ impl Peer {
self_addr: PeerAddr,
hs: &Handshake,
adapter: Arc<dyn NetAdapter>,
header_cache_size: u64,
peer_addr: Option<PeerAddr>,
server: Server,
) -> Result<Peer, Error> {
Expand All @@ -163,7 +159,7 @@ impl Peer {
hs.initiate(capab, total_difficulty, self_addr, &mut conn, None)
};
match info {
Ok(info) => Ok(Peer::new(info, conn, adapter, header_cache_size, server)?),
Ok(info) => Ok(Peer::new(info, conn, adapter, server)?),
Err(e) => {
if peer_addr.is_some() {
debug!(
Expand Down Expand Up @@ -612,7 +608,6 @@ impl ChainAdapter for TrackingAdapter {
&self,
bh: &[core::BlockHeader],
peer_info: &PeerInfo,
header_sync_cache_size: u64,
) -> Result<bool, chain::Error> {
trace!(
"peer = {:?}, set header sync = false (in headers)",
Expand All @@ -633,17 +628,7 @@ impl ChainAdapter for TrackingAdapter {
peer_info.header_sync_requested.store(0, Ordering::Relaxed);
}
trace!("header sync for {} is {}", peer_info.addr, val);
self.adapter
.headers_received(bh, peer_info, header_sync_cache_size)
}

// note: not needed because adapter is called from headers_received and header_recevied
fn process_add_headers_sync(
&self,
_: &[core::BlockHeader],
_: u64,
) -> Result<bool, chain::Error> {
unimplemented!()
self.adapter.headers_received(bh, peer_info)
}

fn locate_headers(&self, locator: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
Expand Down
15 changes: 1 addition & 14 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,12 +561,8 @@ impl ChainAdapter for Peers {
&self,
headers: &[core::BlockHeader],
peer_info: &PeerInfo,
header_sync_cache_size: u64,
) -> Result<bool, chain::Error> {
if !self
.adapter
.headers_received(headers, peer_info, header_sync_cache_size)?
{
if !self.adapter.headers_received(headers, peer_info)? {
// if the peer sent us a block header that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
self.ban_peer(peer_info.addr.clone(), ReasonForBan::BadBlockHeader)
Expand All @@ -577,15 +573,6 @@ impl ChainAdapter for Peers {
}
}

// note not needed to implement because adapter is called by headers_received and header_received
fn process_add_headers_sync(
&self,
_: &[core::BlockHeader],
_: u64,
) -> Result<bool, chain::Error> {
unimplemented!()
}

fn locate_headers(&self, hs: &[Hash]) -> Result<Vec<core::BlockHeader>, chain::Error> {
self.adapter.locate_headers(hs)
}
Expand Down
5 changes: 1 addition & 4 deletions p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ pub struct Protocol {
adapter: Arc<dyn NetAdapter>,
peer_info: PeerInfo,
state_sync_requested: Arc<AtomicBool>,
header_cache_size: u64,
server: Server,
}

Expand All @@ -41,14 +40,12 @@ impl Protocol {
adapter: Arc<dyn NetAdapter>,
peer_info: PeerInfo,
state_sync_requested: Arc<AtomicBool>,
header_cache_size: u64,
server: Server,
) -> Protocol {
Protocol {
adapter,
peer_info,
state_sync_requested,
header_cache_size,
server,
}
}
Expand Down Expand Up @@ -236,7 +233,7 @@ impl MessageHandler for Protocol {
}

Message::Headers(data) => {
adapter.headers_received(&data.headers, &self.peer_info, self.header_cache_size)?;
adapter.headers_received(&data.headers, &self.peer_info)?;
Consumed::None
}

Expand Down
19 changes: 4 additions & 15 deletions p2p/src/serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl Server {

/// Starts a new TCP server and listen to incoming connections. This is a
/// blocking call until the TCP server stops.
pub fn listen(&self, header_cache_size: u64) -> Result<(), Error> {
pub fn listen(&self) -> Result<(), Error> {
// start TCP listener and handle incoming connections
let addr = SocketAddr::new(self.config.host, self.config.port);
let listener = TcpListener::bind(addr)?;
Expand Down Expand Up @@ -139,7 +139,7 @@ impl Server {
}
continue;
}
match self.handle_new_peer(stream, header_cache_size) {
match self.handle_new_peer(stream) {
Err(Error::ConnectionClose(err)) => {
debug!("shutting down, ignoring a new peer, {}", err)
}
Expand Down Expand Up @@ -167,7 +167,7 @@ impl Server {

/// Asks the server to connect to a new peer. Directly returns the peer if
/// we're already connected to the provided address.
pub fn connect(&self, addr: PeerAddr, header_cache_size: u64) -> Result<Arc<Peer>, Error> {
pub fn connect(&self, addr: PeerAddr) -> Result<Arc<Peer>, Error> {
if self.stop_state.is_stopped() {
return Err(Error::ConnectionClose(String::from("node is stopping")));
}
Expand Down Expand Up @@ -281,7 +281,6 @@ impl Server {
self_addr,
&self.handshake,
self.peers.clone(),
header_cache_size,
peer_addr,
(*self).clone(),
)?;
Expand All @@ -302,7 +301,7 @@ impl Server {
}
}

fn handle_new_peer(&self, stream: TcpStream, header_cache_size: u64) -> Result<(), Error> {
fn handle_new_peer(&self, stream: TcpStream) -> Result<(), Error> {
if self.stop_state.is_stopped() {
return Err(Error::ConnectionClose(String::from("Server is stopping")));
}
Expand All @@ -315,7 +314,6 @@ impl Server {
total_diff,
&self.handshake,
self.peers.clone(),
header_cache_size,
self.clone(),
)?;
self.peers.add_connected(Arc::new(peer))?;
Expand Down Expand Up @@ -432,7 +430,6 @@ impl ChainAdapter for DummyAdapter {
&self,
_: &[core::BlockHeader],
_: &PeerInfo,
_: u64,
) -> Result<bool, chain::Error> {
Ok(true)
}
Expand All @@ -450,14 +447,6 @@ impl ChainAdapter for DummyAdapter {
unimplemented!()
}

fn process_add_headers_sync(
&self,
_: &[core::BlockHeader],
_: u64,
) -> Result<bool, chain::Error> {
unimplemented!()
}

fn txhashset_receive_ready(&self) -> bool {
false
}
Expand Down
7 changes: 0 additions & 7 deletions p2p/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -701,20 +701,13 @@ pub trait ChainAdapter: Sync + Send {
peer_info: &PeerInfo,
) -> Result<bool, chain::Error>;

fn process_add_headers_sync(
&self,
bh: &[core::BlockHeader],
header_cache_size: u64,
) -> Result<bool, chain::Error>;

/// A set of block header has been received, typically in response to a
/// block
/// header request.
fn headers_received(
&self,
bh: &[core::BlockHeader],
peer_info: &PeerInfo,
header_sync_cache_size: u64,
) -> Result<bool, chain::Error>;

/// Finds a list of block headers based on the provided locator. Tries to
Expand Down
Loading

0 comments on commit abe8ce0

Please sign in to comment.