Skip to content

Commit

Permalink
more test work
Browse files Browse the repository at this point in the history
  • Loading branch information
da2ce7 committed Aug 8, 2024
1 parent 60f8782 commit 45fa919
Show file tree
Hide file tree
Showing 39 changed files with 754 additions and 570 deletions.
3 changes: 2 additions & 1 deletion packages/dht/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ handshake = { path = "../handshake" }
util = { path = "../util" }

crc = "3"
log = "0"
futures = "0"
tokio = { version = "1", features = ["full"] }
rand = "0"
chrono = "0"
thiserror = "1"
tracing = "0"
tracing-subscriber = "0"
31 changes: 14 additions & 17 deletions packages/dht/examples/debug.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,16 @@
use std::collections::HashSet;
use std::io::Read;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Once;

use dht::handshaker_trait::HandshakerTrait;
use dht::{DhtBuilder, Router};
use futures::future::BoxFuture;
use futures::StreamExt;
use tracing::level_filters::LevelFilter;
use util::bt::{InfoHash, PeerId};

struct SimpleLogger;

impl log::Log for SimpleLogger {
fn enabled(&self, metadata: &log::Metadata<'_>) -> bool {
metadata.level() <= log::Level::Info
}

fn log(&self, record: &log::Record<'_>) {
if self.enabled(record.metadata()) {
println!("{} - {}", record.level(), record.args());
}
}

fn flush(&self) {}
}
static INIT: Once = Once::new();

