diff --git a/packages/ciphernode/net/src/bin/p2p_test.rs b/packages/ciphernode/net/src/bin/p2p_test.rs index ea89fccb..f5533618 100644 --- a/packages/ciphernode/net/src/bin/p2p_test.rs +++ b/packages/ciphernode/net/src/bin/p2p_test.rs @@ -59,7 +59,7 @@ async fn main() -> Result<()> { peer.listen_on(udp_port.unwrap_or(0))?; let name_clone = name.clone(); - let swarm_handle = tokio::spawn(async move { + let swarm_handle = actix::spawn(async move { println!("{} starting swarm", name_clone); if let Err(e) = peer.start().await { println!("{} swarm failed: {}", name_clone, e); diff --git a/packages/ciphernode/net/src/dialer.rs b/packages/ciphernode/net/src/dialer.rs index b8d6cbe0..2f532337 100644 --- a/packages/ciphernode/net/src/dialer.rs +++ b/packages/ciphernode/net/src/dialer.rs @@ -7,7 +7,7 @@ use libp2p::{ swarm::{dial_opts::DialOpts, ConnectionId, DialError}, Multiaddr, }; -use std::{collections::HashMap, net::ToSocketAddrs, sync::Arc, time::Duration}; +use std::{net::ToSocketAddrs, sync::Arc, time::Duration}; use tokio::sync::mpsc; use tracing::{info, warn}; @@ -21,6 +21,7 @@ pub struct DialPeer(pub String); #[derive(Clone)] struct PendingConnection { + id: ConnectionId, addr: String, attempt: u32, delay_ms: u64, @@ -30,18 +31,21 @@ struct PendingConnection { pub struct Dialer { net_bus: Addr>, tx: mpsc::Sender, - pending_connection: HashMap, + pending_connection: Option, + target_addr: String, } impl Dialer { pub fn new( net_bus: Addr>, tx: mpsc::Sender, + target_addr: String, ) -> Addr { let addr = Self { net_bus: net_bus.clone(), tx, - pending_connection: HashMap::new(), + pending_connection: None, + target_addr, } .start(); @@ -58,9 +62,8 @@ impl Dialer { addr: String, net_bus: Addr>, tx: mpsc::Sender, - ) { - let dialer = Self::new(net_bus, tx); - dialer.do_send(DialPeer(addr)); + ) -> Addr { + Self::new(net_bus, tx, addr) } async fn attempt_dial( @@ -89,14 +92,12 @@ impl Dialer { match self.tx.send(NetworkPeerCommand::Dial(opts)).await { Ok(_) => { info!("Dialing {} with connection {}", addr, connection_id); - self.pending_connection.insert( - connection_id, - PendingConnection { - addr, - attempt, - delay_ms, - }, - ); + self.pending_connection = Some(PendingConnection { + id: connection_id, + addr, + attempt, + delay_ms, + }); Some(connection_id) } Err(e) => { @@ -197,6 +198,17 @@ impl Dialer { impl Actor for Dialer { type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + let mut dialer = self.clone(); + let addr = self.target_addr.clone(); + ctx.spawn( + async move { + dialer.attempt_dial(addr, 1, BACKOFF_DELAY).await; + } + .into_actor(self), + ); + } } impl Handler for Dialer { @@ -205,42 +217,33 @@ impl Handler for Dialer { fn handle(&mut self, msg: NetworkPeerEvent, ctx: &mut Context) { match msg { NetworkPeerEvent::ConnectionEstablished { connection_id } => { - if let Some(conn) = self.pending_connection.remove(&connection_id) { - info!("Connection Established for {}", conn.addr); + if let Some(conn) = self.pending_connection.take() { + if conn.id == connection_id { + info!("Connection Established for {}", conn.addr); + } } } NetworkPeerEvent::DialError { connection_id, error, } => { - if let Some(conn) = self.pending_connection.remove(&connection_id) { - self.handle_connection_error(conn, error, ctx); + if let Some(conn) = self.pending_connection.take() { + if conn.id == connection_id { + self.handle_connection_error(conn, error, ctx); + } } } NetworkPeerEvent::OutgoingConnectionError { connection_id, error, } => { - if let Some(conn) = self.pending_connection.remove(&connection_id) { - self.handle_connection_error(conn, error, ctx); + if let Some(conn) = self.pending_connection.take() { + if conn.id == connection_id { + self.handle_connection_error(conn, error, ctx); + } } } _ => {} } } } - -impl Handler for Dialer { - type Result = Result<()>; - - fn handle(&mut self, msg: DialPeer, ctx: &mut Context) -> Self::Result { - let mut dialer = self.clone(); - ctx.spawn( - async move { - dialer.attempt_dial(msg.0, 1, BACKOFF_DELAY).await; - } - .into_actor(self), - ); - Ok(()) - } -}