From 3706d7706f3c217e9565f2dc91182891eb4e27e9 Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Sun, 11 Aug 2024 14:32:44 +0200 Subject: [PATCH 1/6] clippy: fix unneeded match in filters.rs --- packages/handshake/src/filter/filters.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/packages/handshake/src/filter/filters.rs b/packages/handshake/src/filter/filters.rs index 8af1cf151..177e41f6f 100644 --- a/packages/handshake/src/filter/filters.rs +++ b/packages/handshake/src/filter/filters.rs @@ -21,11 +21,8 @@ impl Filters { self.write_filters(|mut_filters| { let opt_found = check_index(&mut_filters[..], &filter); - match opt_found { - Some(_) => (), - None => { - mut_filters.push(Box::new(filter)); - } + if opt_found.is_none() { + mut_filters.push(Box::new(filter)); } }); } @@ -93,9 +90,8 @@ where .downcast_ref::() .map(|downcast_filter| downcast_filter == filter); - match opt_match { - Some(true) => return Some(index), - Some(false) | None => (), + if let Some(true) = opt_match { + return Some(index); } } From 672b86939eb493250384c1236bc95664317bb35c Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Sun, 11 Aug 2024 13:06:33 +0200 Subject: [PATCH 2/6] refactor: move tests into tests folder --- .cargo/config.toml | 6 + .gitignore | 1 + .vscode/settings.json | 15 +- Cargo.toml | 18 +- cSpell.json | 1 + examples/get_metadata/Cargo.toml | 8 +- examples/simple_torrent/Cargo.toml | 4 +- packages/bencode/Cargo.toml | 8 +- packages/bencode/{test => tests}/mod.rs | 0 packages/dht/Cargo.toml | 8 +- packages/disk/Cargo.toml | 14 +- packages/disk/{test => tests}/add_torrent.rs | 11 +- packages/disk/{test => tests/common}/mod.rs | 22 +-- .../disk/{test => tests}/complete_torrent.rs | 11 +- .../disk_manager_send_backpressure.rs | 9 +- packages/disk/{test => tests}/load_block.rs | 9 +- .../disk/{test => tests}/process_block.rs | 9 +- .../disk/{test => tests}/remove_torrent.rs | 11 +- .../disk/{test => tests}/resume_torrent.rs | 15 +- packages/handshake/Cargo.toml | 9 +- packages/handshake/test/mod.rs | 15 -- packages/handshake/tests/common/mod.rs | 8 + .../test_byte_after_handshake.rs | 2 + .../test_bytes_after_handshake.rs | 2 + .../handshake/{test => tests}/test_connect.rs | 2 + .../{test => tests}/test_filter_allow_all.rs | 3 +- .../{test => tests}/test_filter_block_all.rs | 3 +- .../test_filter_whitelist_diff_data.rs | 3 +- .../test_filter_whitelist_same_data.rs | 3 +- packages/htracker/Cargo.toml | 4 +- packages/lpd/Cargo.toml | 4 +- packages/magnet/Cargo.toml | 7 +- packages/metainfo/Cargo.toml | 13 +- packages/peer/Cargo.toml | 13 +- packages/peer/{test => tests/common}/mod.rs | 2 - .../peer_manager_send_backpressure.rs | 5 +- packages/select/Cargo.toml | 8 +- packages/select/src/revelation/honest.rs | 180 ------------------ packages/select/tests/select_tests.rs | 174 +++++++++++++++++ packages/util/Cargo.toml | 3 +- packages/utp/Cargo.toml | 6 +- packages/utracker/Cargo.toml | 8 +- .../utracker/{test => tests/common}/mod.rs | 20 +- .../{test => tests}/test_announce_start.rs | 3 +- .../{test => tests}/test_announce_stop.rs | 3 +- .../{test => tests}/test_client_drop.rs | 3 +- .../{test => tests}/test_client_full.rs | 3 +- .../utracker/{test => tests}/test_connect.rs | 3 +- .../{test => tests}/test_connect_cache.rs | 3 +- .../utracker/{test => tests}/test_scrape.rs | 3 +- .../{test => tests}/test_server_drop.rs | 3 +- 51 files changed, 350 insertions(+), 351 deletions(-) rename packages/bencode/{test => tests}/mod.rs (100%) rename packages/disk/{test => tests}/add_torrent.rs (87%) rename packages/disk/{test => tests/common}/mod.rs (94%) rename packages/disk/{test => tests}/complete_torrent.rs (94%) rename packages/disk/{test => tests}/disk_manager_send_backpressure.rs (88%) rename packages/disk/{test => tests}/load_block.rs (89%) rename packages/disk/{test => tests}/process_block.rs (90%) rename packages/disk/{test => tests}/remove_torrent.rs (86%) rename packages/disk/{test => tests}/resume_torrent.rs (95%) delete mode 100644 packages/handshake/test/mod.rs create mode 100644 packages/handshake/tests/common/mod.rs rename packages/handshake/{test => tests}/test_byte_after_handshake.rs (99%) rename packages/handshake/{test => tests}/test_bytes_after_handshake.rs (99%) rename packages/handshake/{test => tests}/test_connect.rs (99%) rename packages/handshake/{test => tests}/test_filter_allow_all.rs (98%) rename packages/handshake/{test => tests}/test_filter_block_all.rs (98%) rename packages/handshake/{test => tests}/test_filter_whitelist_diff_data.rs (98%) rename packages/handshake/{test => tests}/test_filter_whitelist_same_data.rs (98%) rename packages/peer/{test => tests/common}/mod.rs (97%) rename packages/peer/{test => tests}/peer_manager_send_backpressure.rs (96%) create mode 100644 packages/select/tests/select_tests.rs rename packages/utracker/{test => tests/common}/mod.rs (94%) rename packages/utracker/{test => tests}/test_announce_start.rs (96%) rename packages/utracker/{test => tests}/test_announce_stop.rs (97%) rename packages/utracker/{test => tests}/test_client_drop.rs (97%) rename packages/utracker/{test => tests}/test_client_full.rs (97%) rename packages/utracker/{test => tests}/test_connect.rs (95%) rename packages/utracker/{test => tests}/test_connect_cache.rs (95%) rename packages/utracker/{test => tests}/test_scrape.rs (95%) rename packages/utracker/{test => tests}/test_server_drop.rs (95%) diff --git a/.cargo/config.toml b/.cargo/config.toml index ab9e050b0..a88455f3d 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,5 @@ +[env] + [build] rustflags = [ "-D", @@ -17,3 +19,7 @@ rustflags = [ "-D", "unused", ] + +[term] +quiet = false +verbose = true diff --git a/.gitignore b/.gitignore index 45da52020..755d4bcb1 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ # Generated by Cargo target/ Cargo.lock +rustc-ice* # Generated by Rustfmt *.bk diff --git a/.vscode/settings.json b/.vscode/settings.json index 1c04abd07..ccc6bd226 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,6 +2,14 @@ "[rust]": { "editor.formatOnSave": true }, + "[ignore]": { + "rust-analyzer.cargo.extraEnv": { + "RUSTFLAGS": "-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests", + "RUSTDOCFLAGS": "-Z profile -C codegen-units=1 -C inline-threshold=0 -C link-dead-code -C overflow-checks=off -C panic=abort -Z panic_abort_tests", + "CARGO_INCREMENTAL": "0", + "RUST_BACKTRACE": "1" + } + }, "rust-analyzer.checkOnSave": true, "rust-analyzer.check.command": "clippy", "rust-analyzer.check.allTargets": true, @@ -19,5 +27,10 @@ "clippy::style", "-W", "clippy::pedantic" - ] + ], + "evenBetterToml.formatter.allowedBlankLines": 1, + "evenBetterToml.formatter.columnWidth": 130, + "evenBetterToml.formatter.trailingNewline": true, + "evenBetterToml.formatter.reorderKeys": true, + "evenBetterToml.formatter.reorderArrays": true, } \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 6f29c0b69..e8fd1d4f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,7 @@ [workspace] members = [ + "examples/get_metadata", + "examples/simple_torrent", "packages/bencode", "packages/dht", "packages/disk", @@ -13,16 +15,12 @@ members = [ "packages/util", "packages/utp", "packages/utracker", - "examples/get_metadata", - "examples/simple_torrent", ] resolver = "2" [workspace.package] -authors = [ - "Nautilus Cyberneering , Andrew ", -] +authors = ["Nautilus Cyberneering , Andrew "] categories = ["network-programming", "web-programming"] description = "A collection of crates for building applications using bittorrent technologies." documentation = "https://github.com/torrust/bittorrent-infrastructure-project" @@ -30,16 +28,16 @@ edition = "2021" homepage = "https://github.com/torrust/bittorrent-infrastructure-project" keywords = ["bittorrent"] license = "Apache-2.0" -publish = false # until we decide where to publish. +publish = false # until we decide where to publish. repository = "https://github.com/torrust/bittorrent-infrastructure-project" rust-version = "1.71" version = "1.0.0-alpha.1" [profile.bench] -opt-level = 3 +codegen-units = 1 debug = false -rpath = false -lto = false debug-assertions = false -codegen-units = 1 +lto = false +opt-level = 3 panic = 'unwind' +rpath = false diff --git a/cSpell.json b/cSpell.json index 3bd08ddb0..97205b8f0 100644 --- a/cSpell.json +++ b/cSpell.json @@ -75,6 +75,7 @@ "subsecond", "Swatinem", "taiki", + "thiserror", "transmissionbt", "umio", "unchoke", diff --git a/examples/get_metadata/Cargo.toml b/examples/get_metadata/Cargo.toml index fdf3bf912..d8ea5f419 100644 --- a/examples/get_metadata/Cargo.toml +++ b/examples/get_metadata/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "get_metadata" description = "Examples For bip-rs" +name = "get_metadata" readme = "README.md" authors.workspace = true @@ -21,7 +21,7 @@ select = { path = "../../packages/select" } clap = "3" futures = "0.1" -tokio-core = "0.1" -tokio-codec = "0.1" -pendulum = "0.3" hex = "0.4" +pendulum = "0.3" +tokio-codec = "0.1" +tokio-core = "0.1" diff --git a/examples/simple_torrent/Cargo.toml b/examples/simple_torrent/Cargo.toml index f8e87ba2b..6e4d18046 100644 --- a/examples/simple_torrent/Cargo.toml +++ b/examples/simple_torrent/Cargo.toml @@ -1,6 +1,6 @@ [package] -name = "simple_torrent" description = "Examples For bip-rs" +name = "simple_torrent" readme = "README.md" authors.workspace = true @@ -22,5 +22,5 @@ peer = { path = "../../packages/peer" } clap = "3" futures = "0.1" -tokio-core = "0.1" tokio-codec = "0.1" +tokio-core = "0.1" diff --git a/packages/bencode/Cargo.toml b/packages/bencode/Cargo.toml index 4768893ab..62e701a10 100644 --- a/packages/bencode/Cargo.toml +++ b/packages/bencode/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "bencode" description = "Efficient decoding and encoding for bencode." keywords = ["bencode"] +name = "bencode" readme = "README.md" authors.workspace = true @@ -21,10 +21,6 @@ error-chain = "0.12" [dev-dependencies] criterion = "0.5" -[[test]] -name = "test" -path = "test/mod.rs" - [[bench]] +harness = false name = "bencode_benchmark" -harness = false \ No newline at end of file diff --git a/packages/bencode/test/mod.rs b/packages/bencode/tests/mod.rs similarity index 100% rename from packages/bencode/test/mod.rs rename to packages/bencode/tests/mod.rs diff --git a/packages/dht/Cargo.toml b/packages/dht/Cargo.toml index 90bbf4e36..e00b8e08f 100644 --- a/packages/dht/Cargo.toml +++ b/packages/dht/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "dht" description = "Implementation of the bittorrent mainline DHT" -keywords = ["dht", "distributed", "hash", "mainline", "bittorrent"] +keywords = ["bittorrent", "dht", "distributed", "hash", "mainline"] +name = "dht" readme = "README.md" authors.workspace = true @@ -20,9 +20,9 @@ bencode = { path = "../bencode" } handshake = { path = "../handshake" } util = { path = "../util" } +chrono = "0.4" crc = "3" +error-chain = "0.12" log = "0.4" mio = "0.5" rand = "0.8" -chrono = "0.4" -error-chain = "0.12" diff --git a/packages/disk/Cargo.toml b/packages/disk/Cargo.toml index 666b4ef10..bd5c14171 100644 --- a/packages/disk/Cargo.toml +++ b/packages/disk/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "disk" description = "Bittorrent Infrastructure Project Disk Module" -keywords = ["filesystem", "fs", "disk"] +keywords = ["disk", "filesystem", "fs"] +name = "disk" readme = "README.md" authors.workspace = true @@ -21,21 +21,17 @@ util = { path = "../util" } bytes = "0.4" crossbeam = "0.8" +error-chain = "0.12" futures = "0.1" futures-cpupool = "0.1" -error-chain = "0.12" log = "0.4" lru-cache = "0.1" [dev-dependencies] +criterion = "0.5" rand = "0.8" tokio-core = "0.1" -criterion = "0.5" - -[[test]] -name = "test" -path = "test/mod.rs" [[bench]] +harness = false name = "disk_benchmark" -harness = false \ No newline at end of file diff --git a/packages/disk/test/add_torrent.rs b/packages/disk/tests/add_torrent.rs similarity index 87% rename from packages/disk/test/add_torrent.rs rename to packages/disk/tests/add_torrent.rs index 74150dbcd..10755c320 100644 --- a/packages/disk/test/add_torrent.rs +++ b/packages/disk/tests/add_torrent.rs @@ -1,3 +1,4 @@ +use common::{core_loop_with_timeout, random_buffer, InMemoryFileSystem, MultiFileDirectAccessor}; use disk::{DiskManagerBuilder, FileSystem, IDiskMessage, ODiskMessage}; use futures::future::{Future, Loop}; use futures::sink::Sink; @@ -5,14 +6,14 @@ use futures::stream::Stream; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; use tokio_core::reactor::Core; -use crate::{InMemoryFileSystem, MultiFileDirectAccessor}; +mod common; #[test] fn positive_add_torrent() { // Create some "files" as random bytes - let data_a = (crate::random_buffer(50), "/path/to/file/a".into()); - let data_b = (crate::random_buffer(2000), "/path/to/file/b".into()); - let data_c = (crate::random_buffer(0), "/path/to/file/c".into()); + let data_a = (random_buffer(50), "/path/to/file/a".into()); + let data_b = (random_buffer(2000), "/path/to/file/b".into()); + let data_c = (random_buffer(0), "/path/to/file/c".into()); // Create our accessor for our in memory files and create a torrent file for them let files_accessor = @@ -34,7 +35,7 @@ fn positive_add_torrent() { let mut core = Core::new().unwrap(); // Run a core loop until we get the TorrentAdded message - let good_pieces = crate::core_loop_with_timeout(&mut core, 500, (0, recv), |good_pieces, recv, msg| match msg { + let good_pieces = core_loop_with_timeout(&mut core, 500, (0, recv), |good_pieces, recv, msg| match msg { ODiskMessage::TorrentAdded(_) => Loop::Break(good_pieces), ODiskMessage::FoundGoodPiece(_, _) => Loop::Continue((good_pieces + 1, recv)), unexpected => panic!("Unexpected Message: {unexpected:?}"), diff --git a/packages/disk/test/mod.rs b/packages/disk/tests/common/mod.rs similarity index 94% rename from packages/disk/test/mod.rs rename to packages/disk/tests/common/mod.rs index f96e09f4c..017dea4c8 100644 --- a/packages/disk/test/mod.rs +++ b/packages/disk/tests/common/mod.rs @@ -14,16 +14,8 @@ use metainfo::{Accessor, IntoAccessor, PieceAccess}; use tokio_core::reactor::{Core, Timeout}; use util::bt::InfoHash; -mod add_torrent; -mod complete_torrent; -mod disk_manager_send_backpressure; -mod load_block; -mod process_block; -mod remove_torrent; -mod resume_torrent; - /// Generate buffer of size random bytes. -fn random_buffer(size: usize) -> Vec { +pub fn random_buffer(size: usize) -> Vec { let mut buffer = vec![0u8; size]; rand::Rng::fill(&mut rand::thread_rng(), buffer.as_mut_slice()); @@ -34,7 +26,8 @@ fn random_buffer(size: usize) -> Vec { /// Initiate a core loop with the given timeout, state, and closure. /// /// Returns R or panics if an error occurred in the loop (including a timeout). -fn core_loop_with_timeout(core: &mut Core, timeout_ms: u64, state: (I, S), call: F) -> R +#[allow(dead_code)] +pub fn core_loop_with_timeout(core: &mut Core, timeout_ms: u64, state: (I, S), call: F) -> R where F: FnMut(I, S, S::Item) -> Loop, S: Stream, @@ -63,7 +56,8 @@ where } /// Send block with the given metadata and entire data given. -fn send_block( +#[allow(dead_code)] +pub fn send_block( blocking_send: &mut Wait, data: &[u8], hash: InfoHash, @@ -90,7 +84,7 @@ fn send_block( //----------------------------------------------------------------------------// /// Allow us to mock out multi file torrents. -struct MultiFileDirectAccessor { +pub struct MultiFileDirectAccessor { dir: PathBuf, files: Vec<(Vec, PathBuf)>, } @@ -144,7 +138,7 @@ impl Accessor for MultiFileDirectAccessor { /// Allow us to mock out the file system. #[derive(Clone)] -struct InMemoryFileSystem { +pub struct InMemoryFileSystem { files: Arc>>>, } @@ -165,7 +159,7 @@ impl InMemoryFileSystem { } } -struct InMemoryFile { +pub struct InMemoryFile { path: PathBuf, } diff --git a/packages/disk/test/complete_torrent.rs b/packages/disk/tests/complete_torrent.rs similarity index 94% rename from packages/disk/test/complete_torrent.rs rename to packages/disk/tests/complete_torrent.rs index 02088e3aa..27a38ab4b 100644 --- a/packages/disk/test/complete_torrent.rs +++ b/packages/disk/tests/complete_torrent.rs @@ -1,3 +1,4 @@ +use common::{core_loop_with_timeout, random_buffer, send_block, InMemoryFileSystem, MultiFileDirectAccessor}; use disk::{DiskManagerBuilder, IDiskMessage, ODiskMessage}; use futures::future::Loop; use futures::sink::Sink; @@ -5,14 +6,14 @@ use futures::stream::Stream; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; use tokio_core::reactor::Core; -use crate::{InMemoryFileSystem, MultiFileDirectAccessor}; +mod common; #[allow(clippy::too_many_lines)] #[test] fn positive_complete_torrent() { // Create some "files" as random bytes - let data_a = (crate::random_buffer(1023), "/path/to/file/a".into()); - let data_b = (crate::random_buffer(2000), "/path/to/file/b".into()); + let data_a = (random_buffer(1023), "/path/to/file/a".into()); + let data_b = (random_buffer(2000), "/path/to/file/b".into()); // Create our accessor for our in memory files and create a torrent file for them let files_accessor = MultiFileDirectAccessor::new("/my/downloads/".into(), vec![data_a.clone(), data_b.clone()]); @@ -34,7 +35,7 @@ fn positive_complete_torrent() { let mut core = Core::new().unwrap(); // Run a core loop until we get the TorrentAdded message - let (good_pieces, recv) = crate::core_loop_with_timeout(&mut core, 500, (0, recv), |good_pieces, recv, msg| match msg { + let (good_pieces, recv) = core_loop_with_timeout(&mut core, 500, (0, recv), |good_pieces, recv, msg| match msg { ODiskMessage::TorrentAdded(_) => Loop::Break((good_pieces, recv)), ODiskMessage::FoundGoodPiece(_, _) => Loop::Continue((good_pieces + 1, recv)), unexpected => panic!("Unexpected Message: {unexpected:?}"), @@ -49,7 +50,7 @@ fn positive_complete_torrent() { files_bytes.extend_from_slice(&data_b.0); // Send piece 0 with a bad last block - crate::send_block( + send_block( &mut blocking_send, &files_bytes[0..500], metainfo_file.info().info_hash(), diff --git a/packages/disk/test/disk_manager_send_backpressure.rs b/packages/disk/tests/disk_manager_send_backpressure.rs similarity index 88% rename from packages/disk/test/disk_manager_send_backpressure.rs rename to packages/disk/tests/disk_manager_send_backpressure.rs index b40aff5d3..03e282aff 100644 --- a/packages/disk/test/disk_manager_send_backpressure.rs +++ b/packages/disk/tests/disk_manager_send_backpressure.rs @@ -1,3 +1,4 @@ +use common::{random_buffer, InMemoryFileSystem, MultiFileDirectAccessor}; use disk::{DiskManagerBuilder, IDiskMessage}; use futures::future::Future; use futures::sink::Sink; @@ -6,14 +7,14 @@ use futures::{future, AsyncSink}; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; use tokio_core::reactor::Core; -use crate::{InMemoryFileSystem, MultiFileDirectAccessor}; +mod common; #[test] fn positive_disk_manager_send_backpressure() { // Create some "files" as random bytes - let data_a = (crate::random_buffer(50), "/path/to/file/a".into()); - let data_b = (crate::random_buffer(2000), "/path/to/file/b".into()); - let data_c = (crate::random_buffer(0), "/path/to/file/c".into()); + let data_a = (random_buffer(50), "/path/to/file/a".into()); + let data_b = (random_buffer(2000), "/path/to/file/b".into()); + let data_c = (random_buffer(0), "/path/to/file/c".into()); // Create our accessor for our in memory files and create a torrent file for them let files_accessor = diff --git a/packages/disk/test/load_block.rs b/packages/disk/tests/load_block.rs similarity index 89% rename from packages/disk/test/load_block.rs rename to packages/disk/tests/load_block.rs index 3f91d0d10..78cfe8eee 100644 --- a/packages/disk/test/load_block.rs +++ b/packages/disk/tests/load_block.rs @@ -1,4 +1,5 @@ use bytes::BytesMut; +use common::{core_loop_with_timeout, random_buffer, InMemoryFileSystem, MultiFileDirectAccessor}; use disk::{Block, BlockMetadata, BlockMut, DiskManagerBuilder, IDiskMessage, ODiskMessage}; use futures::future::Loop; use futures::sink::Sink; @@ -6,13 +7,13 @@ use futures::stream::Stream; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; use tokio_core::reactor::Core; -use crate::{InMemoryFileSystem, MultiFileDirectAccessor}; +mod common; #[test] fn positive_load_block() { // Create some "files" as random bytes - let data_a = (crate::random_buffer(1023), "/path/to/file/a".into()); - let data_b = (crate::random_buffer(2000), "/path/to/file/b".into()); + let data_a = (random_buffer(1023), "/path/to/file/a".into()); + let data_b = (random_buffer(2000), "/path/to/file/b".into()); // Create our accessor for our in memory files and create a torrent file for them let files_accessor = MultiFileDirectAccessor::new("/my/downloads/".into(), vec![data_a.clone(), data_b.clone()]); @@ -43,7 +44,7 @@ fn positive_load_block() { blocking_send.send(IDiskMessage::AddTorrent(metainfo_file)).unwrap(); let mut core = Core::new().unwrap(); - let (pblock, lblock) = crate::core_loop_with_timeout( + let (pblock, lblock) = core_loop_with_timeout( &mut core, 500, ((blocking_send, Some(process_block), Some(load_block)), recv), diff --git a/packages/disk/test/process_block.rs b/packages/disk/tests/process_block.rs similarity index 90% rename from packages/disk/test/process_block.rs rename to packages/disk/tests/process_block.rs index 67db96690..b6d4f2132 100644 --- a/packages/disk/test/process_block.rs +++ b/packages/disk/tests/process_block.rs @@ -1,4 +1,5 @@ use bytes::BytesMut; +use common::{core_loop_with_timeout, random_buffer, InMemoryFileSystem, MultiFileDirectAccessor}; use disk::{Block, BlockMetadata, DiskManagerBuilder, FileSystem, IDiskMessage, ODiskMessage}; use futures::future::Loop; use futures::sink::Sink; @@ -6,13 +7,13 @@ use futures::stream::Stream; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; use tokio_core::reactor::Core; -use crate::{InMemoryFileSystem, MultiFileDirectAccessor}; +mod common; #[test] fn positive_process_block() { // Create some "files" as random bytes - let data_a = (crate::random_buffer(1023), "/path/to/file/a".into()); - let data_b = (crate::random_buffer(2000), "/path/to/file/b".into()); + let data_a = (random_buffer(1023), "/path/to/file/a".into()); + let data_b = (random_buffer(2000), "/path/to/file/b".into()); // Create our accessor for our in memory files and create a torrent file for them let files_accessor = MultiFileDirectAccessor::new("/my/downloads/".into(), vec![data_a.clone(), data_b.clone()]); @@ -39,7 +40,7 @@ fn positive_process_block() { blocking_send.send(IDiskMessage::AddTorrent(metainfo_file)).unwrap(); let mut core = Core::new().unwrap(); - crate::core_loop_with_timeout( + core_loop_with_timeout( &mut core, 500, ((blocking_send, Some(process_block)), recv), diff --git a/packages/disk/test/remove_torrent.rs b/packages/disk/tests/remove_torrent.rs similarity index 86% rename from packages/disk/test/remove_torrent.rs rename to packages/disk/tests/remove_torrent.rs index b225d2c48..877497cf1 100644 --- a/packages/disk/test/remove_torrent.rs +++ b/packages/disk/tests/remove_torrent.rs @@ -1,4 +1,5 @@ use bytes::BytesMut; +use common::{core_loop_with_timeout, random_buffer, InMemoryFileSystem, MultiFileDirectAccessor}; use disk::{Block, BlockMetadata, DiskManagerBuilder, IDiskMessage, ODiskMessage}; use futures::future::Loop; use futures::sink::Sink; @@ -6,14 +7,14 @@ use futures::stream::Stream; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; use tokio_core::reactor::Core; -use crate::{InMemoryFileSystem, MultiFileDirectAccessor}; +mod common; #[test] fn positive_remove_torrent() { // Create some "files" as random bytes - let data_a = (crate::random_buffer(50), "/path/to/file/a".into()); - let data_b = (crate::random_buffer(2000), "/path/to/file/b".into()); - let data_c = (crate::random_buffer(0), "/path/to/file/c".into()); + let data_a = (random_buffer(50), "/path/to/file/a".into()); + let data_b = (random_buffer(2000), "/path/to/file/b".into()); + let data_c = (random_buffer(0), "/path/to/file/c".into()); // Create our accessor for our in memory files and create a torrent file for them let files_accessor = @@ -36,7 +37,7 @@ fn positive_remove_torrent() { // Verify that zero pieces are marked as good let mut core = Core::new().unwrap(); - let (mut blocking_send, good_pieces, recv) = crate::core_loop_with_timeout( + let (mut blocking_send, good_pieces, recv) = core_loop_with_timeout( &mut core, 500, ((blocking_send, 0), recv), diff --git a/packages/disk/test/resume_torrent.rs b/packages/disk/tests/resume_torrent.rs similarity index 95% rename from packages/disk/test/resume_torrent.rs rename to packages/disk/tests/resume_torrent.rs index 1485fa8fe..c724cf714 100644 --- a/packages/disk/test/resume_torrent.rs +++ b/packages/disk/tests/resume_torrent.rs @@ -1,3 +1,4 @@ +use common::{core_loop_with_timeout, random_buffer, send_block, InMemoryFileSystem, MultiFileDirectAccessor}; use disk::{DiskManagerBuilder, IDiskMessage, ODiskMessage}; use futures::future::Loop; use futures::sink::Sink; @@ -5,14 +6,14 @@ use futures::stream::Stream; use metainfo::{Metainfo, MetainfoBuilder, PieceLength}; use tokio_core::reactor::Core; -use crate::{InMemoryFileSystem, MultiFileDirectAccessor}; +mod common; #[allow(clippy::too_many_lines)] #[test] fn positive_complete_torrent() { // Create some "files" as random bytes - let data_a = (crate::random_buffer(1023), "/path/to/file/a".into()); - let data_b = (crate::random_buffer(2000), "/path/to/file/b".into()); + let data_a = (random_buffer(1023), "/path/to/file/a".into()); + let data_b = (random_buffer(2000), "/path/to/file/b".into()); // Create our accessor for our in memory files and create a torrent file for them let files_accessor = MultiFileDirectAccessor::new("/my/downloads/".into(), vec![data_a.clone(), data_b.clone()]); @@ -50,7 +51,7 @@ fn positive_complete_torrent() { files_bytes.extend_from_slice(&data_b.0); // Send piece 0 - crate::send_block( + send_block( &mut blocking_send, &files_bytes[0..500], metainfo_file.info().info_hash(), @@ -59,7 +60,7 @@ fn positive_complete_torrent() { 500, |_| (), ); - crate::send_block( + send_block( &mut blocking_send, &files_bytes[500..1000], metainfo_file.info().info_hash(), @@ -68,7 +69,7 @@ fn positive_complete_torrent() { 500, |_| (), ); - crate::send_block( + send_block( &mut blocking_send, &files_bytes[1000..1024], metainfo_file.info().info_hash(), @@ -79,7 +80,7 @@ fn positive_complete_torrent() { ); // Verify that piece 0 is good - let (recv, piece_zero_good) = crate::core_loop_with_timeout( + let (recv, piece_zero_good) = core_loop_with_timeout( &mut core, 500, ((false, 0), recv), diff --git a/packages/handshake/Cargo.toml b/packages/handshake/Cargo.toml index e202d1dcf..1f6c3ace5 100644 --- a/packages/handshake/Cargo.toml +++ b/packages/handshake/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "handshake" description = "Common handshaking interface as well as a default handshake implementation" -keywords = ["handshake", "bittorrent"] +keywords = ["bittorrent", "handshake"] +name = "handshake" readme = "README.md" authors.workspace = true @@ -17,6 +17,7 @@ version.workspace = true [dependencies] util = { path = "../util" } + bytes = "0.4" futures = "0.1" nom = "3" @@ -24,7 +25,3 @@ rand = "0.8" tokio-core = "0.1" tokio-io = "0.1" tokio-timer = "0.1" - -[[test]] -name = "test" -path = "test/mod.rs" diff --git a/packages/handshake/test/mod.rs b/packages/handshake/test/mod.rs deleted file mode 100644 index 9dfddd013..000000000 --- a/packages/handshake/test/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -mod test_byte_after_handshake; -mod test_bytes_after_handshake; -mod test_connect; -mod test_filter_allow_all; -mod test_filter_block_all; -mod test_filter_whitelist_diff_data; -mod test_filter_whitelist_same_data; - -//----------------------------------------------------------------------------------// - -#[derive(PartialEq, Eq, Debug)] -pub enum TimeoutResult { - TimedOut, - GotResult, -} diff --git a/packages/handshake/tests/common/mod.rs b/packages/handshake/tests/common/mod.rs new file mode 100644 index 000000000..b9b0494d5 --- /dev/null +++ b/packages/handshake/tests/common/mod.rs @@ -0,0 +1,8 @@ +//----------------------------------------------------------------------------------// + +#[allow(dead_code)] +#[derive(PartialEq, Eq, Debug)] +pub enum TimeoutResult { + TimedOut, + GotResult, +} diff --git a/packages/handshake/test/test_byte_after_handshake.rs b/packages/handshake/tests/test_byte_after_handshake.rs similarity index 99% rename from packages/handshake/test/test_byte_after_handshake.rs rename to packages/handshake/tests/test_byte_after_handshake.rs index c399cf171..14e5018e1 100644 --- a/packages/handshake/test/test_byte_after_handshake.rs +++ b/packages/handshake/tests/test_byte_after_handshake.rs @@ -10,6 +10,8 @@ use tokio_core::reactor::Core; use tokio_io::io; use util::bt::{self}; +mod common; + #[test] fn positive_recover_bytes() { let mut core = Core::new().unwrap(); diff --git a/packages/handshake/test/test_bytes_after_handshake.rs b/packages/handshake/tests/test_bytes_after_handshake.rs similarity index 99% rename from packages/handshake/test/test_bytes_after_handshake.rs rename to packages/handshake/tests/test_bytes_after_handshake.rs index 2e4d852f5..75782195e 100644 --- a/packages/handshake/test/test_bytes_after_handshake.rs +++ b/packages/handshake/tests/test_bytes_after_handshake.rs @@ -10,6 +10,8 @@ use tokio_core::reactor::Core; use tokio_io::io; use util::bt::{self}; +mod common; + #[test] fn positive_recover_bytes() { let mut core = Core::new().unwrap(); diff --git a/packages/handshake/test/test_connect.rs b/packages/handshake/tests/test_connect.rs similarity index 99% rename from packages/handshake/test/test_connect.rs rename to packages/handshake/tests/test_connect.rs index a37097955..43173dd41 100644 --- a/packages/handshake/test/test_connect.rs +++ b/packages/handshake/tests/test_connect.rs @@ -6,6 +6,8 @@ use handshake::{DiscoveryInfo, HandshakerBuilder, InitiateMessage, Protocol}; use tokio_core::reactor::Core; use util::bt::{self}; +mod common; + #[test] fn positive_connect() { let mut core = Core::new().unwrap(); diff --git a/packages/handshake/test/test_filter_allow_all.rs b/packages/handshake/tests/test_filter_allow_all.rs similarity index 98% rename from packages/handshake/test/test_filter_allow_all.rs rename to packages/handshake/tests/test_filter_allow_all.rs index 5bb6591dc..db6cbb1a9 100644 --- a/packages/handshake/test/test_filter_allow_all.rs +++ b/packages/handshake/tests/test_filter_allow_all.rs @@ -2,6 +2,7 @@ use std::any::Any; use std::net::SocketAddr; use std::time::Duration; +use common::TimeoutResult; use futures::sink::Sink; use futures::stream::Stream; use futures::Future; @@ -12,7 +13,7 @@ use handshake::{ use tokio_core::reactor::{Core, Timeout}; use util::bt::{self, InfoHash, PeerId}; -use crate::TimeoutResult; +mod common; #[derive(PartialEq, Eq)] pub struct FilterAllowAll; diff --git a/packages/handshake/test/test_filter_block_all.rs b/packages/handshake/tests/test_filter_block_all.rs similarity index 98% rename from packages/handshake/test/test_filter_block_all.rs rename to packages/handshake/tests/test_filter_block_all.rs index 8378b75f5..ba434c473 100644 --- a/packages/handshake/test/test_filter_block_all.rs +++ b/packages/handshake/tests/test_filter_block_all.rs @@ -2,6 +2,7 @@ use std::any::Any; use std::net::SocketAddr; use std::time::Duration; +use common::TimeoutResult; use futures::sink::Sink; use futures::stream::Stream; use futures::Future; @@ -12,7 +13,7 @@ use handshake::{ use tokio_core::reactor::{Core, Timeout}; use util::bt::{self, InfoHash, PeerId}; -use crate::TimeoutResult; +mod common; #[derive(PartialEq, Eq)] pub struct FilterBlockAll; diff --git a/packages/handshake/test/test_filter_whitelist_diff_data.rs b/packages/handshake/tests/test_filter_whitelist_diff_data.rs similarity index 98% rename from packages/handshake/test/test_filter_whitelist_diff_data.rs rename to packages/handshake/tests/test_filter_whitelist_diff_data.rs index 86ec118c9..86a09f449 100644 --- a/packages/handshake/test/test_filter_whitelist_diff_data.rs +++ b/packages/handshake/tests/test_filter_whitelist_diff_data.rs @@ -1,6 +1,7 @@ use std::any::Any; use std::time::Duration; +use common::TimeoutResult; use futures::sink::Sink; use futures::stream::Stream; use futures::Future; @@ -9,7 +10,7 @@ use handshake::{DiscoveryInfo, FilterDecision, HandshakeFilter, HandshakeFilters use tokio_core::reactor::{Core, Timeout}; use util::bt::{self, InfoHash}; -use crate::TimeoutResult; +mod common; #[derive(PartialEq, Eq)] pub struct FilterBlockAllHash; diff --git a/packages/handshake/test/test_filter_whitelist_same_data.rs b/packages/handshake/tests/test_filter_whitelist_same_data.rs similarity index 98% rename from packages/handshake/test/test_filter_whitelist_same_data.rs rename to packages/handshake/tests/test_filter_whitelist_same_data.rs index 092675be8..21064f516 100644 --- a/packages/handshake/test/test_filter_whitelist_same_data.rs +++ b/packages/handshake/tests/test_filter_whitelist_same_data.rs @@ -1,6 +1,7 @@ use std::any::Any; use std::time::Duration; +use common::TimeoutResult; use futures::sink::Sink; use futures::stream::Stream; use futures::Future; @@ -9,7 +10,7 @@ use handshake::{DiscoveryInfo, FilterDecision, HandshakeFilter, HandshakeFilters use tokio_core::reactor::{Core, Timeout}; use util::bt::{self, InfoHash}; -use crate::TimeoutResult; +mod common; #[derive(PartialEq, Eq)] pub struct FilterBlockAllHash; diff --git a/packages/htracker/Cargo.toml b/packages/htracker/Cargo.toml index 186471aa6..b04e63ae5 100644 --- a/packages/htracker/Cargo.toml +++ b/packages/htracker/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "htracker" description = "Communication with bittorrent HTTP trackers" -keywords = ["tracker", "bittorrent", "http"] +keywords = ["bittorrent", "http", "tracker"] +name = "htracker" readme = "README.md" authors.workspace = true diff --git a/packages/lpd/Cargo.toml b/packages/lpd/Cargo.toml index 9a23278eb..c8a17f951 100644 --- a/packages/lpd/Cargo.toml +++ b/packages/lpd/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "lpd" description = "Implementation of the bittorrent Local Peer/Service Discovery mechanism" -keywords = ["peer", "discovery", "local", "service"] +keywords = ["discovery", "local", "peer", "service"] +name = "lpd" readme = "README.md" authors.workspace = true diff --git a/packages/magnet/Cargo.toml b/packages/magnet/Cargo.toml index d3c42c900..2f49cd965 100644 --- a/packages/magnet/Cargo.toml +++ b/packages/magnet/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "magnet" description = "Magnet link parsing and construction" -keywords = ["magnet", "bittorrent"] +keywords = ["bittorrent", "magnet"] +name = "magnet" readme = "README.md" authors.workspace = true @@ -17,5 +17,6 @@ version.workspace = true [dependencies] util = { path = "../util" } -url = "2" + base32 = "0.4" +url = "2" diff --git a/packages/metainfo/Cargo.toml b/packages/metainfo/Cargo.toml index c015e3e74..50978b2e3 100644 --- a/packages/metainfo/Cargo.toml +++ b/packages/metainfo/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "metainfo" description = "Parsing and building of bittorrent metainfo files" -keywords = ["metainfo", "torrent", "file", "bittorrent"] +keywords = ["bittorrent", "file", "metainfo", "torrent"] +name = "metainfo" readme = "README.md" authors.workspace = true @@ -18,16 +18,17 @@ version.workspace = true [dependencies] bencode = { path = "../bencode" } util = { path = "../util" } + crossbeam = "0.8" -walkdir = "2" error-chain = "0.12" +walkdir = "2" [dev-dependencies] chrono = "0.4" -rand = "0.8" -pbr = "1" criterion = "0.5" +pbr = "1" +rand = "0.8" [[bench]] +harness = false name = "metainfo_benchmark" -harness = false \ No newline at end of file diff --git a/packages/peer/Cargo.toml b/packages/peer/Cargo.toml index 05f2cbbff..dfcd04a9a 100644 --- a/packages/peer/Cargo.toml +++ b/packages/peer/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "peer" description = "Communication with bittorrent peers via peer wire protocol" -keywords = ["peer", "wire", "protocol", "pwp", "bittorrent"] +keywords = ["bittorrent", "peer", "protocol", "pwp", "wire"] +name = "peer" readme = "README.md" authors.workspace = true @@ -15,22 +15,17 @@ publish.workspace = true repository.workspace = true version.workspace = true - [dependencies] bencode = { path = "../bencode" } handshake = { path = "../handshake" } util = { path = "../util" } -bytes = "0.4" byteorder = "1" +bytes = "0.4" crossbeam = "0.8" error-chain = "0.12" futures = "0.1" +nom = "3" tokio-core = "0.1" tokio-io = "0.1" tokio-timer = "0.1" -nom = "3" - -[[test]] -name = "test" -path = "test/mod.rs" diff --git a/packages/peer/test/mod.rs b/packages/peer/tests/common/mod.rs similarity index 97% rename from packages/peer/test/mod.rs rename to packages/peer/tests/common/mod.rs index fcbaff09d..42af63826 100644 --- a/packages/peer/test/mod.rs +++ b/packages/peer/tests/common/mod.rs @@ -5,8 +5,6 @@ use futures::stream::Stream; use futures::sync::mpsc::{self, Receiver, Sender}; use futures::{Poll, StartSend}; -mod peer_manager_send_backpressure; - pub struct ConnectedChannel { send: Sender, recv: Receiver, diff --git a/packages/peer/test/peer_manager_send_backpressure.rs b/packages/peer/tests/peer_manager_send_backpressure.rs similarity index 96% rename from packages/peer/test/peer_manager_send_backpressure.rs rename to packages/peer/tests/peer_manager_send_backpressure.rs index 5f33ac2b9..dea17e95a 100644 --- a/packages/peer/test/peer_manager_send_backpressure.rs +++ b/packages/peer/tests/peer_manager_send_backpressure.rs @@ -1,3 +1,4 @@ +use common::{connected_channel, ConnectedChannel}; use futures::sink::Sink; use futures::stream::Stream; use futures::{future, AsyncSink, Future}; @@ -8,7 +9,7 @@ use peer::{IPeerManagerMessage, OPeerManagerMessage, PeerInfo, PeerManagerBuilde use tokio_core::reactor::Core; use util::bt; -use crate::ConnectedChannel; +mod common; #[test] fn positive_peer_manager_send_backpressure() { @@ -18,7 +19,7 @@ fn positive_peer_manager_send_backpressure() { let manager = PeerManagerBuilder::new().with_peer_capacity(1).build(core.handle()); // Create two peers - let (peer_one, peer_two): (Peer, Peer) = crate::connected_channel(5); + let (peer_one, peer_two): (Peer, Peer) = connected_channel(5); let peer_one_info = PeerInfo::new( "127.0.0.1:0".parse().unwrap(), [0u8; bt::PEER_ID_LEN].into(), diff --git a/packages/select/Cargo.toml b/packages/select/Cargo.toml index 3b65b1100..142fd54b0 100644 --- a/packages/select/Cargo.toml +++ b/packages/select/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "select" description = "Bittorrent Infrastructure Project Piece Selection Module" keywords = ["piece", "selection"] +name = "select" readme = "README.md" authors.workspace = true @@ -17,17 +17,17 @@ version.workspace = true [dependencies] handshake = { path = "../handshake" } -util = { path = "../util" } -peer = { path = "../peer" } metainfo = { path = "../metainfo" } +peer = { path = "../peer" } +util = { path = "../util" } utracker = { path = "../utracker" } bit-set = "0.5" bytes = "0.4" error-chain = "0.12" futures = "0.1" -rand = "0.8" log = "0.4" +rand = "0.8" [dev-dependencies] futures-test = { git = "https://github.com/carllerche/better-future.git" } diff --git a/packages/select/src/revelation/honest.rs b/packages/select/src/revelation/honest.rs index ffa6dfb96..f1041d6f0 100644 --- a/packages/select/src/revelation/honest.rs +++ b/packages/select/src/revelation/honest.rs @@ -222,183 +222,3 @@ impl Stream for HonestRevealModule { }) } } - -#[cfg(test)] -mod tests { - use futures::{Async, Sink, Stream}; - use futures_test::harness::Harness; - use handshake::Extensions; - use metainfo::{DirectAccessor, Metainfo, MetainfoBuilder, PieceLength}; - use peer::PeerInfo; - use util::bt; - use util::bt::InfoHash; - - use super::HonestRevealModule; - use crate::revelation::error::RevealErrorKind; - use crate::revelation::{IRevealMessage, ORevealMessage}; - use crate::ControlMessage; - - fn metainfo(num_pieces: usize) -> Metainfo { - let data = vec![0u8; num_pieces]; - - let accessor = DirectAccessor::new("MyFile.txt", &data); - let bytes = MetainfoBuilder::new() - .set_piece_length(PieceLength::Custom(1)) - .build(1, accessor, |_| ()) - .unwrap(); - - Metainfo::from_bytes(bytes).unwrap() - } - - fn peer_info(hash: InfoHash) -> PeerInfo { - PeerInfo::new( - "0.0.0.0:0".parse().unwrap(), - [0u8; bt::PEER_ID_LEN].into(), - hash, - Extensions::new(), - ) - } - - #[test] - fn positive_add_and_remove_metainfo() { - let (send, _recv) = HonestRevealModule::new().split(); - let metainfo = metainfo(1); - - let mut block_send = send.wait(); - - block_send - .send(IRevealMessage::Control(ControlMessage::AddTorrent(metainfo.clone()))) - .unwrap(); - block_send - .send(IRevealMessage::Control(ControlMessage::RemoveTorrent(metainfo.clone()))) - .unwrap(); - } - - #[test] - fn positive_send_bitfield_single_piece() { - let (send, recv) = HonestRevealModule::new().split(); - let metainfo = metainfo(8); - let info_hash = metainfo.info().info_hash(); - let peer_info = peer_info(info_hash); - - let mut block_send = send.wait(); - let mut block_recv = recv.wait(); - - block_send - .send(IRevealMessage::Control(ControlMessage::AddTorrent(metainfo))) - .unwrap(); - block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 0)).unwrap(); - block_send - .send(IRevealMessage::Control(ControlMessage::PeerConnected(peer_info))) - .unwrap(); - - let ORevealMessage::SendBitField(info, bitfield) = block_recv.next().unwrap().unwrap() else { - panic!("Received Unexpected Message") - }; - - assert_eq!(peer_info, info); - assert_eq!(1, bitfield.bitfield().len()); - assert_eq!(0x80, bitfield.bitfield()[0]); - } - - #[test] - fn positive_send_bitfield_multiple_pieces() { - let (send, recv) = HonestRevealModule::new().split(); - let metainfo = metainfo(16); - let info_hash = metainfo.info().info_hash(); - let peer_info = peer_info(info_hash); - - let mut block_send = send.wait(); - let mut block_recv = recv.wait(); - - block_send - .send(IRevealMessage::Control(ControlMessage::AddTorrent(metainfo))) - .unwrap(); - block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 0)).unwrap(); - block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 8)).unwrap(); - block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 15)).unwrap(); - block_send - .send(IRevealMessage::Control(ControlMessage::PeerConnected(peer_info))) - .unwrap(); - - let ORevealMessage::SendBitField(info, bitfield) = block_recv.next().unwrap().unwrap() else { - panic!("Received Unexpected Message") - }; - - assert_eq!(peer_info, info); - assert_eq!(2, bitfield.bitfield().len()); - assert_eq!(0x80, bitfield.bitfield()[0]); - assert_eq!(0x81, bitfield.bitfield()[1]); - } - - #[test] - fn negative_do_not_send_empty_bitfield() { - let (send, recv) = HonestRevealModule::new().split(); - let metainfo = metainfo(16); - let info_hash = metainfo.info().info_hash(); - let peer_info = peer_info(info_hash); - - let mut block_send = send.wait(); - let mut non_block_recv = Harness::new(recv); - - block_send - .send(IRevealMessage::Control(ControlMessage::AddTorrent(metainfo))) - .unwrap(); - block_send - .send(IRevealMessage::Control(ControlMessage::PeerConnected(peer_info))) - .unwrap(); - - assert!(non_block_recv.poll_next().as_ref().map(Async::is_not_ready).unwrap_or(false)); - } - - #[test] - fn negative_found_good_piece_out_of_range() { - let (send, _recv) = HonestRevealModule::new().split(); - let metainfo = metainfo(8); - let info_hash = metainfo.info().info_hash(); - - let mut block_send = send.wait(); - - block_send - .send(IRevealMessage::Control(ControlMessage::AddTorrent(metainfo))) - .unwrap(); - - let error = block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 8)).unwrap_err(); - match error.kind() { - &RevealErrorKind::InvalidPieceOutOfRange { hash, index } => { - assert_eq!(info_hash, hash); - assert_eq!(8, index); - } - _ => { - panic!("Received Unexpected Message") - } - }; - } - - #[test] - fn negative_all_pieces_good_found_good_piece_out_of_range() { - let (send, _recv) = HonestRevealModule::new().split(); - let metainfo = metainfo(3); - let info_hash = metainfo.info().info_hash(); - - let mut block_send = send.wait(); - - block_send - .send(IRevealMessage::Control(ControlMessage::AddTorrent(metainfo))) - .unwrap(); - block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 0)).unwrap(); - block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 1)).unwrap(); - block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 2)).unwrap(); - - let error = block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 3)).unwrap_err(); - match error.kind() { - &RevealErrorKind::InvalidPieceOutOfRange { hash, index } => { - assert_eq!(info_hash, hash); - assert_eq!(3, index); - } - _ => { - panic!("Received Unexpected Message") - } - }; - } -} diff --git a/packages/select/tests/select_tests.rs b/packages/select/tests/select_tests.rs new file mode 100644 index 000000000..f03bfb90e --- /dev/null +++ b/packages/select/tests/select_tests.rs @@ -0,0 +1,174 @@ +use futures::{Async, Sink, Stream}; +use futures_test::harness::Harness; +use handshake::Extensions; +use metainfo::{DirectAccessor, Metainfo, MetainfoBuilder, PieceLength}; +use peer::PeerInfo; +use select::revelation::error::RevealErrorKind; +use select::revelation::{HonestRevealModule, IRevealMessage, ORevealMessage}; +use select::ControlMessage; +use util::bt; +use util::bt::InfoHash; + +fn metainfo(num_pieces: usize) -> Metainfo { + let data = vec![0u8; num_pieces]; + + let accessor = DirectAccessor::new("MyFile.txt", &data); + let bytes = MetainfoBuilder::new() + .set_piece_length(PieceLength::Custom(1)) + .build(1, accessor, |_| ()) + .unwrap(); + + Metainfo::from_bytes(bytes).unwrap() +} + +fn peer_info(hash: InfoHash) -> PeerInfo { + PeerInfo::new( + "0.0.0.0:0".parse().unwrap(), + [0u8; bt::PEER_ID_LEN].into(), + hash, + Extensions::new(), + ) +} + +#[test] +fn positive_add_and_remove_metainfo() { + let (send, _recv) = HonestRevealModule::new().split(); + let metainfo = metainfo(1); + + let mut block_send = send.wait(); + + block_send + .send(IRevealMessage::Control(ControlMessage::AddTorrent(metainfo.clone()))) + .unwrap(); + block_send + .send(IRevealMessage::Control(ControlMessage::RemoveTorrent(metainfo.clone()))) + .unwrap(); +} + +#[test] +fn positive_send_bitfield_single_piece() { + let (send, recv) = HonestRevealModule::new().split(); + let metainfo = metainfo(8); + let info_hash = metainfo.info().info_hash(); + let peer_info = peer_info(info_hash); + + let mut block_send = send.wait(); + let mut block_recv = recv.wait(); + + block_send + .send(IRevealMessage::Control(ControlMessage::AddTorrent(metainfo))) + .unwrap(); + block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 0)).unwrap(); + block_send + .send(IRevealMessage::Control(ControlMessage::PeerConnected(peer_info))) + .unwrap(); + + let ORevealMessage::SendBitField(info, bitfield) = block_recv.next().unwrap().unwrap() else { + panic!("Received Unexpected Message") + }; + + assert_eq!(peer_info, info); + assert_eq!(1, bitfield.bitfield().len()); + assert_eq!(0x80, bitfield.bitfield()[0]); +} + +#[test] +fn positive_send_bitfield_multiple_pieces() { + let (send, recv) = HonestRevealModule::new().split(); + let metainfo = metainfo(16); + let info_hash = metainfo.info().info_hash(); + let peer_info = peer_info(info_hash); + + let mut block_send = send.wait(); + let mut block_recv = recv.wait(); + + block_send + .send(IRevealMessage::Control(ControlMessage::AddTorrent(metainfo))) + .unwrap(); + block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 0)).unwrap(); + block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 8)).unwrap(); + block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 15)).unwrap(); + block_send + .send(IRevealMessage::Control(ControlMessage::PeerConnected(peer_info))) + .unwrap(); + + let ORevealMessage::SendBitField(info, bitfield) = block_recv.next().unwrap().unwrap() else { + panic!("Received Unexpected Message") + }; + + assert_eq!(peer_info, info); + assert_eq!(2, bitfield.bitfield().len()); + assert_eq!(0x80, bitfield.bitfield()[0]); + assert_eq!(0x81, bitfield.bitfield()[1]); +} + +#[test] +fn negative_do_not_send_empty_bitfield() { + let (send, recv) = HonestRevealModule::new().split(); + let metainfo = metainfo(16); + let info_hash = metainfo.info().info_hash(); + let peer_info = peer_info(info_hash); + + let mut block_send = send.wait(); + let mut non_block_recv = Harness::new(recv); + + block_send + .send(IRevealMessage::Control(ControlMessage::AddTorrent(metainfo))) + .unwrap(); + block_send + .send(IRevealMessage::Control(ControlMessage::PeerConnected(peer_info))) + .unwrap(); + + assert!(non_block_recv.poll_next().as_ref().map(Async::is_not_ready).unwrap_or(false)); +} + +#[test] +fn negative_found_good_piece_out_of_range() { + let (send, _recv) = HonestRevealModule::new().split(); + let metainfo = metainfo(8); + let info_hash = metainfo.info().info_hash(); + + let mut block_send = send.wait(); + + block_send + .send(IRevealMessage::Control(ControlMessage::AddTorrent(metainfo))) + .unwrap(); + + let error = block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 8)).unwrap_err(); + match error.kind() { + &RevealErrorKind::InvalidPieceOutOfRange { hash, index } => { + assert_eq!(info_hash, hash); + assert_eq!(8, index); + } + _ => { + panic!("Received Unexpected Message") + } + }; +} + +#[test] +fn negative_all_pieces_good_found_good_piece_out_of_range() { + let (send, _recv) = HonestRevealModule::new().split(); + let metainfo = metainfo(3); + let info_hash = metainfo.info().info_hash(); + + let mut block_send = send.wait(); + + block_send + .send(IRevealMessage::Control(ControlMessage::AddTorrent(metainfo))) + .unwrap(); + block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 0)).unwrap(); + block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 1)).unwrap(); + block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 2)).unwrap(); + + let error = block_send.send(IRevealMessage::FoundGoodPiece(info_hash, 3)).unwrap_err(); + match error.kind() { + &RevealErrorKind::InvalidPieceOutOfRange { hash, index } => { + assert_eq!(info_hash, hash); + assert_eq!(3, index); + } + _ => { + panic!("Received Unexpected Message") + } + }; +} diff --git a/packages/util/Cargo.toml b/packages/util/Cargo.toml index 6fb865eb8..0daedc47a 100644 --- a/packages/util/Cargo.toml +++ b/packages/util/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "util" description = "Utilities for the Bittorrent Infrastructure Project" keywords = ["utility"] +name = "util" readme = "README.md" authors.workspace = true @@ -15,7 +15,6 @@ publish.workspace = true repository.workspace = true version.workspace = true - [dependencies] chrono = "0.4" num = "0.4" diff --git a/packages/utp/Cargo.toml b/packages/utp/Cargo.toml index 20ea2fce5..9235233d3 100644 --- a/packages/utp/Cargo.toml +++ b/packages/utp/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "utp" description = "uTorrent Transport Protocol" -keywords = ["utp", "bittorrent", "transport"] +keywords = ["bittorrent", "transport", "utp"] +name = "utp" readme = "README.md" authors.workspace = true @@ -13,4 +13,4 @@ license.workspace = true publish.workspace = true repository.workspace = true -version.workspace = true \ No newline at end of file +version.workspace = true diff --git a/packages/utracker/Cargo.toml b/packages/utracker/Cargo.toml index f85b2a376..0d6a78d72 100644 --- a/packages/utracker/Cargo.toml +++ b/packages/utracker/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "utracker" description = "Communication with bittorrent UDP trackers" -keywords = ["tracker", "bittorrent", "udp"] +keywords = ["bittorrent", "tracker", "udp"] +name = "utracker" readme = "README.md" authors.workspace = true @@ -25,7 +25,3 @@ futures = "0.1" nom = "3" rand = "0.8" umio = "0.3" - -[[test]] -name = "test" -path = "test/mod.rs" diff --git a/packages/utracker/test/mod.rs b/packages/utracker/tests/common/mod.rs similarity index 94% rename from packages/utracker/test/mod.rs rename to packages/utracker/tests/common/mod.rs index 02a723ece..7c7995275 100644 --- a/packages/utracker/test/mod.rs +++ b/packages/utracker/tests/common/mod.rs @@ -15,19 +15,10 @@ use utracker::contact::{CompactPeers, CompactPeersV4, CompactPeersV6}; use utracker::scrape::{ScrapeRequest, ScrapeResponse, ScrapeStats}; use utracker::{ClientMetadata, ServerHandler, ServerResult}; -mod test_announce_start; -mod test_announce_stop; -mod test_client_drop; -mod test_client_full; -mod test_connect; -mod test_connect_cache; -mod test_scrape; -mod test_server_drop; - const NUM_PEERS_RETURNED: usize = 20; #[derive(Clone)] -struct MockTrackerHandler { +pub struct MockTrackerHandler { inner: Arc>, } @@ -38,6 +29,7 @@ struct InnerMockTrackerHandler { } impl MockTrackerHandler { + #[allow(dead_code)] pub fn new() -> MockTrackerHandler { MockTrackerHandler { inner: Arc::new(Mutex::new(InnerMockTrackerHandler { @@ -48,6 +40,7 @@ impl MockTrackerHandler { } } + #[allow(dead_code)] pub fn num_active_connect_ids(&self) -> usize { self.inner.lock().unwrap().cids.len() } @@ -158,18 +151,19 @@ impl ServerHandler for MockTrackerHandler { //----------------------------------------------------------------------------// -fn handshaker() -> (MockHandshakerSink, MockHandshakerStream) { +#[allow(dead_code)] +pub fn handshaker() -> (MockHandshakerSink, MockHandshakerStream) { let (send, recv) = mpsc::unbounded(); (MockHandshakerSink { send }, MockHandshakerStream { recv }) } #[derive(Clone)] -struct MockHandshakerSink { +pub struct MockHandshakerSink { send: UnboundedSender>, } -struct MockHandshakerStream { +pub struct MockHandshakerStream { recv: UnboundedReceiver>, } diff --git a/packages/utracker/test/test_announce_start.rs b/packages/utracker/tests/test_announce_start.rs similarity index 96% rename from packages/utracker/test/test_announce_start.rs rename to packages/utracker/tests/test_announce_start.rs index 7ba11cbed..33c337c15 100644 --- a/packages/utracker/test/test_announce_start.rs +++ b/packages/utracker/tests/test_announce_start.rs @@ -2,6 +2,7 @@ use std::net::SocketAddr; use std::thread::{self}; use std::time::Duration; +use common::{handshaker, MockTrackerHandler}; use futures::future::Either; use futures::stream::Stream; use handshake::Protocol; @@ -9,7 +10,7 @@ use util::bt::{self}; use utracker::announce::{AnnounceEvent, ClientState}; use utracker::{ClientRequest, TrackerClient, TrackerServer}; -use crate::{handshaker, MockTrackerHandler}; +mod common; #[test] #[allow(unused)] diff --git a/packages/utracker/test/test_announce_stop.rs b/packages/utracker/tests/test_announce_stop.rs similarity index 97% rename from packages/utracker/test/test_announce_stop.rs rename to packages/utracker/tests/test_announce_stop.rs index 8bc6a34aa..f9ade00ef 100644 --- a/packages/utracker/test/test_announce_stop.rs +++ b/packages/utracker/tests/test_announce_stop.rs @@ -1,13 +1,14 @@ use std::thread::{self}; use std::time::Duration; +use common::{handshaker, MockTrackerHandler}; use futures::future::Either; use futures::stream::Stream; use util::bt::{self}; use utracker::announce::{AnnounceEvent, ClientState}; use utracker::{ClientRequest, TrackerClient, TrackerServer}; -use crate::{handshaker, MockTrackerHandler}; +mod common; #[test] #[allow(unused)] diff --git a/packages/utracker/test/test_client_drop.rs b/packages/utracker/tests/test_client_drop.rs similarity index 97% rename from packages/utracker/test/test_client_drop.rs rename to packages/utracker/tests/test_client_drop.rs index fc8760c9b..8f4c94985 100644 --- a/packages/utracker/test/test_client_drop.rs +++ b/packages/utracker/tests/test_client_drop.rs @@ -1,10 +1,11 @@ +use common::handshaker; use futures::future::Either; use futures::stream::Stream; use util::bt::{self}; use utracker::announce::{AnnounceEvent, ClientState}; use utracker::{ClientError, ClientRequest, TrackerClient}; -use crate::handshaker; +mod common; #[test] #[allow(unused)] diff --git a/packages/utracker/test/test_client_full.rs b/packages/utracker/tests/test_client_full.rs similarity index 97% rename from packages/utracker/test/test_client_full.rs rename to packages/utracker/tests/test_client_full.rs index f1fe81ee6..79d6c85fd 100644 --- a/packages/utracker/test/test_client_full.rs +++ b/packages/utracker/tests/test_client_full.rs @@ -1,12 +1,13 @@ use std::mem; +use common::handshaker; use futures::stream::Stream; use futures::Future; use util::bt::{self}; use utracker::announce::{AnnounceEvent, ClientState}; use utracker::{ClientRequest, TrackerClient}; -use crate::handshaker; +mod common; #[test] #[allow(unused)] diff --git a/packages/utracker/test/test_connect.rs b/packages/utracker/tests/test_connect.rs similarity index 95% rename from packages/utracker/test/test_connect.rs rename to packages/utracker/tests/test_connect.rs index a24688534..4d0833c09 100644 --- a/packages/utracker/test/test_connect.rs +++ b/packages/utracker/tests/test_connect.rs @@ -1,13 +1,14 @@ use std::thread::{self}; use std::time::Duration; +use common::{handshaker, MockTrackerHandler}; use futures::future::Either; use futures::stream::Stream; use util::bt::{self}; use utracker::announce::{AnnounceEvent, ClientState}; use utracker::{ClientRequest, TrackerClient, TrackerServer}; -use crate::{handshaker, MockTrackerHandler}; +mod common; #[test] #[allow(unused)] diff --git a/packages/utracker/test/test_connect_cache.rs b/packages/utracker/tests/test_connect_cache.rs similarity index 95% rename from packages/utracker/test/test_connect_cache.rs rename to packages/utracker/tests/test_connect_cache.rs index c6fd541ac..d2ec46864 100644 --- a/packages/utracker/test/test_connect_cache.rs +++ b/packages/utracker/tests/test_connect_cache.rs @@ -1,11 +1,12 @@ use std::thread::{self}; use std::time::Duration; +use common::{handshaker, MockTrackerHandler}; use futures::stream::Stream; use util::bt::{self}; use utracker::{ClientRequest, TrackerClient, TrackerServer}; -use crate::{handshaker, MockTrackerHandler}; +mod common; #[test] #[allow(unused)] diff --git a/packages/utracker/test/test_scrape.rs b/packages/utracker/tests/test_scrape.rs similarity index 95% rename from packages/utracker/test/test_scrape.rs rename to packages/utracker/tests/test_scrape.rs index 3d1d8aa2e..af245f466 100644 --- a/packages/utracker/test/test_scrape.rs +++ b/packages/utracker/tests/test_scrape.rs @@ -1,12 +1,13 @@ use std::thread::{self}; use std::time::Duration; +use common::{handshaker, MockTrackerHandler}; use futures::future::Either; use futures::stream::Stream; use util::bt::{self}; use utracker::{ClientRequest, TrackerClient, TrackerServer}; -use crate::{handshaker, MockTrackerHandler}; +mod common; #[test] #[allow(unused)] diff --git a/packages/utracker/test/test_server_drop.rs b/packages/utracker/tests/test_server_drop.rs similarity index 95% rename from packages/utracker/test/test_server_drop.rs rename to packages/utracker/tests/test_server_drop.rs index 84e85f879..1e4ee51a5 100644 --- a/packages/utracker/test/test_server_drop.rs +++ b/packages/utracker/tests/test_server_drop.rs @@ -1,10 +1,11 @@ use std::net::UdpSocket; use std::time::Duration; +use common::MockTrackerHandler; use utracker::request::{self, RequestType, TrackerRequest}; use utracker::TrackerServer; -use crate::MockTrackerHandler; +mod common; #[test] #[allow(unused)] From 863536ea0e1fc590b93a1e05183683f98a5eb56e Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Sun, 11 Aug 2024 13:25:55 +0200 Subject: [PATCH 3/6] refactor: handshake sink, stream and builder --- packages/handshake/src/handshake/builder.rs | 109 ++++++ .../handshake/src/handshake/handshaker.rs | 347 ------------------ packages/handshake/src/handshake/mod.rs | 163 +++++++- packages/handshake/src/handshake/sink.rs | 72 ++++ packages/handshake/src/handshake/stream.rs | 26 ++ packages/handshake/src/lib.rs | 5 +- 6 files changed, 373 insertions(+), 349 deletions(-) create mode 100644 packages/handshake/src/handshake/builder.rs delete mode 100644 packages/handshake/src/handshake/handshaker.rs create mode 100644 packages/handshake/src/handshake/sink.rs create mode 100644 packages/handshake/src/handshake/stream.rs diff --git a/packages/handshake/src/handshake/builder.rs b/packages/handshake/src/handshake/builder.rs new file mode 100644 index 000000000..a0a9e05e7 --- /dev/null +++ b/packages/handshake/src/handshake/builder.rs @@ -0,0 +1,109 @@ +//! Build configuration for `Handshaker` object creation. + +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + +use rand::Rng as _; +use tokio_core::reactor::Handle; +use util::bt::PeerId; +use util::convert; + +use super::Handshaker; +use crate::handshake::config::HandshakerConfig; +use crate::message::extensions::Extensions; +use crate::transport::Transport; + +#[allow(clippy::module_name_repetitions)] +#[derive(Copy, Clone)] +pub struct HandshakerBuilder { + pub(super) bind: SocketAddr, + pub(super) port: u16, + pub(super) pid: PeerId, + pub(super) ext: Extensions, + pub(super) config: HandshakerConfig, +} + +impl Default for HandshakerBuilder { + fn default() -> Self { + let default_v4_addr = Ipv4Addr::new(0, 0, 0, 0); + let default_v4_port = 0; + + let bind = SocketAddr::V4(SocketAddrV4::new(default_v4_addr, default_v4_port)); + + let seed = rand::thread_rng().gen(); + let pid = PeerId::from_bytes(&convert::four_bytes_to_array(seed)); + + Self { + bind, + port: Default::default(), + pid, + ext: Extensions::default(), + config: HandshakerConfig::default(), + } + } +} + +impl HandshakerBuilder { + /// Create a new `HandshakerBuilder`. + #[must_use] + pub fn new() -> HandshakerBuilder { + Self::default() + } + + /// Address that the host will listen on. + /// + /// Defaults to `IN_ADDR_ANY` using port 0 (any free port). + pub fn with_bind_addr(&mut self, addr: SocketAddr) -> &mut HandshakerBuilder { + self.bind = addr; + + self + } + + /// Port that external peers should connect on. + /// + /// Defaults to the port that is being listened on (will only work if the + /// host is not natted). + pub fn with_open_port(&mut self, port: u16) -> &mut HandshakerBuilder { + self.port = port; + + self + } + + /// Peer id that will be advertised when handshaking with other peers. + /// + /// Defaults to a random SHA-1 hash; official clients should use an encoding scheme. + /// + /// See [BEP 0020](http://www.bittorrent.org/beps/bep_0020.html). + pub fn with_peer_id(&mut self, peer_id: PeerId) -> &mut HandshakerBuilder { + self.pid = peer_id; + + self + } + + /// Extensions supported by our client, advertised to the peer when handshaking. + pub fn with_extensions(&mut self, ext: Extensions) -> &mut HandshakerBuilder { + self.ext = ext; + + self + } + + /// Configuration that will be used to alter the internal behavior of handshaking. + /// + /// This will typically not need to be set unless you know what you are doing. + pub fn with_config(&mut self, config: HandshakerConfig) -> &mut HandshakerBuilder { + self.config = config; + + self + } + + /// Build a `Handshaker` over the given `Transport` with a `Remote` instance. + /// + /// # Errors + /// + /// Returns a IO error if unable to build. + pub fn build(&self, transport: T, handle: &Handle) -> std::io::Result> + where + T: Transport + 'static, + { + Handshaker::with_builder(self, transport, handle) + } +} diff --git a/packages/handshake/src/handshake/handshaker.rs b/packages/handshake/src/handshake/handshaker.rs deleted file mode 100644 index 69e83d6bc..000000000 --- a/packages/handshake/src/handshake/handshaker.rs +++ /dev/null @@ -1,347 +0,0 @@ -use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; -use std::time::Duration; -use std::{cmp, io}; - -use futures::sink::Sink; -use futures::stream::Stream; -use futures::sync::mpsc::{self, Receiver, SendError, Sender}; -use futures::{Poll, StartSend}; -use rand::{self, Rng}; -use tokio_core::reactor::Handle; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_timer::{self}; -use util::bt::PeerId; -use util::convert; - -use crate::discovery::DiscoveryInfo; -use crate::filter::filters::Filters; -use crate::filter::{HandshakeFilter, HandshakeFilters}; -use crate::handshake::config::HandshakerConfig; -use crate::handshake::handler; -use crate::handshake::handler::listener::ListenerHandler; -use crate::handshake::handler::timer::HandshakeTimer; -use crate::handshake::handler::{handshaker, initiator}; -use crate::local_addr::LocalAddr; -use crate::message::complete::CompleteMessage; -use crate::message::extensions::Extensions; -use crate::message::initiate::InitiateMessage; -use crate::transport::Transport; - -/// Build configuration for `Handshaker` object creation. -#[allow(clippy::module_name_repetitions)] -#[derive(Copy, Clone)] -pub struct HandshakerBuilder { - bind: SocketAddr, - port: u16, - pid: PeerId, - ext: Extensions, - config: HandshakerConfig, -} - -impl Default for HandshakerBuilder { - fn default() -> Self { - let default_v4_addr = Ipv4Addr::new(0, 0, 0, 0); - let default_v4_port = 0; - - let bind = SocketAddr::V4(SocketAddrV4::new(default_v4_addr, default_v4_port)); - - let seed = rand::thread_rng().gen(); - let pid = PeerId::from_bytes(&convert::four_bytes_to_array(seed)); - - Self { - bind, - port: Default::default(), - pid, - ext: Extensions::default(), - config: HandshakerConfig::default(), - } - } -} - -impl HandshakerBuilder { - /// Create a new `HandshakerBuilder`. - #[must_use] - pub fn new() -> HandshakerBuilder { - Self::default() - } - - /// Address that the host will listen on. - /// - /// Defaults to `IN_ADDR_ANY` using port 0 (any free port). - pub fn with_bind_addr(&mut self, addr: SocketAddr) -> &mut HandshakerBuilder { - self.bind = addr; - - self - } - - /// Port that external peers should connect on. - /// - /// Defaults to the port that is being listened on (will only work if the - /// host is not natted). - pub fn with_open_port(&mut self, port: u16) -> &mut HandshakerBuilder { - self.port = port; - - self - } - - /// Peer id that will be advertised when handshaking with other peers. - /// - /// Defaults to a random SHA-1 hash; official clients should use an encoding scheme. - /// - /// See [BEP 0020](http://www.bittorrent.org/beps/bep_0020.html). - pub fn with_peer_id(&mut self, peer_id: PeerId) -> &mut HandshakerBuilder { - self.pid = peer_id; - - self - } - - /// Extensions supported by our client, advertised to the peer when handshaking. - pub fn with_extensions(&mut self, ext: Extensions) -> &mut HandshakerBuilder { - self.ext = ext; - - self - } - - /// Configuration that will be used to alter the internal behavior of handshaking. - /// - /// This will typically not need to be set unless you know what you are doing. - pub fn with_config(&mut self, config: HandshakerConfig) -> &mut HandshakerBuilder { - self.config = config; - - self - } - - /// Build a `Handshaker` over the given `Transport` with a `Remote` instance. - /// - /// # Errors - /// - /// Returns a IO error if unable to build. - pub fn build(&self, transport: T, handle: &Handle) -> io::Result> - where - T: Transport + 'static, - { - Handshaker::with_builder(self, transport, handle) - } -} - -//----------------------------------------------------------------------------------// - -/// Handshaker which is both `Stream` and `Sink`. -pub struct Handshaker { - sink: HandshakerSink, - stream: HandshakerStream, -} - -impl Handshaker { - /// Splits the `Handshaker` into its parts. - /// - /// This is an enhanced version of `Stream::split` in that the returned `Sink` implements - /// `DiscoveryInfo` so it can be cloned and passed in to different peer discovery services. - #[must_use] - pub fn into_parts(self) -> (HandshakerSink, HandshakerStream) { - (self.sink, self.stream) - } -} - -impl DiscoveryInfo for Handshaker { - fn port(&self) -> u16 { - self.sink.port() - } - - fn peer_id(&self) -> PeerId { - self.sink.peer_id() - } -} - -impl Handshaker -where - S: AsyncRead + AsyncWrite + 'static, -{ - fn with_builder(builder: &HandshakerBuilder, transport: T, handle: &Handle) -> io::Result> - where - T: Transport + 'static, - { - let listener = transport.listen(&builder.bind, handle)?; - - // Resolve our "real" public port - let open_port = if builder.port == 0 { - listener.local_addr()?.port() - } else { - builder.port - }; - - let config = builder.config; - let (addr_send, addr_recv) = mpsc::channel(config.sink_buffer_size()); - let (hand_send, hand_recv) = mpsc::channel(config.wait_buffer_size()); - let (sock_send, sock_recv) = mpsc::channel(config.done_buffer_size()); - - let filters = Filters::new(); - let (handshake_timer, initiate_timer) = configured_handshake_timers(config.handshake_timeout(), config.connect_timeout()); - - // Hook up our pipeline of handlers which will take some connection info, process it, and forward it - handler::loop_handler( - addr_recv, - initiator::initiator_handler, - hand_send.clone(), - (transport, filters.clone(), handle.clone(), initiate_timer), - handle, - ); - handler::loop_handler(listener, ListenerHandler::new, hand_send, filters.clone(), handle); - handler::loop_handler( - hand_recv.map(Result::Ok).buffer_unordered(100), - handshaker::execute_handshake, - sock_send, - (builder.ext, builder.pid, filters.clone(), handshake_timer), - handle, - ); - - let sink = HandshakerSink::new(addr_send, open_port, builder.pid, filters); - let stream = HandshakerStream::new(sock_recv); - - Ok(Handshaker { sink, stream }) - } -} - -/// Configure a timer wheel and create a `HandshakeTimer`. -fn configured_handshake_timers(duration_one: Duration, duration_two: Duration) -> (HandshakeTimer, HandshakeTimer) { - let timer = tokio_timer::wheel() - .num_slots(64) - .max_timeout(cmp::max(duration_one, duration_two)) - .build(); - - ( - HandshakeTimer::new(timer.clone(), duration_one), - HandshakeTimer::new(timer, duration_two), - ) -} - -impl Sink for Handshaker { - type SinkItem = InitiateMessage; - type SinkError = SendError; - - fn start_send(&mut self, item: InitiateMessage) -> StartSend> { - self.sink.start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), SendError> { - self.sink.poll_complete() - } -} - -impl Stream for Handshaker { - type Item = CompleteMessage; - type Error = (); - - fn poll(&mut self) -> Poll>, ()> { - self.stream.poll() - } -} - -impl HandshakeFilters for Handshaker { - fn add_filter(&self, filter: F) - where - F: HandshakeFilter + PartialEq + Eq + Send + Sync + 'static, - { - self.sink.add_filter(filter); - } - - fn remove_filter(&self, filter: F) - where - F: HandshakeFilter + PartialEq + Eq + Send + Sync + 'static, - { - self.sink.remove_filter(filter); - } - - fn clear_filters(&self) { - self.sink.clear_filters(); - } -} - -//----------------------------------------------------------------------------------// - -/// `Sink` portion of the `Handshaker` for initiating handshakes. -#[allow(clippy::module_name_repetitions)] -#[derive(Clone)] -pub struct HandshakerSink { - send: Sender, - port: u16, - pid: PeerId, - filters: Filters, -} - -impl HandshakerSink { - fn new(send: Sender, port: u16, pid: PeerId, filters: Filters) -> HandshakerSink { - HandshakerSink { - send, - port, - pid, - filters, - } - } -} - -impl DiscoveryInfo for HandshakerSink { - fn port(&self) -> u16 { - self.port - } - - fn peer_id(&self) -> PeerId { - self.pid - } -} - -impl Sink for HandshakerSink { - type SinkItem = InitiateMessage; - type SinkError = SendError; - - fn start_send(&mut self, item: InitiateMessage) -> StartSend> { - self.send.start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), SendError> { - self.send.poll_complete() - } -} - -impl HandshakeFilters for HandshakerSink { - fn add_filter(&self, filter: F) - where - F: HandshakeFilter + PartialEq + Eq + Send + Sync + 'static, - { - self.filters.add_filter(filter); - } - - fn remove_filter(&self, filter: F) - where - F: HandshakeFilter + PartialEq + Eq + Send + Sync + 'static, - { - self.filters.remove_filter(&filter); - } - - fn clear_filters(&self) { - self.filters.clear_filters(); - } -} - -//----------------------------------------------------------------------------------// - -/// `Stream` portion of the `Handshaker` for completed handshakes. -#[allow(clippy::module_name_repetitions)] -pub struct HandshakerStream { - recv: Receiver>, -} - -impl HandshakerStream { - fn new(recv: Receiver>) -> HandshakerStream { - HandshakerStream { recv } - } -} - -impl Stream for HandshakerStream { - type Item = CompleteMessage; - type Error = (); - - fn poll(&mut self) -> Poll>, ()> { - self.recv.poll() - } -} diff --git a/packages/handshake/src/handshake/mod.rs b/packages/handshake/src/handshake/mod.rs index f9b09e325..a89cefab0 100644 --- a/packages/handshake/src/handshake/mod.rs +++ b/packages/handshake/src/handshake/mod.rs @@ -1,3 +1,164 @@ +use std::time::Duration; +use std::{cmp, io}; + +use builder::HandshakerBuilder; +use futures::sink::Sink; +use futures::stream::Stream; +use futures::sync::mpsc::{self, SendError}; +use futures::{Poll, StartSend}; +use sink::HandshakerSink; +use stream::HandshakerStream; +use tokio_core::reactor::Handle; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_timer::{self}; +use util::bt::PeerId; + +use crate::discovery::DiscoveryInfo; +use crate::filter::filters::Filters; +use crate::filter::{HandshakeFilter, HandshakeFilters}; +use crate::handshake::handler::listener::ListenerHandler; +use crate::handshake::handler::timer::HandshakeTimer; +use crate::handshake::handler::{handshaker, initiator}; +use crate::local_addr::LocalAddr; +use crate::message::complete::CompleteMessage; +use crate::message::initiate::InitiateMessage; +use crate::transport::Transport; + +pub mod builder; pub mod config; pub mod handler; -pub mod handshaker; +pub mod sink; +pub mod stream; + +//----------------------------------------------------------------------------------// + +/// Handshaker which is both `Stream` and `Sink`. +pub struct Handshaker { + sink: HandshakerSink, + stream: HandshakerStream, +} + +impl Handshaker { + /// Splits the `Handshaker` into its parts. + /// + /// This is an enhanced version of `Stream::split` in that the returned `Sink` implements + /// `DiscoveryInfo` so it can be cloned and passed in to different peer discovery services. + #[must_use] + pub fn into_parts(self) -> (HandshakerSink, HandshakerStream) { + (self.sink, self.stream) + } +} + +impl DiscoveryInfo for Handshaker { + fn port(&self) -> u16 { + self.sink.port() + } + + fn peer_id(&self) -> PeerId { + self.sink.peer_id() + } +} + +impl Handshaker +where + S: AsyncRead + AsyncWrite + 'static, +{ + fn with_builder(builder: &HandshakerBuilder, transport: T, handle: &Handle) -> io::Result> + where + T: Transport + 'static, + { + let listener = transport.listen(&builder.bind, handle)?; + + // Resolve our "real" public port + let open_port = if builder.port == 0 { + listener.local_addr()?.port() + } else { + builder.port + }; + + let config = builder.config; + let (addr_send, addr_recv) = mpsc::channel(config.sink_buffer_size()); + let (hand_send, hand_recv) = mpsc::channel(config.wait_buffer_size()); + let (sock_send, sock_recv) = mpsc::channel(config.done_buffer_size()); + + let filters = Filters::new(); + let (handshake_timer, initiate_timer) = configured_handshake_timers(config.handshake_timeout(), config.connect_timeout()); + + // Hook up our pipeline of handlers which will take some connection info, process it, and forward it + handler::loop_handler( + addr_recv, + initiator::initiator_handler, + hand_send.clone(), + (transport, filters.clone(), handle.clone(), initiate_timer), + handle, + ); + handler::loop_handler(listener, ListenerHandler::new, hand_send, filters.clone(), handle); + handler::loop_handler( + hand_recv.map(Result::Ok).buffer_unordered(100), + handshaker::execute_handshake, + sock_send, + (builder.ext, builder.pid, filters.clone(), handshake_timer), + handle, + ); + + let sink = HandshakerSink::new(addr_send, open_port, builder.pid, filters); + let stream = HandshakerStream::new(sock_recv); + + Ok(Handshaker { sink, stream }) + } +} + +/// Configure a timer wheel and create a `HandshakeTimer`. +fn configured_handshake_timers(duration_one: Duration, duration_two: Duration) -> (HandshakeTimer, HandshakeTimer) { + let timer = tokio_timer::wheel() + .num_slots(64) + .max_timeout(cmp::max(duration_one, duration_two)) + .build(); + + ( + HandshakeTimer::new(timer.clone(), duration_one), + HandshakeTimer::new(timer, duration_two), + ) +} + +impl Sink for Handshaker { + type SinkItem = InitiateMessage; + type SinkError = SendError; + + fn start_send(&mut self, item: InitiateMessage) -> StartSend> { + self.sink.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), SendError> { + self.sink.poll_complete() + } +} + +impl Stream for Handshaker { + type Item = CompleteMessage; + type Error = (); + + fn poll(&mut self) -> Poll>, ()> { + self.stream.poll() + } +} + +impl HandshakeFilters for Handshaker { + fn add_filter(&self, filter: F) + where + F: HandshakeFilter + PartialEq + Eq + Send + Sync + 'static, + { + self.sink.add_filter(filter); + } + + fn remove_filter(&self, filter: F) + where + F: HandshakeFilter + PartialEq + Eq + Send + Sync + 'static, + { + self.sink.remove_filter(filter); + } + + fn clear_filters(&self) { + self.sink.clear_filters(); + } +} diff --git a/packages/handshake/src/handshake/sink.rs b/packages/handshake/src/handshake/sink.rs new file mode 100644 index 000000000..191d16451 --- /dev/null +++ b/packages/handshake/src/handshake/sink.rs @@ -0,0 +1,72 @@ +//! `Sink` portion of the `Handshaker` for initiating handshakes. + +use futures::sink::Sink; +use futures::sync::mpsc::{SendError, Sender}; +use futures::{Poll, StartSend}; +use util::bt::PeerId; + +use crate::filter::filters::Filters; +use crate::{DiscoveryInfo, HandshakeFilter, HandshakeFilters, InitiateMessage}; + +#[allow(clippy::module_name_repetitions)] +#[derive(Clone)] +pub struct HandshakerSink { + send: Sender, + port: u16, + pid: PeerId, + filters: Filters, +} + +impl HandshakerSink { + pub(super) fn new(send: Sender, port: u16, pid: PeerId, filters: Filters) -> HandshakerSink { + HandshakerSink { + send, + port, + pid, + filters, + } + } +} + +impl DiscoveryInfo for HandshakerSink { + fn port(&self) -> u16 { + self.port + } + + fn peer_id(&self) -> PeerId { + self.pid + } +} + +impl Sink for HandshakerSink { + type SinkItem = InitiateMessage; + type SinkError = SendError; + + fn start_send(&mut self, item: InitiateMessage) -> StartSend> { + self.send.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), SendError> { + self.send.poll_complete() + } +} + +impl HandshakeFilters for HandshakerSink { + fn add_filter(&self, filter: F) + where + F: HandshakeFilter + PartialEq + Eq + Send + Sync + 'static, + { + self.filters.add_filter(filter); + } + + fn remove_filter(&self, filter: F) + where + F: HandshakeFilter + PartialEq + Eq + Send + Sync + 'static, + { + self.filters.remove_filter(&filter); + } + + fn clear_filters(&self) { + self.filters.clear_filters(); + } +} diff --git a/packages/handshake/src/handshake/stream.rs b/packages/handshake/src/handshake/stream.rs new file mode 100644 index 000000000..9b862ae69 --- /dev/null +++ b/packages/handshake/src/handshake/stream.rs @@ -0,0 +1,26 @@ +//! `Stream` portion of the `Handshaker` for completed handshakes. + +use futures::sync::mpsc::Receiver; +use futures::{Poll, Stream}; + +use crate::CompleteMessage; + +#[allow(clippy::module_name_repetitions)] +pub struct HandshakerStream { + recv: Receiver>, +} + +impl HandshakerStream { + pub(super) fn new(recv: Receiver>) -> HandshakerStream { + HandshakerStream { recv } + } +} + +impl Stream for HandshakerStream { + type Item = CompleteMessage; + type Error = (); + + fn poll(&mut self) -> Poll>, ()> { + self.recv.poll() + } +} diff --git a/packages/handshake/src/lib.rs b/packages/handshake/src/lib.rs index 749e38276..ab4ab9744 100644 --- a/packages/handshake/src/lib.rs +++ b/packages/handshake/src/lib.rs @@ -8,8 +8,11 @@ mod transport; pub use crate::discovery::DiscoveryInfo; pub use crate::filter::{FilterDecision, HandshakeFilter, HandshakeFilters}; +pub use crate::handshake::builder::HandshakerBuilder; pub use crate::handshake::config::HandshakerConfig; -pub use crate::handshake::handshaker::{Handshaker, HandshakerBuilder, HandshakerSink, HandshakerStream}; +pub use crate::handshake::sink::HandshakerSink; +pub use crate::handshake::stream::HandshakerStream; +pub use crate::handshake::Handshaker; pub use crate::local_addr::LocalAddr; pub use crate::message::complete::CompleteMessage; pub use crate::message::extensions::{Extension, Extensions}; From a930df5890d51e7eac3d189262e1fb3918daae71 Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Sun, 11 Aug 2024 13:38:50 +0200 Subject: [PATCH 4/6] refactor: disk manager sink, stream and builder --- packages/disk/src/disk/manager.rs | 228 ------------------ .../disk/src/disk/{ => manager}/builder.rs | 0 packages/disk/src/disk/manager/mod.rs | 83 +++++++ packages/disk/src/disk/manager/sink.rs | 106 ++++++++ packages/disk/src/disk/manager/stream.rs | 68 ++++++ packages/disk/src/disk/mod.rs | 1 - packages/disk/src/lib.rs | 6 +- 7 files changed, 261 insertions(+), 231 deletions(-) delete mode 100644 packages/disk/src/disk/manager.rs rename packages/disk/src/disk/{ => manager}/builder.rs (100%) create mode 100644 packages/disk/src/disk/manager/mod.rs create mode 100644 packages/disk/src/disk/manager/sink.rs create mode 100644 packages/disk/src/disk/manager/stream.rs diff --git a/packages/disk/src/disk/manager.rs b/packages/disk/src/disk/manager.rs deleted file mode 100644 index 17c46b377..000000000 --- a/packages/disk/src/disk/manager.rs +++ /dev/null @@ -1,228 +0,0 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; - -use crossbeam::queue::SegQueue; -use futures::sync::mpsc::{self, Receiver}; -use futures::task::{self, Task}; -use futures::{Async, AsyncSink, Poll, Sink, StartSend, Stream}; -use futures_cpupool::CpuPool; -use log::info; - -use crate::disk::builder::DiskManagerBuilder; -use crate::disk::fs::FileSystem; -use crate::disk::tasks::context::DiskManagerContext; -use crate::disk::{tasks, IDiskMessage, ODiskMessage}; - -/// `DiskManager` object which handles the storage of `Blocks` to the `FileSystem`. -#[allow(clippy::module_name_repetitions)] -#[derive(Debug)] -pub struct DiskManager { - sink: DiskManagerSink, - stream: DiskManagerStream, -} - -impl DiskManager { - /// Create a `DiskManager` from the given `DiskManagerBuilder`. - pub fn from_builder(mut builder: DiskManagerBuilder, fs: F) -> DiskManager { - let cur_sink_capacity = Arc::new(AtomicUsize::new(0)); - let sink_capacity = builder.sink_buffer_capacity(); - let stream_capacity = builder.stream_buffer_capacity(); - let pool_builder = builder.worker_config(); - - let (out_send, out_recv) = mpsc::channel(stream_capacity); - let context = DiskManagerContext::new(out_send, fs); - let task_queue = Arc::new(SegQueue::new()); - - let sink = DiskManagerSink::new( - pool_builder.create(), - context, - sink_capacity, - cur_sink_capacity.clone(), - task_queue.clone(), - ); - let stream = DiskManagerStream::new(out_recv, cur_sink_capacity, task_queue.clone()); - - DiskManager { sink, stream } - } - - /// Break the `DiskManager` into a sink and stream. - /// - /// The returned sink implements `Clone`. - #[must_use] - pub fn into_parts(self) -> (DiskManagerSink, DiskManagerStream) { - (self.sink, self.stream) - } -} - -impl Sink for DiskManager -where - F: FileSystem + Send + Sync + 'static, -{ - type SinkItem = IDiskMessage; - type SinkError = (); - - fn start_send(&mut self, item: IDiskMessage) -> StartSend { - self.sink.start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), ()> { - self.sink.poll_complete() - } -} - -impl Stream for DiskManager { - type Item = ODiskMessage; - type Error = (); - - fn poll(&mut self) -> Poll, ()> { - self.stream.poll() - } -} - -//----------------------------------------------------------------------------// - -/// `DiskManagerSink` which is the sink portion of a `DiskManager`. -#[derive(Debug)] -pub struct DiskManagerSink { - pool: CpuPool, - context: DiskManagerContext, - max_capacity: usize, - cur_capacity: Arc, - task_queue: Arc>, -} - -impl Clone for DiskManagerSink { - fn clone(&self) -> DiskManagerSink { - DiskManagerSink { - pool: self.pool.clone(), - context: self.context.clone(), - max_capacity: self.max_capacity, - cur_capacity: self.cur_capacity.clone(), - task_queue: self.task_queue.clone(), - } - } -} - -impl DiskManagerSink { - fn new( - pool: CpuPool, - context: DiskManagerContext, - max_capacity: usize, - cur_capacity: Arc, - task_queue: Arc>, - ) -> DiskManagerSink { - DiskManagerSink { - pool, - context, - max_capacity, - cur_capacity, - task_queue, - } - } - - fn try_submit_work(&self) -> bool { - let cur_capacity = self.cur_capacity.fetch_add(1, Ordering::SeqCst); - - if cur_capacity < self.max_capacity { - true - } else { - self.cur_capacity.fetch_sub(1, Ordering::SeqCst); - - false - } - } -} - -impl Sink for DiskManagerSink -where - F: FileSystem + Send + Sync + 'static, -{ - type SinkItem = IDiskMessage; - type SinkError = (); - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - info!("Starting Send For DiskManagerSink With IDiskMessage"); - - if self.try_submit_work() { - info!("DiskManagerSink Submitted Work On First Attempt"); - tasks::execute_on_pool(item, &self.pool, self.context.clone()); - - return Ok(AsyncSink::Ready); - } - - // We split the sink and stream, which means these could be polled in different event loops (I think), - // so we need to add our task, but then try to submit work again, in case the receiver processed work - // right after we tried to submit the first time. - info!("DiskManagerSink Failed To Submit Work On First Attempt, Adding Task To Queue"); - self.task_queue.push(task::current()); - - if self.try_submit_work() { - // Receiver will look at the queue but wake us up, even though we don't need it to now... - info!("DiskManagerSink Submitted Work On Second Attempt"); - tasks::execute_on_pool(item, &self.pool, self.context.clone()); - - Ok(AsyncSink::Ready) - } else { - // Receiver will look at the queue eventually... - Ok(AsyncSink::NotReady(item)) - } - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - Ok(Async::Ready(())) - } -} - -//----------------------------------------------------------------------------// - -/// `DiskManagerStream` which is the stream portion of a `DiskManager`. -#[derive(Debug)] -pub struct DiskManagerStream { - recv: Receiver, - cur_capacity: Arc, - task_queue: Arc>, -} - -impl DiskManagerStream { - fn new(recv: Receiver, cur_capacity: Arc, task_queue: Arc>) -> DiskManagerStream { - DiskManagerStream { - recv, - cur_capacity, - task_queue, - } - } - - fn complete_work(&self) { - self.cur_capacity.fetch_sub(1, Ordering::SeqCst); - } -} - -impl Stream for DiskManagerStream { - type Item = ODiskMessage; - type Error = (); - - fn poll(&mut self) -> Poll, ()> { - info!("Polling DiskManagerStream For ODiskMessage"); - - match self.recv.poll() { - res @ Ok(Async::Ready(Some( - ODiskMessage::TorrentAdded(_) - | ODiskMessage::TorrentRemoved(_) - | ODiskMessage::TorrentSynced(_) - | ODiskMessage::BlockLoaded(_) - | ODiskMessage::BlockProcessed(_), - ))) => { - self.complete_work(); - - info!("Notifying DiskManager That We Can Submit More Work"); - - while let Some(task) = self.task_queue.pop() { - task.notify(); - } - - res - } - other => other, - } - } -} diff --git a/packages/disk/src/disk/builder.rs b/packages/disk/src/disk/manager/builder.rs similarity index 100% rename from packages/disk/src/disk/builder.rs rename to packages/disk/src/disk/manager/builder.rs diff --git a/packages/disk/src/disk/manager/mod.rs b/packages/disk/src/disk/manager/mod.rs new file mode 100644 index 000000000..92d27b670 --- /dev/null +++ b/packages/disk/src/disk/manager/mod.rs @@ -0,0 +1,83 @@ +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; + +use crossbeam::queue::SegQueue; +use futures::sync::mpsc; +use futures::{Poll, Sink, StartSend, Stream}; +use sink::DiskManagerSink; +use stream::DiskManagerStream; + +use crate::disk::fs::FileSystem; +use crate::disk::tasks::context::DiskManagerContext; +use crate::disk::{IDiskMessage, ODiskMessage}; +use crate::DiskManagerBuilder; + +pub mod builder; +pub mod sink; +pub mod stream; + +/// `DiskManager` object which handles the storage of `Blocks` to the `FileSystem`. +#[allow(clippy::module_name_repetitions)] +#[derive(Debug)] +pub struct DiskManager { + sink: DiskManagerSink, + stream: DiskManagerStream, +} + +impl DiskManager { + /// Create a `DiskManager` from the given `DiskManagerBuilder`. + pub fn from_builder(mut builder: DiskManagerBuilder, fs: F) -> DiskManager { + let cur_sink_capacity = Arc::new(AtomicUsize::new(0)); + let sink_capacity = builder.sink_buffer_capacity(); + let stream_capacity = builder.stream_buffer_capacity(); + let pool_builder = builder.worker_config(); + + let (out_send, out_recv) = mpsc::channel(stream_capacity); + let context = DiskManagerContext::new(out_send, fs); + let task_queue = Arc::new(SegQueue::new()); + + let sink = DiskManagerSink::new( + pool_builder.create(), + context, + sink_capacity, + cur_sink_capacity.clone(), + task_queue.clone(), + ); + let stream = DiskManagerStream::new(out_recv, cur_sink_capacity, task_queue.clone()); + + DiskManager { sink, stream } + } + + /// Break the `DiskManager` into a sink and stream. + /// + /// The returned sink implements `Clone`. + #[must_use] + pub fn into_parts(self) -> (DiskManagerSink, DiskManagerStream) { + (self.sink, self.stream) + } +} + +impl Sink for DiskManager +where + F: FileSystem + Send + Sync + 'static, +{ + type SinkItem = IDiskMessage; + type SinkError = (); + + fn start_send(&mut self, item: IDiskMessage) -> StartSend { + self.sink.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), ()> { + self.sink.poll_complete() + } +} + +impl Stream for DiskManager { + type Item = ODiskMessage; + type Error = (); + + fn poll(&mut self) -> Poll, ()> { + self.stream.poll() + } +} diff --git a/packages/disk/src/disk/manager/sink.rs b/packages/disk/src/disk/manager/sink.rs new file mode 100644 index 000000000..194d46a2c --- /dev/null +++ b/packages/disk/src/disk/manager/sink.rs @@ -0,0 +1,106 @@ +//! `DiskManagerSink` which is the sink portion of a `DiskManager`. + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use crossbeam::queue::SegQueue; +use futures::task::{self, Task}; +use futures::{Async, AsyncSink, Poll, Sink, StartSend}; +use futures_cpupool::CpuPool; +use log::info; + +use crate::disk::tasks; +use crate::disk::tasks::context::DiskManagerContext; +use crate::{FileSystem, IDiskMessage}; + +#[allow(clippy::module_name_repetitions)] +#[derive(Debug)] +pub struct DiskManagerSink { + pool: CpuPool, + context: DiskManagerContext, + max_capacity: usize, + cur_capacity: Arc, + task_queue: Arc>, +} + +impl Clone for DiskManagerSink { + fn clone(&self) -> DiskManagerSink { + DiskManagerSink { + pool: self.pool.clone(), + context: self.context.clone(), + max_capacity: self.max_capacity, + cur_capacity: self.cur_capacity.clone(), + task_queue: self.task_queue.clone(), + } + } +} + +impl DiskManagerSink { + pub(super) fn new( + pool: CpuPool, + context: DiskManagerContext, + max_capacity: usize, + cur_capacity: Arc, + task_queue: Arc>, + ) -> DiskManagerSink { + DiskManagerSink { + pool, + context, + max_capacity, + cur_capacity, + task_queue, + } + } + + fn try_submit_work(&self) -> bool { + let cur_capacity = self.cur_capacity.fetch_add(1, Ordering::SeqCst); + + if cur_capacity < self.max_capacity { + true + } else { + self.cur_capacity.fetch_sub(1, Ordering::SeqCst); + + false + } + } +} + +impl Sink for DiskManagerSink +where + F: FileSystem + Send + Sync + 'static, +{ + type SinkItem = IDiskMessage; + type SinkError = (); + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + info!("Starting Send For DiskManagerSink With IDiskMessage"); + + if self.try_submit_work() { + info!("DiskManagerSink Submitted Work On First Attempt"); + tasks::execute_on_pool(item, &self.pool, self.context.clone()); + + return Ok(AsyncSink::Ready); + } + + // We split the sink and stream, which means these could be polled in different event loops (I think), + // so we need to add our task, but then try to submit work again, in case the receiver processed work + // right after we tried to submit the first time. + info!("DiskManagerSink Failed To Submit Work On First Attempt, Adding Task To Queue"); + self.task_queue.push(task::current()); + + if self.try_submit_work() { + // Receiver will look at the queue but wake us up, even though we don't need it to now... + info!("DiskManagerSink Submitted Work On Second Attempt"); + tasks::execute_on_pool(item, &self.pool, self.context.clone()); + + Ok(AsyncSink::Ready) + } else { + // Receiver will look at the queue eventually... + Ok(AsyncSink::NotReady(item)) + } + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + Ok(Async::Ready(())) + } +} diff --git a/packages/disk/src/disk/manager/stream.rs b/packages/disk/src/disk/manager/stream.rs new file mode 100644 index 000000000..17bbb8f8b --- /dev/null +++ b/packages/disk/src/disk/manager/stream.rs @@ -0,0 +1,68 @@ +//! `DiskManagerStream` which is the stream portion of a `DiskManager`. + +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use crossbeam::queue::SegQueue; +use futures::sync::mpsc::Receiver; +use futures::task::Task; +use futures::{Async, Poll, Stream}; +use log::info; + +use crate::ODiskMessage; + +#[allow(clippy::module_name_repetitions)] +#[derive(Debug)] +pub struct DiskManagerStream { + recv: Receiver, + cur_capacity: Arc, + task_queue: Arc>, +} + +impl DiskManagerStream { + pub(super) fn new( + recv: Receiver, + cur_capacity: Arc, + task_queue: Arc>, + ) -> DiskManagerStream { + DiskManagerStream { + recv, + cur_capacity, + task_queue, + } + } + + fn complete_work(&self) { + self.cur_capacity.fetch_sub(1, Ordering::SeqCst); + } +} + +impl Stream for DiskManagerStream { + type Item = ODiskMessage; + type Error = (); + + fn poll(&mut self) -> Poll, ()> { + info!("Polling DiskManagerStream For ODiskMessage"); + + match self.recv.poll() { + res @ Ok(Async::Ready(Some( + ODiskMessage::TorrentAdded(_) + | ODiskMessage::TorrentRemoved(_) + | ODiskMessage::TorrentSynced(_) + | ODiskMessage::BlockLoaded(_) + | ODiskMessage::BlockProcessed(_), + ))) => { + self.complete_work(); + + info!("Notifying DiskManager That We Can Submit More Work"); + + while let Some(task) = self.task_queue.pop() { + task.notify(); + } + + res + } + other => other, + } + } +} diff --git a/packages/disk/src/disk/mod.rs b/packages/disk/src/disk/mod.rs index 6953acf07..d79a96ee6 100644 --- a/packages/disk/src/disk/mod.rs +++ b/packages/disk/src/disk/mod.rs @@ -4,7 +4,6 @@ use util::bt::InfoHash; use crate::error::{BlockError, TorrentError}; use crate::memory::block::{Block, BlockMut}; -pub mod builder; pub mod fs; pub mod manager; mod tasks; diff --git a/packages/disk/src/lib.rs b/packages/disk/src/lib.rs index 8c813db37..a4a4ab909 100644 --- a/packages/disk/src/lib.rs +++ b/packages/disk/src/lib.rs @@ -4,9 +4,11 @@ mod memory; /// Both `Block` and `Torrent` error types. pub mod error; -pub use crate::disk::builder::DiskManagerBuilder; pub use crate::disk::fs::FileSystem; -pub use crate::disk::manager::{DiskManager, DiskManagerSink, DiskManagerStream}; +pub use crate::disk::manager::builder::DiskManagerBuilder; +pub use crate::disk::manager::sink::DiskManagerSink; +pub use crate::disk::manager::stream::DiskManagerStream; +pub use crate::disk::manager::DiskManager; pub use crate::disk::{IDiskMessage, ODiskMessage}; pub use crate::memory::block::{Block, BlockMetadata, BlockMut}; From 62efa22924f95f462a9e1351cf81fc3a8968a0ed Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Sun, 11 Aug 2024 13:52:12 +0200 Subject: [PATCH 5/6] refactor: peer manager, sink, stream, messages and fused --- packages/peer/src/lib.rs | 7 +- .../src/manager/{future/mod.rs => fused.rs} | 0 packages/peer/src/manager/messages.rs | 54 +++ packages/peer/src/manager/mod.rs | 437 +----------------- packages/peer/src/manager/sink.rs | 258 +++++++++++ packages/peer/src/manager/stream.rs | 138 ++++++ packages/peer/src/manager/task.rs | 2 +- packages/peer/src/message/mod.rs | 2 +- 8 files changed, 466 insertions(+), 432 deletions(-) rename packages/peer/src/manager/{future/mod.rs => fused.rs} (100%) create mode 100644 packages/peer/src/manager/messages.rs create mode 100644 packages/peer/src/manager/sink.rs create mode 100644 packages/peer/src/manager/stream.rs diff --git a/packages/peer/src/lib.rs b/packages/peer/src/lib.rs index 03d09cbe1..98d1a8006 100644 --- a/packages/peer/src/lib.rs +++ b/packages/peer/src/lib.rs @@ -9,10 +9,11 @@ mod protocol; pub use codec::PeerProtocolCodec; pub use crate::manager::builder::PeerManagerBuilder; +pub use crate::manager::messages::{IPeerManagerMessage, ManagedMessage, MessageId, OPeerManagerMessage}; pub use crate::manager::peer_info::PeerInfo; -pub use crate::manager::{ - IPeerManagerMessage, ManagedMessage, MessageId, OPeerManagerMessage, PeerManager, PeerManagerSink, PeerManagerStream, -}; +pub use crate::manager::sink::PeerManagerSink; +pub use crate::manager::stream::PeerManagerStream; +pub use crate::manager::PeerManager; pub use crate::protocol::{NestedPeerProtocol, PeerProtocol}; /// Serializable and deserializable protocol messages. diff --git a/packages/peer/src/manager/future/mod.rs b/packages/peer/src/manager/fused.rs similarity index 100% rename from packages/peer/src/manager/future/mod.rs rename to packages/peer/src/manager/fused.rs diff --git a/packages/peer/src/manager/messages.rs b/packages/peer/src/manager/messages.rs new file mode 100644 index 000000000..235e7f1ca --- /dev/null +++ b/packages/peer/src/manager/messages.rs @@ -0,0 +1,54 @@ +use futures::Sink; + +use crate::PeerInfo; + +/// Trait for giving `PeerManager` message information it needs. +/// +/// For any `PeerProtocol` (or plain `Codec`), that wants to be managed +/// by `PeerManager`, it must ensure that it's message type implements +/// this trait so that we have the hooks necessary to manage the peer. +pub trait ManagedMessage { + /// Retrieve a keep alive message variant. + fn keep_alive() -> Self; + + /// Whether or not this message is a keep alive message. + fn is_keep_alive(&self) -> bool; +} + +//----------------------------------------------------------------------------// + +/// Identifier for matching sent messages with received messages. +pub type MessageId = u64; + +/// Message that can be sent to the `PeerManager`. +pub enum IPeerManagerMessage

+where + P: Sink, +{ + /// Add a peer to the peer manager. + AddPeer(PeerInfo, P), + /// Remove a peer from the peer manager. + RemovePeer(PeerInfo), + /// Send a message to a peer. + SendMessage(PeerInfo, MessageId, P::SinkItem), // TODO: Support querying for statistics +} + +/// Message that can be received from the `PeerManager`. +pub enum OPeerManagerMessage { + /// Message indicating a peer has been added to the peer manager. + PeerAdded(PeerInfo), + /// Message indicating a peer has been removed from the peer manager. + PeerRemoved(PeerInfo), + /// Message indicating a message has been sent to the given peer. + SentMessage(PeerInfo, MessageId), + /// Message indicating we have received a message from a peer. + ReceivedMessage(PeerInfo, M), + /// Message indicating a peer has disconnected from us. + /// + /// Same semantics as `PeerRemoved`, but the peer is not returned. + PeerDisconnect(PeerInfo), + /// Message indicating a peer errored out. + /// + /// Same semantics as `PeerRemoved`, but the peer is not returned. + PeerError(PeerInfo, std::io::Error), +} diff --git a/packages/peer/src/manager/mod.rs b/packages/peer/src/manager/mod.rs index 86a16cba6..66972c1ea 100644 --- a/packages/peer/src/manager/mod.rs +++ b/packages/peer/src/manager/mod.rs @@ -1,4 +1,3 @@ -use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -7,21 +6,24 @@ use std::{cmp, io}; use crossbeam::queue::SegQueue; use futures::sink::Sink; use futures::stream::Stream; -use futures::sync::mpsc::{self, Receiver, Sender}; -use futures::task::{self as futures_task, Task}; -use futures::{Async, AsyncSink, Poll, StartSend}; +use futures::sync::mpsc; +use futures::{Poll, StartSend}; +use messages::{IPeerManagerMessage, ManagedMessage, OPeerManagerMessage}; +use sink::PeerManagerSink; +use stream::PeerManagerStream; use tokio_core::reactor::Handle; -use tokio_timer::{self, Timer}; use crate::manager::builder::PeerManagerBuilder; -use crate::manager::error::{PeerManagerError, PeerManagerErrorKind}; -use crate::manager::peer_info::PeerInfo; +use crate::manager::error::PeerManagerError; pub mod builder; pub mod error; +pub mod messages; pub mod peer_info; +pub mod sink; +pub mod stream; -mod future; +mod fused; mod task; // We configure our tick duration based on this, could let users configure this in the future... @@ -112,422 +114,3 @@ where self.stream.poll() } } - -//----------------------------------------------------------------------------// - -/// Sink half of a `PeerManager`. -pub struct PeerManagerSink

-where - P: Sink + Stream, -{ - handle: Handle, - timer: Timer, - build: PeerManagerBuilder, - send: Sender>, - peers: Arc>>>>, - task_queue: Arc>, -} - -impl

Clone for PeerManagerSink

-where - P: Sink + Stream, -{ - fn clone(&self) -> PeerManagerSink

{ - PeerManagerSink { - handle: self.handle.clone(), - timer: self.timer.clone(), - build: self.build, - send: self.send.clone(), - peers: self.peers.clone(), - task_queue: self.task_queue.clone(), - } - } -} - -impl

PeerManagerSink

-where - P: Sink + Stream, -{ - fn new( - handle: Handle, - timer: Timer, - build: PeerManagerBuilder, - send: Sender>, - peers: Arc>>>>, - task_queue: Arc>, - ) -> PeerManagerSink

{ - PeerManagerSink { - handle, - timer, - build, - send, - peers, - task_queue, - } - } - - fn run_with_lock_sink(&mut self, item: I, call: F, not: G) -> StartSend - where - F: FnOnce( - I, - &mut Handle, - &mut Timer, - &mut PeerManagerBuilder, - &mut Sender>, - &mut HashMap>>, - ) -> StartSend, - G: FnOnce(I) -> T, - { - let (result, took_lock) = if let Ok(mut guard) = self.peers.try_lock() { - let result = call( - item, - &mut self.handle, - &mut self.timer, - &mut self.build, - &mut self.send, - &mut *guard, - ); - - // Closure could return not ready, need to stash in that case - if result.as_ref().map(futures::AsyncSink::is_not_ready).unwrap_or(false) { - self.task_queue.push(futures_task::current()); - } - - (result, true) - } else { - self.task_queue.push(futures_task::current()); - - if let Ok(mut guard) = self.peers.try_lock() { - let result = call( - item, - &mut self.handle, - &mut self.timer, - &mut self.build, - &mut self.send, - &mut *guard, - ); - - // Closure could return not ready, need to stash in that case - if result.as_ref().map(futures::AsyncSink::is_not_ready).unwrap_or(false) { - self.task_queue.push(futures_task::current()); - } - - (result, true) - } else { - (Ok(AsyncSink::NotReady(not(item))), false) - } - }; - - if took_lock { - // Just notify a single person waiting on the lock to reduce contention - if let Some(task) = self.task_queue.pop() { - task.notify(); - } - } - - result - } - - fn run_with_lock_poll(&mut self, call: F) -> Poll - where - F: FnOnce( - &mut Handle, - &mut Timer, - &mut PeerManagerBuilder, - &mut Sender>, - &mut HashMap>>, - ) -> Poll, - { - let (result, took_lock) = if let Ok(mut guard) = self.peers.try_lock() { - let result = call( - &mut self.handle, - &mut self.timer, - &mut self.build, - &mut self.send, - &mut *guard, - ); - - (result, true) - } else { - // Stash a task - self.task_queue.push(futures_task::current()); - - // Try to get lock again in case of race condition - if let Ok(mut guard) = self.peers.try_lock() { - let result = call( - &mut self.handle, - &mut self.timer, - &mut self.build, - &mut self.send, - &mut *guard, - ); - - (result, true) - } else { - (Ok(Async::NotReady), false) - } - }; - - if took_lock { - // Just notify a single person waiting on the lock to reduce contention - if let Some(task) = self.task_queue.pop() { - task.notify(); - } - } - - result - } -} - -impl

Sink for PeerManagerSink

-where - P: Sink + Stream + 'static, - P::SinkItem: ManagedMessage, - P::Item: ManagedMessage, -{ - type SinkItem = IPeerManagerMessage

; - type SinkError = PeerManagerError; - - fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - match item { - IPeerManagerMessage::AddPeer(info, peer) => self.run_with_lock_sink( - (info, peer), - |(info, peer), handle, timer, builder, send, peers| { - if peers.len() >= builder.peer_capacity() { - Ok(AsyncSink::NotReady(IPeerManagerMessage::AddPeer(info, peer))) - } else { - match peers.entry(info) { - Entry::Occupied(_) => Err(PeerManagerError::from_kind(PeerManagerErrorKind::PeerNotFound { info })), - Entry::Vacant(vac) => { - vac.insert(task::run_peer(peer, info, send.clone(), timer.clone(), builder, handle)); - - Ok(AsyncSink::Ready) - } - } - } - }, - |(info, peer)| IPeerManagerMessage::AddPeer(info, peer), - ), - IPeerManagerMessage::RemovePeer(info) => self.run_with_lock_sink( - info, - |info, _, _, _, _, peers| { - peers - .get_mut(&info) - .ok_or_else(|| PeerManagerError::from_kind(PeerManagerErrorKind::PeerNotFound { info })) - .and_then(|send| { - send.start_send(IPeerManagerMessage::RemovePeer(info)) - .map_err(|_| panic!("bip_peer: PeerManager Failed To Send RemovePeer")) - }) - }, - |info| IPeerManagerMessage::RemovePeer(info), - ), - IPeerManagerMessage::SendMessage(info, mid, peer_message) => self.run_with_lock_sink( - (info, mid, peer_message), - |(info, mid, peer_message), _, _, _, _, peers| { - peers - .get_mut(&info) - .ok_or_else(|| PeerManagerError::from_kind(PeerManagerErrorKind::PeerNotFound { info })) - .and_then(|send| { - send.start_send(IPeerManagerMessage::SendMessage(info, mid, peer_message)) - .map_err(|_| panic!("bip_peer: PeerManager Failed to Send SendMessage")) - }) - }, - |(info, mid, peer_message)| IPeerManagerMessage::SendMessage(info, mid, peer_message), - ), - } - } - - fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { - self.run_with_lock_poll(|_, _, _, _, peers| { - for peer_mut in peers.values_mut() { - // Needs type hint in case poll fails (so that error type matches) - let result: Poll<(), Self::SinkError> = peer_mut - .poll_complete() - .map_err(|_| panic!("bip_peer: PeerManaged Failed To Poll Peer")); - - result?; - } - - Ok(Async::Ready(())) - }) - } -} - -//----------------------------------------------------------------------------// - -/// Stream half of a `PeerManager`. -#[allow(clippy::option_option)] -pub struct PeerManagerStream

-where - P: Sink + Stream, -{ - recv: Receiver>, - peers: Arc>>>>, - task_queue: Arc>, - opt_pending: Option>>, -} - -impl

PeerManagerStream

-where - P: Sink + Stream, -{ - fn new( - recv: Receiver>, - peers: Arc>>>>, - task_queue: Arc>, - ) -> PeerManagerStream

{ - PeerManagerStream { - recv, - peers, - task_queue, - opt_pending: None, - } - } - - fn run_with_lock_poll(&mut self, item: I, call: F, not: G) -> Poll - where - F: FnOnce(I, &mut HashMap>>) -> Poll, - G: FnOnce(I) -> Option>, - { - let (result, took_lock) = if let Ok(mut guard) = self.peers.try_lock() { - let result = call(item, &mut *guard); - - // Nothing calling us will return NotReady, so we don't have to push to queue here - - (result, true) - } else { - // Couldn't get the lock, stash a task away - self.task_queue.push(futures_task::current()); - - // Try to get the lock once more, in case of a race condition with stashing the task - if let Ok(mut guard) = self.peers.try_lock() { - let result = call(item, &mut *guard); - - // Nothing calling us will return NotReady, so we don't have to push to queue here - - (result, true) - } else { - // If we couldn't get the lock, stash the item - self.opt_pending = Some(not(item)); - - (Ok(Async::NotReady), false) - } - }; - - if took_lock { - // Just notify a single person waiting on the lock to reduce contention - if let Some(task) = self.task_queue.pop() { - task.notify(); - } - } - - result - } -} - -impl

Stream for PeerManagerStream

-where - P: Sink + Stream, -{ - type Item = OPeerManagerMessage; - type Error = (); - - fn poll(&mut self) -> Poll, Self::Error> { - // Intercept and propagate any messages indicating the peer shutdown so we can remove them from our peer map - let next_message = self - .opt_pending - .take() - .map(|pending| Ok(Async::Ready(pending))) - .unwrap_or_else(|| self.recv.poll()); - - next_message.and_then(|result| match result { - Async::Ready(Some(OPeerManagerMessage::PeerRemoved(info))) => self.run_with_lock_poll( - info, - |info, peers| { - peers - .remove(&info) - .unwrap_or_else(|| panic!("bip_peer: Received PeerRemoved Message With No Matching Peer In Map")); - - Ok(Async::Ready(Some(OPeerManagerMessage::PeerRemoved(info)))) - }, - |info| Some(OPeerManagerMessage::PeerRemoved(info)), - ), - Async::Ready(Some(OPeerManagerMessage::PeerDisconnect(info))) => self.run_with_lock_poll( - info, - |info, peers| { - peers - .remove(&info) - .unwrap_or_else(|| panic!("bip_peer: Received PeerDisconnect Message With No Matching Peer In Map")); - - Ok(Async::Ready(Some(OPeerManagerMessage::PeerDisconnect(info)))) - }, - |info| Some(OPeerManagerMessage::PeerDisconnect(info)), - ), - Async::Ready(Some(OPeerManagerMessage::PeerError(info, error))) => self.run_with_lock_poll( - (info, error), - |(info, error), peers| { - peers - .remove(&info) - .unwrap_or_else(|| panic!("bip_peer: Received PeerError Message With No Matching Peer In Map")); - - Ok(Async::Ready(Some(OPeerManagerMessage::PeerError(info, error)))) - }, - |(info, error)| Some(OPeerManagerMessage::PeerError(info, error)), - ), - other => Ok(other), - }) - } -} - -//----------------------------------------------------------------------------// - -/// Trait for giving `PeerManager` message information it needs. -/// -/// For any `PeerProtocol` (or plain `Codec`), that wants to be managed -/// by `PeerManager`, it must ensure that it's message type implements -/// this trait so that we have the hooks necessary to manage the peer. -pub trait ManagedMessage { - /// Retrieve a keep alive message variant. - fn keep_alive() -> Self; - - /// Whether or not this message is a keep alive message. - fn is_keep_alive(&self) -> bool; -} - -//----------------------------------------------------------------------------// - -/// Identifier for matching sent messages with received messages. -pub type MessageId = u64; - -/// Message that can be sent to the `PeerManager`. -pub enum IPeerManagerMessage

-where - P: Sink, -{ - /// Add a peer to the peer manager. - AddPeer(PeerInfo, P), - /// Remove a peer from the peer manager. - RemovePeer(PeerInfo), - /// Send a message to a peer. - SendMessage(PeerInfo, MessageId, P::SinkItem), // TODO: Support querying for statistics -} - -/// Message that can be received from the `PeerManager`. -pub enum OPeerManagerMessage { - /// Message indicating a peer has been added to the peer manager. - PeerAdded(PeerInfo), - /// Message indicating a peer has been removed from the peer manager. - PeerRemoved(PeerInfo), - /// Message indicating a message has been sent to the given peer. - SentMessage(PeerInfo, MessageId), - /// Message indicating we have received a message from a peer. - ReceivedMessage(PeerInfo, M), - /// Message indicating a peer has disconnected from us. - /// - /// Same semantics as `PeerRemoved`, but the peer is not returned. - PeerDisconnect(PeerInfo), - /// Message indicating a peer errored out. - /// - /// Same semantics as `PeerRemoved`, but the peer is not returned. - PeerError(PeerInfo, io::Error), -} diff --git a/packages/peer/src/manager/sink.rs b/packages/peer/src/manager/sink.rs new file mode 100644 index 000000000..4216f623a --- /dev/null +++ b/packages/peer/src/manager/sink.rs @@ -0,0 +1,258 @@ +//! Sink half of a `PeerManager`. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use crossbeam::queue::SegQueue; +use futures::sink::Sink; +use futures::stream::Stream; +use futures::sync::mpsc::Sender; +use futures::task::{self as futures_task, Task}; +use futures::{Async, AsyncSink, Poll, StartSend}; +use tokio_core::reactor::Handle; +use tokio_timer::{self, Timer}; + +use super::messages::{IPeerManagerMessage, ManagedMessage, OPeerManagerMessage}; +use super::task; +use crate::manager::builder::PeerManagerBuilder; +use crate::manager::error::{PeerManagerError, PeerManagerErrorKind}; +use crate::manager::peer_info::PeerInfo; + +#[allow(clippy::module_name_repetitions)] +pub struct PeerManagerSink

+where + P: Sink + Stream, +{ + handle: Handle, + timer: Timer, + build: PeerManagerBuilder, + send: Sender>, + peers: Arc>>>>, + task_queue: Arc>, +} + +impl

Clone for PeerManagerSink

+where + P: Sink + Stream, +{ + fn clone(&self) -> PeerManagerSink

{ + PeerManagerSink { + handle: self.handle.clone(), + timer: self.timer.clone(), + build: self.build, + send: self.send.clone(), + peers: self.peers.clone(), + task_queue: self.task_queue.clone(), + } + } +} + +impl

PeerManagerSink

+where + P: Sink + Stream, +{ + pub(super) fn new( + handle: Handle, + timer: Timer, + build: PeerManagerBuilder, + send: Sender>, + peers: Arc>>>>, + task_queue: Arc>, + ) -> PeerManagerSink

{ + PeerManagerSink { + handle, + timer, + build, + send, + peers, + task_queue, + } + } + + fn run_with_lock_sink(&mut self, item: I, call: F, not: G) -> StartSend + where + F: FnOnce( + I, + &mut Handle, + &mut Timer, + &mut PeerManagerBuilder, + &mut Sender>, + &mut HashMap>>, + ) -> StartSend, + G: FnOnce(I) -> T, + { + let (result, took_lock) = if let Ok(mut guard) = self.peers.try_lock() { + let result = call( + item, + &mut self.handle, + &mut self.timer, + &mut self.build, + &mut self.send, + &mut *guard, + ); + + // Closure could return not ready, need to stash in that case + if result.as_ref().map(futures::AsyncSink::is_not_ready).unwrap_or(false) { + self.task_queue.push(futures_task::current()); + } + + (result, true) + } else { + self.task_queue.push(futures_task::current()); + + if let Ok(mut guard) = self.peers.try_lock() { + let result = call( + item, + &mut self.handle, + &mut self.timer, + &mut self.build, + &mut self.send, + &mut *guard, + ); + + // Closure could return not ready, need to stash in that case + if result.as_ref().map(futures::AsyncSink::is_not_ready).unwrap_or(false) { + self.task_queue.push(futures_task::current()); + } + + (result, true) + } else { + (Ok(AsyncSink::NotReady(not(item))), false) + } + }; + + if took_lock { + // Just notify a single person waiting on the lock to reduce contention + if let Some(task) = self.task_queue.pop() { + task.notify(); + } + } + + result + } + + fn run_with_lock_poll(&mut self, call: F) -> Poll + where + F: FnOnce( + &mut Handle, + &mut Timer, + &mut PeerManagerBuilder, + &mut Sender>, + &mut HashMap>>, + ) -> Poll, + { + let (result, took_lock) = if let Ok(mut guard) = self.peers.try_lock() { + let result = call( + &mut self.handle, + &mut self.timer, + &mut self.build, + &mut self.send, + &mut *guard, + ); + + (result, true) + } else { + // Stash a task + self.task_queue.push(futures_task::current()); + + // Try to get lock again in case of race condition + if let Ok(mut guard) = self.peers.try_lock() { + let result = call( + &mut self.handle, + &mut self.timer, + &mut self.build, + &mut self.send, + &mut *guard, + ); + + (result, true) + } else { + (Ok(Async::NotReady), false) + } + }; + + if took_lock { + // Just notify a single person waiting on the lock to reduce contention + if let Some(task) = self.task_queue.pop() { + task.notify(); + } + } + + result + } +} + +impl

Sink for PeerManagerSink

+where + P: Sink + Stream + 'static, + P::SinkItem: ManagedMessage, + P::Item: ManagedMessage, +{ + type SinkItem = IPeerManagerMessage

; + type SinkError = PeerManagerError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + match item { + IPeerManagerMessage::AddPeer(info, peer) => self.run_with_lock_sink( + (info, peer), + |(info, peer), handle, timer, builder, send, peers| { + if peers.len() >= builder.peer_capacity() { + Ok(AsyncSink::NotReady(IPeerManagerMessage::AddPeer(info, peer))) + } else { + match peers.entry(info) { + Entry::Occupied(_) => Err(PeerManagerError::from_kind(PeerManagerErrorKind::PeerNotFound { info })), + Entry::Vacant(vac) => { + vac.insert(task::run_peer(peer, info, send.clone(), timer.clone(), builder, handle)); + + Ok(AsyncSink::Ready) + } + } + } + }, + |(info, peer)| IPeerManagerMessage::AddPeer(info, peer), + ), + IPeerManagerMessage::RemovePeer(info) => self.run_with_lock_sink( + info, + |info, _, _, _, _, peers| { + peers + .get_mut(&info) + .ok_or_else(|| PeerManagerError::from_kind(PeerManagerErrorKind::PeerNotFound { info })) + .and_then(|send| { + send.start_send(IPeerManagerMessage::RemovePeer(info)) + .map_err(|_| panic!("bip_peer: PeerManager Failed To Send RemovePeer")) + }) + }, + |info| IPeerManagerMessage::RemovePeer(info), + ), + IPeerManagerMessage::SendMessage(info, mid, peer_message) => self.run_with_lock_sink( + (info, mid, peer_message), + |(info, mid, peer_message), _, _, _, _, peers| { + peers + .get_mut(&info) + .ok_or_else(|| PeerManagerError::from_kind(PeerManagerErrorKind::PeerNotFound { info })) + .and_then(|send| { + send.start_send(IPeerManagerMessage::SendMessage(info, mid, peer_message)) + .map_err(|_| panic!("bip_peer: PeerManager Failed to Send SendMessage")) + }) + }, + |(info, mid, peer_message)| IPeerManagerMessage::SendMessage(info, mid, peer_message), + ), + } + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.run_with_lock_poll(|_, _, _, _, peers| { + for peer_mut in peers.values_mut() { + // Needs type hint in case poll fails (so that error type matches) + let result: Poll<(), Self::SinkError> = peer_mut + .poll_complete() + .map_err(|_| panic!("bip_peer: PeerManaged Failed To Poll Peer")); + + result?; + } + + Ok(Async::Ready(())) + }) + } +} diff --git a/packages/peer/src/manager/stream.rs b/packages/peer/src/manager/stream.rs new file mode 100644 index 000000000..ffd6052e4 --- /dev/null +++ b/packages/peer/src/manager/stream.rs @@ -0,0 +1,138 @@ +//! Stream half of a `PeerManager`. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use crossbeam::queue::SegQueue; +use futures::sink::Sink; +use futures::stream::Stream; +use futures::sync::mpsc::{Receiver, Sender}; +use futures::task::{self as futures_task, Task}; +use futures::{Async, Poll}; + +use super::messages::{IPeerManagerMessage, OPeerManagerMessage}; +use crate::manager::peer_info::PeerInfo; + +#[allow(clippy::module_name_repetitions)] +#[allow(clippy::option_option)] +pub struct PeerManagerStream

+where + P: Sink + Stream, +{ + recv: Receiver>, + peers: Arc>>>>, + task_queue: Arc>, + opt_pending: Option>>, +} + +impl

PeerManagerStream

+where + P: Sink + Stream, +{ + pub(super) fn new( + recv: Receiver>, + peers: Arc>>>>, + task_queue: Arc>, + ) -> PeerManagerStream

{ + PeerManagerStream { + recv, + peers, + task_queue, + opt_pending: None, + } + } + + fn run_with_lock_poll(&mut self, item: I, call: F, not: G) -> Poll + where + F: FnOnce(I, &mut HashMap>>) -> Poll, + G: FnOnce(I) -> Option>, + { + let (result, took_lock) = if let Ok(mut guard) = self.peers.try_lock() { + let result = call(item, &mut *guard); + + // Nothing calling us will return NotReady, so we don't have to push to queue here + + (result, true) + } else { + // Couldn't get the lock, stash a task away + self.task_queue.push(futures_task::current()); + + // Try to get the lock once more, in case of a race condition with stashing the task + if let Ok(mut guard) = self.peers.try_lock() { + let result = call(item, &mut *guard); + + // Nothing calling us will return NotReady, so we don't have to push to queue here + + (result, true) + } else { + // If we couldn't get the lock, stash the item + self.opt_pending = Some(not(item)); + + (Ok(Async::NotReady), false) + } + }; + + if took_lock { + // Just notify a single person waiting on the lock to reduce contention + if let Some(task) = self.task_queue.pop() { + task.notify(); + } + } + + result + } +} + +impl

Stream for PeerManagerStream

+where + P: Sink + Stream, +{ + type Item = OPeerManagerMessage; + type Error = (); + + fn poll(&mut self) -> Poll, Self::Error> { + // Intercept and propagate any messages indicating the peer shutdown so we can remove them from our peer map + let next_message = self + .opt_pending + .take() + .map(|pending| Ok(Async::Ready(pending))) + .unwrap_or_else(|| self.recv.poll()); + + next_message.and_then(|result| match result { + Async::Ready(Some(OPeerManagerMessage::PeerRemoved(info))) => self.run_with_lock_poll( + info, + |info, peers| { + peers + .remove(&info) + .unwrap_or_else(|| panic!("bip_peer: Received PeerRemoved Message With No Matching Peer In Map")); + + Ok(Async::Ready(Some(OPeerManagerMessage::PeerRemoved(info)))) + }, + |info| Some(OPeerManagerMessage::PeerRemoved(info)), + ), + Async::Ready(Some(OPeerManagerMessage::PeerDisconnect(info))) => self.run_with_lock_poll( + info, + |info, peers| { + peers + .remove(&info) + .unwrap_or_else(|| panic!("bip_peer: Received PeerDisconnect Message With No Matching Peer In Map")); + + Ok(Async::Ready(Some(OPeerManagerMessage::PeerDisconnect(info)))) + }, + |info| Some(OPeerManagerMessage::PeerDisconnect(info)), + ), + Async::Ready(Some(OPeerManagerMessage::PeerError(info, error))) => self.run_with_lock_poll( + (info, error), + |(info, error), peers| { + peers + .remove(&info) + .unwrap_or_else(|| panic!("bip_peer: Received PeerError Message With No Matching Peer In Map")); + + Ok(Async::Ready(Some(OPeerManagerMessage::PeerError(info, error)))) + }, + |(info, error)| Some(OPeerManagerMessage::PeerError(info, error)), + ), + other => Ok(other), + }) + } +} diff --git a/packages/peer/src/manager/task.rs b/packages/peer/src/manager/task.rs index 9d2ea5376..8946773b1 100644 --- a/packages/peer/src/manager/task.rs +++ b/packages/peer/src/manager/task.rs @@ -10,7 +10,7 @@ use tokio_core::reactor::Handle; use tokio_timer::Timer; use crate::manager::builder::PeerManagerBuilder; -use crate::manager::future::{PersistentError, PersistentStream, RecurringTimeoutError, RecurringTimeoutStream}; +use crate::manager::fused::{PersistentError, PersistentStream, RecurringTimeoutError, RecurringTimeoutStream}; use crate::manager::peer_info::PeerInfo; use crate::manager::{IPeerManagerMessage, ManagedMessage, OPeerManagerMessage}; diff --git a/packages/peer/src/message/mod.rs b/packages/peer/src/message/mod.rs index c11dbe3d2..49b1d8fdd 100644 --- a/packages/peer/src/message/mod.rs +++ b/packages/peer/src/message/mod.rs @@ -10,7 +10,7 @@ use byteorder::{BigEndian, WriteBytesExt}; use bytes::Bytes; use nom::{alt, be_u32, be_u8, call, error_node_position, error_position, map, opt, switch, tuple, tuple_parser, value, IResult}; -use crate::manager::ManagedMessage; +use crate::manager::messages::ManagedMessage; use crate::protocol::PeerProtocol; // TODO: Propagate failures to cast values to/from usize From 7badfcaf958048501cb925afafb62666b75116ff Mon Sep 17 00:00:00 2001 From: Cameron Garnham Date: Sun, 11 Aug 2024 13:56:15 +0200 Subject: [PATCH 6/6] refactor: rename uber to uber/mod --- packages/select/src/{uber.rs => uber/mod.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename packages/select/src/{uber.rs => uber/mod.rs} (100%) diff --git a/packages/select/src/uber.rs b/packages/select/src/uber/mod.rs similarity index 100% rename from packages/select/src/uber.rs rename to packages/select/src/uber/mod.rs