Skip to content

Commit

Permalink
more work
Browse files Browse the repository at this point in the history
  • Loading branch information
da2ce7 committed Aug 10, 2024
1 parent 632b1e8 commit e48d96c
Show file tree
Hide file tree
Showing 11 changed files with 191 additions and 700 deletions.
1 change: 1 addition & 0 deletions examples/get_metadata/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dht = { path = "../../packages/dht" }
handshake = { path = "../../packages/handshake" }
peer = { path = "../../packages/peer" }
select = { path = "../../packages/select" }
metainfo = {path ="../../packages/metainfo" }

clap = "4"
hex = "0"
Expand Down
120 changes: 98 additions & 22 deletions examples/get_metadata/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::fmt::Debug;
use std::fs::File;
use std::io::Write as _;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::{Arc, Once};
use std::time::Duration;

use clap::{Arg, ArgMatches, Command};
Expand All @@ -15,6 +15,7 @@ use handshake::{
DiscoveryInfo, Extension, Extensions, HandshakerBuilder, HandshakerConfig, InfoHash, InitiateMessage, PeerId, Protocol,
};
use hex::FromHex;
use metainfo::Metainfo;
use peer::messages::builders::ExtendedMessageBuilder;
use peer::messages::{BitsExtensionMessage, PeerExtensionProtocolMessage, PeerWireProtocolMessage};
use peer::protocols::{NullProtocol, PeerExtensionProtocol, PeerWireProtocol};
Expand All @@ -23,7 +24,11 @@ use peer::{
};
use select::discovery::{IDiscoveryMessage, ODiscoveryMessage, UtMetadataModule};
use select::{ControlMessage, IExtendedMessage, IUberMessage, OUberMessage, UberModuleBuilder};
use tokio::signal;
use tokio_util::codec::Framed;
use tracing::level_filters::LevelFilter;

pub static INIT: Once = Once::new();

