Skip to content

Commit

Permalink
refactor(autonomi): wait for peers in RT
Browse files Browse the repository at this point in the history
  • Loading branch information
b-zee committed Dec 16, 2024
1 parent 4c03fe6 commit 9c96680
Showing 1 changed file with 24 additions and 29 deletions.
53 changes: 24 additions & 29 deletions autonomi/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,19 @@ pub enum ConnectError {
/// Same as [`ConnectError::TimedOut`] but with a list of incompatible protocols.
#[error("Could not connect to peers due to incompatible protocol: {0:?}")]
TimedOutWithIncompatibleProtocol(HashSet<String>, String),

/// An error occurred while bootstrapping the client.
#[error("Failed to bootstrap the client")]
Bootstrap(#[from] ant_bootstrap::Error),
}

impl Client {
pub async fn init() -> Result<Self, ant_bootstrap::Error> {
pub async fn init() -> Result<Self, ConnectError> {
Self::init_with_config(ClientConfig::default()).await
}

pub async fn init_with_config(config: ClientConfig) -> Result<Self, ant_bootstrap::Error> {
let (network, _event_receiver) = build_client_and_run_swarm(config.local);
pub async fn init_with_config(config: ClientConfig) -> Result<Self, ConnectError> {
let (network, event_receiver) = build_client_and_run_swarm(config.local);

let peers_args = PeersArgs {
disable_mainnet_contacts: config.local,
Expand All @@ -109,34 +113,25 @@ impl Client {

let peers = match peers_args.get_addrs(None, None).await {
Ok(peers) => peers,
Err(e) => return Err(e),
Err(e) => return Err(e.into()),
};

// let network_clone = network.clone();
// let peers = peers.to_vec();
// let _handle = ant_networking::target_arch::spawn(async move {
// for addr in peers {
// if let Err(err) = network_clone.dial(addr.clone()).await {
// error!("Failed to dial addr={addr} with err: {err:?}");
// eprintln!("addr={addr} Failed to dial: {err:?}");
// };
// }
// });
let peers_len = peers.len();
// Add peers to the routing table.
let peers_with_p2p: Vec<_> = peers
.into_iter()
.filter_map(|addr| {
addr.iter().find_map(|p| match p {
libp2p::multiaddr::Protocol::P2p(id) => Some((id, addr.clone())),
_ => None,
})
})
.collect();
if peers_with_p2p.len() < peers_len {
tracing::warn!("Some bootstrap addresses have no peer ID, skipping them");
}
let _ = network.add_peer_addresses(peers_with_p2p).await;
let network_clone = network.clone();
let peers = peers.to_vec();
let _handle = ant_networking::target_arch::spawn(async move {
for addr in peers {
if let Err(err) = network_clone.dial(addr.clone()).await {
error!("Failed to dial addr={addr} with err: {err:?}");
eprintln!("addr={addr} Failed to dial: {err:?}");
};
}
});

// Wait until we have added a few peers to our routing table.
let (sender, receiver) = futures::channel::oneshot::channel();
ant_networking::target_arch::spawn(handle_event_receiver(event_receiver, sender));
receiver.await.expect("sender should not close")?;
debug!("Client is connected to the network");

Ok(Self {
network,
Expand Down

0 comments on commit 9c96680

Please sign in to comment.