Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactorings of p2p crate #150

Merged
merged 14 commits into from
Nov 11, 2023
42 changes: 26 additions & 16 deletions chain/src/block/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl<S: Store<Header = BlockHeader>> BlockCache<S> {

if let Some(branch) = best_branch {
// Stale blocks after potential re-org.
let stale = self.switch_to_fork(branch)?;
let reverted = self.switch_to_fork(branch)?;
let height = self.height();
let hash = branch.tip;
let header = *branch
Expand All @@ -356,9 +356,13 @@ impl<S: Store<Header = BlockHeader>> BlockCache<S> {
)
.expect("BlockCache::import_block: there is always at least one connected block");

Ok(ImportResult::TipChanged(
header, hash, height, stale, connected,
))
Ok(ImportResult::TipChanged {
header,
hash,
height,
reverted,
connected,
})
} else {
Ok(ImportResult::TipUnchanged)
}
Expand Down Expand Up @@ -570,7 +574,13 @@ impl<S: Store<Header = BlockHeader>> BlockTree for BlockCache<S> {

for (i, header) in chain.enumerate() {
match self.import_block(header, context) {
Ok(ImportResult::TipChanged(header, hash, height, r, c)) => {
Ok(ImportResult::TipChanged {
header,
hash,
height,
reverted: r,
connected: c,
}) => {
seen.extend(c.iter().map(|(_, h)| h.block_hash()));
reverted.extend(r.into_iter().map(|(i, h)| ((i, h.block_hash()), h)));
connected.extend(c);
Expand All @@ -593,19 +603,19 @@ impl<S: Store<Header = BlockHeader>> BlockTree for BlockCache<S> {
// Don't return connected blocks if they are not in the main chain.
connected.retain(|_, h| self.contains(&h.block_hash()));

Ok(ImportResult::TipChanged(
best_header,
best_hash,
best_height,
reverted
Ok(ImportResult::TipChanged {
header: best_header,
hash: best_hash,
height: best_height,
reverted: reverted
.into_iter()
.rev()
.map(|((i, _), h)| (i, h))
.collect(),
NonEmpty::from_vec(connected.into_iter().collect()).expect(
connected: NonEmpty::from_vec(connected.into_iter().collect()).expect(
"BlockCache::import_blocks: there is always at least one connected block",
),
))
})
} else {
Ok(ImportResult::TipUnchanged)
}
Expand All @@ -627,13 +637,13 @@ impl<S: Store<Header = BlockHeader>> BlockTree for BlockCache<S> {
self.extend_chain(height, hash, header);
self.store.put(std::iter::once(header))?;

Ok(ImportResult::TipChanged(
Ok(ImportResult::TipChanged {
header,
hash,
height,
vec![],
NonEmpty::new((height, header)),
))
reverted: vec![],
connected: NonEmpty::new((height, header)),
})
} else {
Ok(ImportResult::TipUnchanged)
}
Expand Down
22 changes: 14 additions & 8 deletions chain/src/block/cache/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,8 +852,14 @@ fn prop_cache_import_tree_randomized(tree: Tree) {
match (real_result, model_result) {
(ImportResult::TipUnchanged, ImportResult::TipUnchanged) => {}
(
ImportResult::TipChanged(header, hash, height, reverted, connected),
ImportResult::TipChanged(_, _, _, _, _),
ImportResult::TipChanged {
header,
hash,
height,
reverted,
connected,
},
ImportResult::TipChanged { .. },
) => {
assert_eq!(connected.last(), &(height, header));
assert_eq!(header.block_hash(), hash);
Expand Down Expand Up @@ -946,13 +952,13 @@ fn test_cache_import_height_unchanged() {
assert_eq!(cache.tip().0, b2.hash);
assert_eq!(
result,
ImportResult::TipChanged(
b2.block(),
b2.hash,
ImportResult::TipChanged {
header: b2.block(),
hash: b2.hash,
height,
vec![(2, a2.block())],
NonEmpty::new((2, b2.block()))
)
reverted: vec![(2, a2.block())],
connected: NonEmpty::new((2, b2.block()))
}
);
}

Expand Down
41 changes: 15 additions & 26 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ pub use nakamoto_common::network;
pub use nakamoto_common::network::Network;
pub use nakamoto_common::p2p::Domain;
pub use nakamoto_net::event;
pub use nakamoto_p2p::fsm::{Command, CommandError, Hooks, Limits, Link, Peer};
pub use nakamoto_p2p::fsm::{Command, CommandError, Event, Hooks, Limits, Link, Peer};

pub use crate::error::Error;
pub use crate::event::{Event, Loading};
pub use crate::event::Loading;
pub use crate::handle;
pub use crate::service::Service;

Expand Down Expand Up @@ -201,24 +201,19 @@ impl<R: Reactor> Client<R> {
let (commands_tx, commands_rx) = chan::unbounded::<Command>();
let (event_pub, events) = event::broadcast(|e, p| p.emit(e));
let (blocks_pub, blocks) = event::broadcast(|e, p| {
if let fsm::Event::Inventory(fsm::InventoryEvent::BlockProcessed {
block,
height,
..
}) = e
{
if let fsm::Event::BlockProcessed { block, height, .. } = e {
p.emit((block, height));
}
});
let (filters_pub, filters) = event::broadcast(|e, p| {
if let fsm::Event::Filter(fsm::FilterEvent::FilterReceived {
if let fsm::Event::FilterReceived {
filter,
block_hash,
block,
height,
..
}) = e
} = e
{
p.emit((filter, block_hash, height));
p.emit((filter, block, height));
}
});
let (publisher, subscriber) = event::broadcast({
Expand Down Expand Up @@ -572,21 +567,14 @@ impl<W: Waker> handle::Handle for Handle<W> {
Ok(receive.recv()?)
}

fn query(&self, msg: NetworkMessage) -> Result<Option<net::SocketAddr>, handle::Error> {
let (transmit, receive) = chan::bounded::<Option<net::SocketAddr>>(1);
self.command(Command::Query(msg, transmit))?;

Ok(receive.recv()?)
}

fn connect(&self, addr: net::SocketAddr) -> Result<Link, handle::Error> {
let events = self.events.subscribe();
self.command(Command::Connect(addr))?;

event::wait(
&events,
|e| match e {
fsm::Event::Peer(fsm::PeerEvent::Connected(a, link))
fsm::Event::PeerConnected { addr: a, link, .. }
if a == addr || (addr.ip().is_unspecified() && a.port() == addr.port()) =>
{
Some(link)
Expand All @@ -605,7 +593,7 @@ impl<W: Waker> handle::Handle for Handle<W> {
event::wait(
&events,
|e| match e {
fsm::Event::Peer(fsm::PeerEvent::Disconnected(a, _))
fsm::Event::PeerDisconnected { addr: a, .. }
if a == addr || (addr.ip().is_unspecified() && a.port() == addr.port()) =>
{
Some(())
Expand Down Expand Up @@ -684,12 +672,12 @@ impl<W: Waker> handle::Handle for Handle<W> {
event::wait(
&events,
|e| match e {
fsm::Event::Peer(fsm::PeerEvent::Negotiated {
fsm::Event::PeerNegotiated {
addr,
height,
services,
..
}) => {
} => {
if services.has(required_services) {
negotiated.insert(addr, (height, services));
}
Expand All @@ -715,9 +703,10 @@ impl<W: Waker> handle::Handle for Handle<W> {
None => event::wait(
&events,
|e| match e {
fsm::Event::Chain(fsm::ChainEvent::Synced(hash, height)) if height == h => {
Some(hash)
}
Event::BlockHeadersImported {
result: ImportResult::TipChanged { height, hash, .. },
..
} if height == h => Some(hash),
_ => None,
},
self.timeout,
Expand Down
Loading
Loading