Skip to content

Commit

Permalink
feat: wire network primitives to remaining components (#13143)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected authored Dec 4, 2024
1 parent 874cf89 commit 1f6b7d1
Show file tree
Hide file tree
Showing 15 changed files with 87 additions and 64 deletions.
6 changes: 3 additions & 3 deletions crates/net/eth-wire/src/ethstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ mod tests {
async fn can_write_and_read_cleartext() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let test_msg: EthMessage = EthMessage::NewBlockHashes(
let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
vec![
BlockHashNumber { hash: B256::random(), number: 5 },
BlockHashNumber { hash: B256::random(), number: 6 },
Expand Down Expand Up @@ -572,7 +572,7 @@ mod tests {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let server_key = SecretKey::new(&mut rand::thread_rng());
let test_msg: EthMessage = EthMessage::NewBlockHashes(
let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
vec![
BlockHashNumber { hash: B256::random(), number: 5 },
BlockHashNumber { hash: B256::random(), number: 6 },
Expand Down Expand Up @@ -614,7 +614,7 @@ mod tests {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
let server_key = SecretKey::new(&mut rand::thread_rng());
let test_msg: EthMessage = EthMessage::NewBlockHashes(
let test_msg = EthMessage::<EthNetworkPrimitives>::NewBlockHashes(
vec![
BlockHashNumber { hash: B256::random(), number: 5 },
BlockHashNumber { hash: B256::random(), number: 6 },
Expand Down
4 changes: 1 addition & 3 deletions crates/net/network/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,13 @@ impl<Tx, Eth, N: NetworkPrimitives> NetworkBuilder<Tx, Eth, N> {
let request_handler = EthRequestHandler::new(client, peers, rx);
NetworkBuilder { network, request_handler, transactions }
}
}

impl<Tx, Eth> NetworkBuilder<Tx, Eth> {
/// Creates a new [`TransactionsManager`] and wires it to the network.
pub fn transactions<Pool: TransactionPool>(
self,
pool: Pool,
transactions_manager_config: TransactionsManagerConfig,
) -> NetworkBuilder<TransactionsManager<Pool>, Eth> {
) -> NetworkBuilder<TransactionsManager<Pool, N>, Eth, N> {
let Self { mut network, request_handler, .. } = self;
let (tx, rx) = mpsc::unbounded_channel();
network.set_transactions(tx);
Expand Down
12 changes: 5 additions & 7 deletions crates/net/network/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,19 +145,17 @@ where
}
}

impl<C> NetworkConfig<C>
impl<C, N> NetworkConfig<C, N>
where
C: BlockReader<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
Header = reth_primitives::Header,
> + HeaderProvider
N: NetworkPrimitives,
C: BlockReader<Block = N::Block, Receipt = reth_primitives::Receipt, Header = N::BlockHeader>
+ HeaderProvider
+ Clone
+ Unpin
+ 'static,
{
/// Starts the networking stack given a [`NetworkConfig`] and returns a handle to the network.
pub async fn start_network(self) -> Result<NetworkHandle, NetworkError> {
pub async fn start_network(self) -> Result<NetworkHandle<N>, NetworkError> {
let client = self.client.clone();
let (handle, network, _txpool, eth) = NetworkManager::builder::<C>(self)
.await?
Expand Down
10 changes: 6 additions & 4 deletions crates/net/network/src/eth_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,9 @@ impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
}
}

impl<C> EthRequestHandler<C>
impl<C, N> EthRequestHandler<C, N>
where
N: NetworkPrimitives,
C: BlockReader + HeaderProvider + ReceiptProvider<Receipt = reth_primitives::Receipt>,
{
/// Returns the list of requested headers
Expand Down Expand Up @@ -222,10 +223,11 @@ where
/// An endless future.
///
/// This should be spawned or used as part of `tokio::select!`.
impl<C> Future for EthRequestHandler<C>
impl<C, N> Future for EthRequestHandler<C, N>
where
C: BlockReader<Block = reth_primitives::Block, Receipt = reth_primitives::Receipt>
+ HeaderProvider<Header = reth_primitives::Header>
N: NetworkPrimitives,
C: BlockReader<Block = N::Block, Receipt = reth_primitives::Receipt>
+ HeaderProvider<Header = N::BlockHeader>
+ Unpin,
{
type Output = ();
Expand Down
15 changes: 10 additions & 5 deletions crates/net/network/src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,8 @@ mod tests {
#[tokio::test(flavor = "multi_thread")]
async fn test_poll_fetcher() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default());
let mut fetcher =
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());

poll_fn(move |cx| {
assert!(fetcher.poll(cx).is_pending());
Expand All @@ -497,7 +498,8 @@ mod tests {
#[tokio::test]
async fn test_peer_rotation() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default());
let mut fetcher =
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
// Add a few random peers
let peer1 = B512::random();
let peer2 = B512::random();
Expand All @@ -520,7 +522,8 @@ mod tests {
#[tokio::test]
async fn test_peer_prioritization() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default());
let mut fetcher =
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
// Add a few random peers
let peer1 = B512::random();
let peer2 = B512::random();
Expand All @@ -545,7 +548,8 @@ mod tests {
#[tokio::test]
async fn test_on_block_headers_response() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default());
let mut fetcher =
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
let peer_id = B512::random();

assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);
Expand Down Expand Up @@ -575,7 +579,8 @@ mod tests {
#[tokio::test]
async fn test_header_response_outcome() {
let manager = PeersManager::new(PeersConfig::default());
let mut fetcher: StateFetcher = StateFetcher::new(manager.handle(), Default::default());
let mut fetcher =
StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
let peer_id = B512::random();

let request_pair = || {
Expand Down
15 changes: 10 additions & 5 deletions crates/net/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@
//! // The key that's used for encrypting sessions and to identify our node.
//! let local_key = rng_secret_key();
//!
//! let config = NetworkConfig::builder(local_key).boot_nodes(mainnet_nodes()).build(client);
//! let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key)
//! .boot_nodes(mainnet_nodes())
//! .build(client);
//!
//! // create the network instance
//! let network = NetworkManager::<EthNetworkPrimitives>::new(config).await.unwrap();
//! let network = NetworkManager::new(config).await.unwrap();
//!
//! // keep a handle to the network and spawn it
//! let handle = network.handle().clone();
Expand All @@ -73,7 +75,9 @@
//! ### Configure all components of the Network with the [`NetworkBuilder`]
//!
//! ```
//! use reth_network::{config::rng_secret_key, NetworkConfig, NetworkManager};
//! use reth_network::{
//! config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkManager,
//! };
//! use reth_network_peers::mainnet_nodes;
//! use reth_provider::test_utils::NoopProvider;
//! use reth_transaction_pool::TransactionPool;
Expand All @@ -84,8 +88,9 @@
//! // The key that's used for encrypting sessions and to identify our node.
//! let local_key = rng_secret_key();
//!
//! let config =
//! NetworkConfig::builder(local_key).boot_nodes(mainnet_nodes()).build(client.clone());
//! let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key)
//! .boot_nodes(mainnet_nodes())
//! .build(client.clone());
//! let transactions_manager_config = config.transactions_manager_config.clone();
//!
//! // create the network instance
Expand Down
9 changes: 6 additions & 3 deletions crates/net/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,9 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
/// components of the network
///
/// ```
/// use reth_network::{config::rng_secret_key, NetworkConfig, NetworkManager};
/// use reth_network::{
/// config::rng_secret_key, EthNetworkPrimitives, NetworkConfig, NetworkManager,
/// };
/// use reth_network_peers::mainnet_nodes;
/// use reth_provider::test_utils::NoopProvider;
/// use reth_transaction_pool::TransactionPool;
Expand All @@ -303,8 +305,9 @@ impl<N: NetworkPrimitives> NetworkManager<N> {
/// // The key that's used for encrypting sessions and to identify our node.
/// let local_key = rng_secret_key();
///
/// let config =
/// NetworkConfig::builder(local_key).boot_nodes(mainnet_nodes()).build(client.clone());
/// let config = NetworkConfig::<_, EthNetworkPrimitives>::builder(local_key)
/// .boot_nodes(mainnet_nodes())
/// .build(client.clone());
/// let transactions_manager_config = config.transactions_manager_config.clone();
///
/// // create the network instance
Expand Down
4 changes: 2 additions & 2 deletions crates/net/network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ impl<N: NetworkPrimitives> NetworkPeersEvents for NetworkHandle<N> {
}
}

impl NetworkEventListenerProvider for NetworkHandle<EthNetworkPrimitives> {
fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<EthNetworkPrimitives>>> {
impl<N: NetworkPrimitives> NetworkEventListenerProvider<PeerRequest<N>> for NetworkHandle<N> {
fn event_listener(&self) -> EventStream<NetworkEvent<PeerRequest<N>>> {
self.inner.event_sender.new_listener()
}

Expand Down
9 changes: 5 additions & 4 deletions crates/net/network/src/session/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,16 @@ impl<N: NetworkPrimitives> Sink<EthMessage<N>> for EthRlpxConnection<N> {
mod tests {
use super::*;

const fn assert_eth_stream<St>()
const fn assert_eth_stream<N, St>()
where
St: Stream<Item = Result<EthMessage, EthStreamError>> + Sink<EthMessage>,
N: NetworkPrimitives,
St: Stream<Item = Result<EthMessage<N>, EthStreamError>> + Sink<EthMessage<N>>,
{
}

#[test]
const fn test_eth_stream_variants() {
assert_eth_stream::<EthSatelliteConnection>();
assert_eth_stream::<EthRlpxConnection>();
assert_eth_stream::<EthNetworkPrimitives, EthSatelliteConnection<EthNetworkPrimitives>>();
assert_eth_stream::<EthNetworkPrimitives, EthRlpxConnection<EthNetworkPrimitives>>();
}
}
4 changes: 2 additions & 2 deletions crates/net/network/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ mod tests {

use alloy_consensus::Header;
use alloy_primitives::B256;
use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthVersion};
use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthNetworkPrimitives, EthVersion};
use reth_network_api::PeerRequestSender;
use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError};
use reth_network_peers::PeerId;
Expand All @@ -581,7 +581,7 @@ mod tests {
};

/// Returns a testing instance of the [`NetworkState`].
fn state() -> NetworkState {
fn state() -> NetworkState<EthNetworkPrimitives> {
let peers = PeersManager::default();
let handle = peers.handle();
NetworkState {
Expand Down
22 changes: 12 additions & 10 deletions crates/net/network/src/test_utils/testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use crate::{
use futures::{FutureExt, StreamExt};
use pin_project::pin_project;
use reth_chainspec::{Hardforks, MAINNET};
use reth_eth_wire::{protocol::Protocol, DisconnectReason, HelloMessageWithProtocols};
use reth_eth_wire::{
protocol::Protocol, DisconnectReason, EthNetworkPrimitives, HelloMessageWithProtocols,
};
use reth_network_api::{
events::{PeerEvent, SessionInfo},
test_utils::{PeersHandle, PeersHandleProvider},
Expand Down Expand Up @@ -140,7 +142,7 @@ where
}

/// Returns all handles to the networks
pub fn handles(&self) -> impl Iterator<Item = NetworkHandle> + '_ {
pub fn handles(&self) -> impl Iterator<Item = NetworkHandle<EthNetworkPrimitives>> + '_ {
self.peers.iter().map(|p| p.handle())
}

Expand Down Expand Up @@ -346,11 +348,11 @@ impl<C, Pool> TestnetHandle<C, Pool> {
#[derive(Debug)]
pub struct Peer<C, Pool = TestPool> {
#[pin]
network: NetworkManager,
network: NetworkManager<EthNetworkPrimitives>,
#[pin]
request_handler: Option<EthRequestHandler<C>>,
request_handler: Option<EthRequestHandler<C, EthNetworkPrimitives>>,
#[pin]
transactions_manager: Option<TransactionsManager<Pool>>,
transactions_manager: Option<TransactionsManager<Pool, EthNetworkPrimitives>>,
pool: Option<Pool>,
client: C,
secret_key: SecretKey,
Expand Down Expand Up @@ -393,12 +395,12 @@ where
}

/// Returns mutable access to the network.
pub fn network_mut(&mut self) -> &mut NetworkManager {
pub fn network_mut(&mut self) -> &mut NetworkManager<EthNetworkPrimitives> {
&mut self.network
}

/// Returns the [`NetworkHandle`] of this peer.
pub fn handle(&self) -> NetworkHandle {
pub fn handle(&self) -> NetworkHandle<EthNetworkPrimitives> {
self.network.handle().clone()
}

Expand Down Expand Up @@ -506,8 +508,8 @@ pub struct PeerConfig<C = NoopProvider> {
/// A handle to a peer in the [`Testnet`].
#[derive(Debug)]
pub struct PeerHandle<Pool> {
network: NetworkHandle,
transactions: Option<TransactionsHandle>,
network: NetworkHandle<EthNetworkPrimitives>,
transactions: Option<TransactionsHandle<EthNetworkPrimitives>>,
pool: Option<Pool>,
}

Expand Down Expand Up @@ -545,7 +547,7 @@ impl<Pool> PeerHandle<Pool> {
}

/// Returns the [`NetworkHandle`] of this peer.
pub const fn network(&self) -> &NetworkHandle {
pub const fn network(&self) -> &NetworkHandle<EthNetworkPrimitives> {
&self.network
}
}
Expand Down
16 changes: 8 additions & 8 deletions crates/net/network/src/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,14 +282,14 @@ pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives
metrics: TransactionsManagerMetrics,
}

impl<Pool: TransactionPool> TransactionsManager<Pool> {
impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
/// Sets up a new instance.
///
/// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
pub fn new(
network: NetworkHandle,
network: NetworkHandle<N>,
pool: Pool,
from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent>,
from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
transactions_manager_config: TransactionsManagerConfig,
) -> Self {
let network_events = network.event_listener();
Expand Down Expand Up @@ -332,9 +332,7 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
metrics,
}
}
}

impl<Pool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
/// Returns a new handle that can send commands to this type.
pub fn handle(&self) -> TransactionsHandle<N> {
TransactionsHandle { manager_tx: self.command_tx.clone() }
Expand Down Expand Up @@ -1928,7 +1926,9 @@ mod tests {
use tests::fetcher::TxFetchMetadata;
use tracing::error;

async fn new_tx_manager() -> (TransactionsManager<TestPool>, NetworkManager) {
async fn new_tx_manager(
) -> (TransactionsManager<TestPool, EthNetworkPrimitives>, NetworkManager<EthNetworkPrimitives>)
{
let secret_key = SecretKey::new(&mut rand::thread_rng());
let client = NoopProvider::default();

Expand Down Expand Up @@ -1959,7 +1959,7 @@ mod tests {
pub(super) fn new_mock_session(
peer_id: PeerId,
version: EthVersion,
) -> (PeerMetadata, mpsc::Receiver<PeerRequest>) {
) -> (PeerMetadata<EthNetworkPrimitives>, mpsc::Receiver<PeerRequest>) {
let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1);

(
Expand Down Expand Up @@ -1991,7 +1991,7 @@ mod tests {

let client = NoopProvider::default();
let pool = testing_pool();
let config = NetworkConfigBuilder::new(secret_key)
let config = NetworkConfigBuilder::<EthNetworkPrimitives>::new(secret_key)
.disable_discovery()
.listener_port(0)
.build(client);
Expand Down
Loading

0 comments on commit 1f6b7d1

Please sign in to comment.