// Legacy Handshaker, when bip_dht is migrated, it will accept S directly
struct LegacyHandshaker<S> {
Expand Down Expand Up @@ -103,9 +108,39 @@ fn extract_arguments(matches: &ArgMatches) -> (String, String) {
(hash, output)
}

pub fn tracing_stdout_init(filter: LevelFilter) {
let builder = tracing_subscriber::fmt()
.with_max_level(filter)
.with_ansi(true)
.with_writer(std::io::stdout);

builder.pretty().with_file(true).init();

tracing::info!("Logging initialized");
}

async fn ctrl_c() {
signal::ctrl_c().await.expect("failed to listen for event");
tracing::warn!("Ctrl-C received, shutting down...");
}

enum SendUber {
Finished(Result<(), select::error::Error>),
Interrupted,
}

enum MainDht {
Finished(Box<Metainfo>),
Interrupted,
}

#[allow(clippy::too_many_lines)]
#[tokio::main]
async fn main() {
INIT.call_once(|| {
tracing_stdout_init(LevelFilter::TRACE);
});

// Parse command-line arguments
let matches = parse_arguments();
let (hash, output) = extract_arguments(&matches);
Expand Down Expand Up @@ -170,12 +205,33 @@ async fn main() {
.into_parts();

// Tell the uber module we want to download metainfo for the given hash
uber_send
let send_to_uber = uber_send
.send(IUberMessage::Discovery(Box::new(IDiscoveryMessage::DownloadMetainfo(
info_hash,
))))
.await
.expect("it should send the instruction");
.boxed();

// Await either the sending to uber or the Ctrl-C signal
let send_to_uber = tokio::select! {
res = send_to_uber => SendUber::Finished(res),
() = ctrl_c() => SendUber::Interrupted,
};

let () = match send_to_uber {
SendUber::Finished(Ok(())) => (),

SendUber::Finished(Err(e)) => {
tracing::warn!("send to uber failed with error: {e}");
tasks.shutdown().await;
return;
}

SendUber::Interrupted => {
tracing::warn!("setup was canceled...");
tasks.shutdown().await;
return;
}
};

let timer = futures::stream::unfold(tokio::time::interval(Duration::from_millis(100)), |mut interval| async move {
interval.tick().await;
Expand All @@ -192,11 +248,11 @@ async fn main() {
let message = if let Either::Left(message) = item {
match message {
Ok(PeerManagerOutputMessage::PeerAdded(info)) => {
println!("Connected To Peer: {info:?}");
tracing::info!("Connected To Peer: {info:?}");
IUberMessage::Control(Box::new(ControlMessage::PeerConnected(info)))
}
Ok(PeerManagerOutputMessage::PeerRemoved(info)) => {
println!("We Removed Peer {info:?} From The Peer Manager");
tracing::info!("We Removed Peer {info:?} From The Peer Manager");
IUberMessage::Control(Box::new(ControlMessage::PeerDisconnected(info)))
}
Ok(PeerManagerOutputMessage::SentMessage(_, _)) => todo!(),
Expand All @@ -216,7 +272,7 @@ async fn main() {
_ => unimplemented!(),
},
Ok(PeerManagerOutputMessage::PeerDisconnect(info)) => {
println!("Peer {info:?} Disconnected From Us");
tracing::info!("Peer {info:?} Disconnected From Us");
IUberMessage::Control(Box::new(ControlMessage::PeerDisconnected(info)))
}
Err(e) => {
Expand All @@ -227,7 +283,7 @@ async fn main() {
| PeerManagerOutputError::PeerDisconnectedAndMissing(info) => info,
};

println!("Peer {info:?} Disconnected With Error: {e:?}");
tracing::info!("Peer {info:?} Disconnected With Error: {e:?}");
IUberMessage::Control(Box::new(ControlMessage::PeerDisconnected(info)))
}
}
Expand All @@ -241,25 +297,45 @@ async fn main() {

// Setup the dht which will be the only peer discovery service we use in this example
let legacy_handshaker = LegacyHandshaker::new(handshaker_send);
let dht = DhtBuilder::with_router(Router::uTorrent)
.set_read_only(false)
.start_mainline(legacy_handshaker)
.await
.expect("it should start the dht mainline");

println!("Bootstrapping Dht...");
while let Some(message) = dht.events().await.next().await {
if let DhtEvent::BootstrapCompleted = message {
break;
let main_dht = async move {
let dht = DhtBuilder::with_router(Router::uTorrent)
.set_read_only(false)
.start_mainline(legacy_handshaker)
.await
.expect("it should start the dht mainline");

tracing::info!("Bootstrapping Dht...");
while let Some(message) = dht.events().await.next().await {
if let DhtEvent::BootstrapCompleted = message {
break;
}
}
tracing::info!("Bootstrap Complete...");

dht.search(info_hash, true).await;

loop {
if let Some(Ok(OUberMessage::Discovery(ODiscoveryMessage::DownloadedMetainfo(metainfo)))) = uber_recv.next().await {
break metainfo;
}
}
}
println!("Bootstrap Complete...");
.boxed();

// Await either the sending to uber or the Ctrl-C signal
let main_dht = tokio::select! {
res = main_dht => MainDht::Finished(Box::new(res)),
() = ctrl_c() => MainDht::Interrupted,
};

dht.search(info_hash, true).await;
let metainfo = match main_dht {
MainDht::Finished(metainfo) => metainfo,

let metainfo = loop {
if let Some(Ok(OUberMessage::Discovery(ODiscoveryMessage::DownloadedMetainfo(metainfo)))) = uber_recv.next().await {
break metainfo;
MainDht::Interrupted => {
tracing::warn!("setup was canceled...");
tasks.shutdown().await;
return;
}
};

Expand Down
2 changes: 1 addition & 1 deletion examples/simple_torrent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub fn tracing_stdout_init(filter: LevelFilter) {

async fn ctrl_c() {
signal::ctrl_c().await.expect("failed to listen for event");
println!("Ctrl-C received, shutting down...");
tracing::warn!("Ctrl-C received, shutting down...");
}

#[tokio::main]
Expand Down
Loading

0 comments on commit e48d96c

Please sign in to comment.