struct SimpleHandshaker {
filter: HashSet<SocketAddr>,
Expand Down Expand Up @@ -63,10 +51,19 @@ impl HandshakerTrait for SimpleHandshaker {
fn metadata(&mut self, (): Self::MetadataEnvelope) {}
}

fn tracing_stdout_init(filter: LevelFilter) {
let builder = tracing_subscriber::fmt().with_max_level(filter).with_ansi(true);

builder.pretty().with_file(true).init();

tracing::info!("Logging initialized");
}

#[tokio::main]
async fn main() {
log::set_logger(&SimpleLogger).unwrap();
log::set_max_level(log::LevelFilter::max());
INIT.call_once(|| {
tracing_stdout_init(LevelFilter::INFO);
});

let hash = InfoHash::from_bytes(b"My Unique Info Hash");

Expand Down
9 changes: 4 additions & 5 deletions packages/dht/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::sync::Arc;

use futures::channel::mpsc::{self, Receiver, Sender};
use futures::SinkExt as _;
use log::warn;
use tokio::net::UdpSocket;
use util::bt::InfoHash;
use util::net;
Expand Down Expand Up @@ -49,7 +48,7 @@ impl MainlineDht {
.await
.is_err()
{
warn!("bip_dt: MainlineDht failed to send a start bootstrap message...");
tracing::warn!("bip_dt: MainlineDht failed to send a start bootstrap message...");
}

Ok(MainlineDht { main_task_sender })
Expand All @@ -71,7 +70,7 @@ impl MainlineDht {
.await
.is_err()
{
warn!("bip_dht: MainlineDht failed to send a start lookup message...");
tracing::warn!("bip_dht: MainlineDht failed to send a start lookup message...");
}
}

Expand All @@ -84,7 +83,7 @@ impl MainlineDht {
let (send, recv) = mpsc::channel(1);

if let Err(e) = self.main_task_sender.clone().send(OneshotTask::RegisterSender(send)).await {
warn!("bip_dht: MainlineDht failed to send a register sender message..., {e}");
tracing::warn!("bip_dht: MainlineDht failed to send a register sender message..., {e}");
// TODO: Should we push a Shutdown event through the sender here? We would need
// to know the cause or create a new cause for this specific scenario since the
// client could have been lazy and wasn't monitoring this until after it shutdown!
Expand All @@ -102,7 +101,7 @@ impl Drop for MainlineDht {
.try_send(OneshotTask::Shutdown(ShutdownCause::ClientInitiated))
.is_err()
{
warn!(
tracing::warn!(
"bip_dht: MainlineDht failed to send a shutdown message (may have already been \
shutdown)..."
);
Expand Down
2 changes: 1 addition & 1 deletion packages/dht/src/routing/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl Node {
/// Record that we sent the node a request.
pub fn local_request(&self) {
if self.status() != NodeStatus::Good {
let num_requests = self.refresh_requests.fetch_add(1, Ordering::SeqCst);
let num_requests = self.refresh_requests.fetch_add(1, Ordering::SeqCst) + 1;
}
}

Expand Down
17 changes: 8 additions & 9 deletions packages/dht/src/worker/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::sync::{Arc, Mutex, RwLock};
use futures::channel::mpsc::Sender;
use futures::future::BoxFuture;
use futures::{FutureExt as _, SinkExt as _};
use log::{error, info, warn};
use tokio::time::{sleep, Duration};
use util::bt::{self, NodeId};

Expand Down Expand Up @@ -83,7 +82,7 @@ impl TableBootstrap {
.await
.is_err()
{
error!("bip_dht: Failed to send scheduled task check for bootstrap timeout");
tracing::error!("bip_dht: Failed to send scheduled task check for bootstrap timeout");
}
});

Expand All @@ -97,7 +96,7 @@ impl TableBootstrap {
// Ping all initial routers and nodes
for addr in self.starting_routers.iter().chain(self.starting_nodes.iter()) {
if out.send((find_node_msg.clone(), *addr)).await.is_err() {
error!("bip_dht: Failed to send bootstrap message to router through channel...");
tracing::error!("bip_dht: Failed to send bootstrap message to router through channel...");
return BootstrapStatus::Failed;
}
}
Expand All @@ -123,7 +122,7 @@ impl TableBootstrap {
let _timeout = if let Some(t) = self.active_messages.lock().unwrap().get(&trans_id) {
*t
} else {
warn!(
tracing::warn!(
"bip_dht: Received expired/unsolicited node response for an active table \
bootstrap..."
);
Expand Down Expand Up @@ -157,7 +156,7 @@ impl TableBootstrap {
H: HandshakerTrait + 'static,
{
if self.active_messages.lock().unwrap().remove(&trans_id).is_none() {
warn!(
tracing::warn!(
"bip_dht: Received expired/unsolicited node timeout for an active table \
bootstrap..."
);
Expand Down Expand Up @@ -276,7 +275,7 @@ impl TableBootstrap {
{
let bootstrap_bucket = self.curr_bootstrap_bucket.load(Ordering::Relaxed);

info!("bip_dht: bootstrap::send_bootstrap_requests {}", bootstrap_bucket);
tracing::info!("bip_dht: bootstrap::send_bootstrap_requests {}", bootstrap_bucket);

let mut messages_sent = 0;

Expand All @@ -290,7 +289,7 @@ impl TableBootstrap {

// Send the message to the node
if out.send((find_node_msg, node.addr())).await.is_err() {
error!("bip_dht: Could not send a bootstrap message through the channel...");
tracing::error!("bip_dht: Could not send a bootstrap message through the channel...");
return BootstrapStatus::Failed;
}

Expand All @@ -311,12 +310,12 @@ impl TableBootstrap {
.await
.is_err()
{
error!("bip_dht: Failed to send scheduled task check for bootstrap timeout");
tracing::error!("bip_dht: Failed to send scheduled task check for bootstrap timeout");
}
});
}

let bootstrap_bucket = self.curr_bootstrap_bucket.fetch_add(1, Ordering::Relaxed);
let bootstrap_bucket = self.curr_bootstrap_bucket.fetch_add(1, Ordering::AcqRel) + 1;

if (bootstrap_bucket) == table::MAX_BUCKETS {
BootstrapStatus::Completed
Expand Down
Loading

0 comments on commit 45fa919

Please sign in to comment.