Skip to content

Commit

Permalink
fix disk bench
Browse files Browse the repository at this point in the history
  • Loading branch information
da2ce7 committed Aug 10, 2024
1 parent e48d96c commit 9574a7e
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 18 deletions.
2 changes: 1 addition & 1 deletion packages/disk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ thiserror = "1"
[dev-dependencies]
tracing-subscriber = "0"
rand = "0"
criterion = { version = "0", features = ["async_futures"] }
criterion = { version = "0", features = ["async_tokio"] }


[[bench]]
Expand Down
23 changes: 16 additions & 7 deletions packages/disk/benches/disk_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::fs;
use std::sync::Arc;

use bytes::BytesMut;
use criterion::async_executor::FuturesExecutor;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use disk::error::TorrentError;
use disk::fs::NativeFileSystem;
use disk::fs_cache::FileHandleCache;
use disk::{Block, BlockMetadata, DiskManagerBuilder, FileSystem, IDiskMessage, InfoHash, ODiskMessage};
Expand Down Expand Up @@ -51,18 +51,25 @@ where
{
let mut block_send_guard = block_send.lock().await;
block_send_guard.send(IDiskMessage::AddTorrent(metainfo)).await.unwrap();
} // MutexGuard is dropped here
}

while let Some(res_message) = {
let mut block_recv_guard = block_recv.lock().await;
block_recv_guard.next().await
} {
match res_message.unwrap() {
let error = match res_message.unwrap() {
ODiskMessage::TorrentAdded(_) => {
break;
}
ODiskMessage::FoundGoodPiece(_, _) => (),
_ => panic!("Didn't Receive TorrentAdded"),
ODiskMessage::FoundGoodPiece(_, _) => continue,
ODiskMessage::TorrentError(_, error) => error,

other => panic!("should receive `TorrentAdded` or `FoundGoodPiece`, but got: {other:?}"),
};

match error {
TorrentError::ExistingInfoHash { .. } => break,
other => panic!("should receive `TorrentAdded` or `FoundGoodPiece`, but got: {other:?}"),
}
}
}
Expand Down Expand Up @@ -125,7 +132,7 @@ where
match res_message.unwrap() {
ODiskMessage::BlockProcessed(_) => blocks_sent -= 1,
ODiskMessage::FoundGoodPiece(_, _) | ODiskMessage::FoundBadPiece(_, _) => (),
_ => panic!("Unexpected Message Received In process_blocks"),
other => panic!("should receive `BlockProcessed`, `FoundGoodPiece` or `FoundBadPiece`, but got: {other:?}"),
}

if blocks_sent == 0 {
Expand Down Expand Up @@ -168,9 +175,11 @@ fn bench_process_file_with_fs<F>(
block_recv: block_recv.clone(),
};

let runner = &tokio::runtime::Runtime::new().unwrap();

c.bench_with_input(id, &Arc::new(data), |b, i| {
let metainfo_clone = metainfo.clone();
b.to_async(FuturesExecutor).iter(move || {
b.to_async(runner).iter(move || {
let data = i.clone();
let metainfo = metainfo_clone.clone();
async move {
Expand Down
20 changes: 14 additions & 6 deletions packages/disk/src/disk/tasks/context.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

Expand Down Expand Up @@ -72,20 +73,27 @@ where
&self.fs
}

pub fn insert_torrent(&self, file: Metainfo, state: &Arc<Mutex<PieceCheckerState>>) -> bool {
pub fn insert_torrent(
&self,
file: Metainfo,
state: &Arc<Mutex<PieceCheckerState>>,
) -> Result<InfoHash, (InfoHash, MetainfoState)> {
let mut write_torrents = self
.torrents
.write()
.expect("bip_disk: DiskManagerContext::insert_torrents Failed To Write Torrent");

let hash = file.info().info_hash();
let hash_not_exists = !write_torrents.contains_key(&hash);

if hash_not_exists {
write_torrents.insert(hash, MetainfoState::new(file, state.clone()));
}
let entry = write_torrents.entry(hash);

hash_not_exists
match entry {
Entry::Occupied(key) => Err((hash, key.get().clone())),
Entry::Vacant(vac) => {
vac.insert(MetainfoState::new(file, state.clone()));
Ok(hash)
}
}
}

pub async fn update_torrent<'a, C, D>(self, hash: InfoHash, with_state: C) -> Option<D>
Expand Down
7 changes: 3 additions & 4 deletions packages/disk/src/disk/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ where
// In case we are resuming a download, we need to send the diff for the newly added torrent
send_piece_diff(&init_state, info_hash, sender, true).await;

if context.insert_torrent(file, &init_state) {
Ok(())
} else {
Err(TorrentError::ExistingInfoHash { hash: info_hash })
match context.insert_torrent(file, &init_state) {
Ok(_) => Ok(()),
Err((hash, _)) => Err(TorrentError::ExistingInfoHash { hash }),
}
}

Expand Down

0 comments on commit 9574a7e

Please sign in to comment.