Skip to content

Commit

Permalink
Refactor Dialer
Browse files Browse the repository at this point in the history
  • Loading branch information
hmzakhalid committed Jan 6, 2025
1 parent d207422 commit 0d42941
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 36 deletions.
2 changes: 1 addition & 1 deletion packages/ciphernode/net/src/bin/p2p_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async fn main() -> Result<()> {
peer.listen_on(udp_port.unwrap_or(0))?;

let name_clone = name.clone();
let swarm_handle = tokio::spawn(async move {
let swarm_handle = actix::spawn(async move {
println!("{} starting swarm", name_clone);
if let Err(e) = peer.start().await {
println!("{} swarm failed: {}", name_clone, e);
Expand Down
73 changes: 38 additions & 35 deletions packages/ciphernode/net/src/dialer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use libp2p::{
swarm::{dial_opts::DialOpts, ConnectionId, DialError},
Multiaddr,
};
use std::{collections::HashMap, net::ToSocketAddrs, sync::Arc, time::Duration};
use std::{net::ToSocketAddrs, sync::Arc, time::Duration};
use tokio::sync::mpsc;
use tracing::{info, warn};

Expand All @@ -21,6 +21,7 @@ pub struct DialPeer(pub String);

#[derive(Clone)]
struct PendingConnection {
id: ConnectionId,
addr: String,
attempt: u32,
delay_ms: u64,
Expand All @@ -30,18 +31,21 @@ struct PendingConnection {
pub struct Dialer {
net_bus: Addr<EventBus<NetworkPeerEvent>>,
tx: mpsc::Sender<NetworkPeerCommand>,
pending_connection: HashMap<ConnectionId, PendingConnection>,
pending_connection: Option<PendingConnection>,
target_addr: String,
}

impl Dialer {
pub fn new(
net_bus: Addr<EventBus<NetworkPeerEvent>>,
tx: mpsc::Sender<NetworkPeerCommand>,
target_addr: String,
) -> Addr<Self> {
let addr = Self {
net_bus: net_bus.clone(),
tx,
pending_connection: HashMap::new(),
pending_connection: None,
target_addr,
}
.start();

Expand All @@ -58,9 +62,8 @@ impl Dialer {
addr: String,
net_bus: Addr<EventBus<NetworkPeerEvent>>,
tx: mpsc::Sender<NetworkPeerCommand>,
) {
let dialer = Self::new(net_bus, tx);
dialer.do_send(DialPeer(addr));
) -> Addr<Self> {
Self::new(net_bus, tx, addr)
}

async fn attempt_dial(
Expand Down Expand Up @@ -89,14 +92,12 @@ impl Dialer {
match self.tx.send(NetworkPeerCommand::Dial(opts)).await {
Ok(_) => {
info!("Dialing {} with connection {}", addr, connection_id);
self.pending_connection.insert(
connection_id,
PendingConnection {
addr,
attempt,
delay_ms,
},
);
self.pending_connection = Some(PendingConnection {
id: connection_id,
addr,
attempt,
delay_ms,
});
Some(connection_id)
}
Err(e) => {
Expand Down Expand Up @@ -197,6 +198,17 @@ impl Dialer {

impl Actor for Dialer {
type Context = Context<Self>;

fn started(&mut self, ctx: &mut Self::Context) {
let mut dialer = self.clone();
let addr = self.target_addr.clone();
ctx.spawn(
async move {
dialer.attempt_dial(addr, 1, BACKOFF_DELAY).await;
}
.into_actor(self),
);
}
}

impl Handler<NetworkPeerEvent> for Dialer {
Expand All @@ -205,42 +217,33 @@ impl Handler<NetworkPeerEvent> for Dialer {
fn handle(&mut self, msg: NetworkPeerEvent, ctx: &mut Context<Self>) {
match msg {
NetworkPeerEvent::ConnectionEstablished { connection_id } => {
if let Some(conn) = self.pending_connection.remove(&connection_id) {
info!("Connection Established for {}", conn.addr);
if let Some(conn) = self.pending_connection.take() {
if conn.id == connection_id {
info!("Connection Established for {}", conn.addr);
}
}
}
NetworkPeerEvent::DialError {
connection_id,
error,
} => {
if let Some(conn) = self.pending_connection.remove(&connection_id) {
self.handle_connection_error(conn, error, ctx);
if let Some(conn) = self.pending_connection.take() {
if conn.id == connection_id {
self.handle_connection_error(conn, error, ctx);
}
}
}
NetworkPeerEvent::OutgoingConnectionError {
connection_id,
error,
} => {
if let Some(conn) = self.pending_connection.remove(&connection_id) {
self.handle_connection_error(conn, error, ctx);
if let Some(conn) = self.pending_connection.take() {
if conn.id == connection_id {
self.handle_connection_error(conn, error, ctx);
}
}
}
_ => {}
}
}
}

impl Handler<DialPeer> for Dialer {
type Result = Result<()>;

fn handle(&mut self, msg: DialPeer, ctx: &mut Context<Self>) -> Self::Result {
let mut dialer = self.clone();
ctx.spawn(
async move {
dialer.attempt_dial(msg.0, 1, BACKOFF_DELAY).await;
}
.into_actor(self),
);
Ok(())
}
}

0 comments on commit 0d42941

Please sign in to comment.