diff --git a/Cargo.lock b/Cargo.lock index 506173a..e943159 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + [[package]] name = "bitflags" version = "2.6.0" @@ -72,6 +78,7 @@ version = "1.0.0" dependencies = [ "borsh", "libc", + "parking_lot", "tempfile", "walkdir", ] @@ -104,6 +111,16 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "memchr" version = "2.7.1" @@ -116,6 +133,29 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "proc-macro-crate" version = "3.1.0" @@ -166,6 +206,15 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "redox_syscall" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" +dependencies = [ + "bitflags", +] + [[package]] name = "rustix" version = "0.38.34" @@ -188,6 +237,18 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + [[package]] name = "syn" version = "2.0.49" diff --git a/Cargo.toml b/Cargo.toml index 58b59f5..af1c76f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ publish = false libc = "0.2.153" borsh = { version = "1.3.1", features = ["derive"] } walkdir = "2.5.0" +parking_lot = "0.12.3" [profile.dev] panic = "abort" diff --git a/src/main.rs b/src/main.rs index 65dd28d..11f27f3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,6 @@ use std::net::{SocketAddr, TcpStream}; use std::os::fd::AsRawFd; use std::path::{Path, PathBuf}; use std::str::FromStr; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{mpsc, Arc, Mutex}; use std::time::Instant; use walkdir::WalkDir; @@ -104,7 +103,7 @@ impl TransferPlan { fn assert_paths_relative(&self) { for file in &self.files { assert!( - !file.name.starts_with("/"), + !file.name.starts_with('/'), "Transferring files with an absolute path name is not allowed.", ); } @@ -190,11 +189,16 @@ fn print_progress(offset: u64, len: u64, start_time: Instant) { ); } +enum SendStateInner { + Pending { fname: PathBuf }, + InProgress { file: File, offset: u64 }, + Done, +} + struct SendState { id: FileId, len: u64, - offset: AtomicU64, - in_file_path: PathBuf, + state: parking_lot::Mutex, } enum SendResult { @@ -235,20 +239,40 @@ impl SendState { // However, this now introduces the possibility of files getting deleted between // getting listed and their turn for transfer. // A vanishing file is expected, it is not a transfer-terminating event. - let offset = self.offset.fetch_add(MAX_CHUNK_LEN, Ordering::SeqCst); + let mut state = self.state.lock(); + let offset = match *state { + SendStateInner::Pending { ref fname } => { + let res = std::fs::File::open(fname); + if let Err(ref e) = res { + if e.kind() == std::io::ErrorKind::NotFound { + return Ok(SendResult::FileVanished); + } + } + *state = SendStateInner::InProgress { + file: res?, + offset: 0, + }; + 0 + } + SendStateInner::Done => { + return Ok(SendResult::Done); + } + SendStateInner::InProgress { + file: _, + mut offset, + } => { + offset += MAX_CHUNK_LEN; + offset + } + }; + let end = self.len.min(offset + MAX_CHUNK_LEN); if offset >= self.len || offset >= end { + *state = SendStateInner::Done; return Ok(SendResult::Done); } - let res = std::fs::File::open(&self.in_file_path); - if let Err(e) = std::fs::File::open(&self.in_file_path) { - if e.kind() == std::io::ErrorKind::NotFound { - return Ok(SendResult::FileVanished); - } - } - let in_file = res?; print_progress(offset, self.len, start_time); let header = ChunkHeader { @@ -263,10 +287,19 @@ impl SendState { borsh::to_vec(&header)?.len() ); + let in_fd = if let SendStateInner::InProgress { + ref file, + offset: _, + } = *state + { + file.as_raw_fd() + } else { + unreachable!(); + }; + let end = end as i64; let mut off = offset as i64; let out_fd = out.as_raw_fd(); - let in_fd = in_file.as_raw_fd(); let mut total_written: u64 = 0; while off < end { let count = (end - off) as usize; @@ -321,8 +354,7 @@ fn main_send( let mut send_states = Vec::new(); for (i, fname) in all_filenames_from_path_names(fnames)?.iter().enumerate() { - let file = std::fs::File::open(&fname)?; - let metadata = file.metadata()?; + let metadata = std::fs::metadata(fname)?; let file_plan = FilePlan { name: fname.clone(), len: metadata.len(), @@ -330,8 +362,9 @@ fn main_send( let state = SendState { id: FileId::from_usize(i), len: metadata.len(), - offset: AtomicU64::new(0), - in_file_path: fname.into(), + state: parking_lot::Mutex::new(SendStateInner::Pending { + fname: fname.into(), + }), }; plan.files.push(file_plan); send_states.push(state); @@ -376,7 +409,7 @@ fn main_send( // Stop the listener, don't send anything over our new connection. let is_done = state_arc .iter() - .all(|f| f.offset.load(Ordering::SeqCst) >= f.len); + .all(|f| matches!(*f.state.lock(), SendStateInner::Done)); if is_done { break; } @@ -403,7 +436,7 @@ fn main_send( } match file.send_one(start_time, &mut stream) { Ok(SendResult::FileVanished) => { - println!("File {:?} vanished", file.in_file_path); + println!("File {:?} vanished", file.id); } Ok(SendResult::Progress { bytes_sent: bytes_written, @@ -446,7 +479,7 @@ struct FileReceiver { /// We don’t open the file immediately so we don’t create a zero-sized file /// when a transfer fails. We only open the file after we have at least some /// data for it. - file_created: bool, + out_file: Option, /// Chunks that we cannot yet write because a preceding chunk has not yet arrived. pending: HashMap, @@ -462,7 +495,7 @@ impl FileReceiver { fn new(plan: FilePlan) -> FileReceiver { FileReceiver { fname: plan.name, - file_created: false, + out_file: None, pending: HashMap::new(), offset: 0, total_len: plan.len, @@ -471,29 +504,28 @@ impl FileReceiver { /// Write or buffer a chunk that we received for this file. fn handle_chunk(&mut self, chunk: Chunk) -> Result<()> { - let path: &Path = self.fname.as_ref(); - let mut out_file = if !self.file_created { - if let Some(dir) = path.parent() { - std::fs::create_dir_all(dir)?; + let mut out_file = match self.out_file.take() { + None => { + let path: &Path = self.fname.as_ref(); + if let Some(dir) = path.parent() { + std::fs::create_dir_all(dir)?; + } + let file = File::create(path)?; + + // Resize the file to its final size already: + // * So that the file system can do a better job of allocating + // a single extent for it, and it doesn't have to fragment + // the file. + // * If we run out of space, we learn about that before we waste + // time on the transfer (although then maybe we should do it + // before we receive a chunk after all?). + // This can make debugging a bit harder, because when you look + // at just the file size you might think it's fully transferred. + file.set_len(self.total_len)?; + + file } - let file = File::create(path)?; - - // Resize the file to its final size already: - // * So that the file system can do a better job of allocating - // a single extent for it, and it doesn't have to fragment - // the file. - // * If we run out of space, we learn about that before we waste - // time on the transfer (although then maybe we should do it - // before we receive a chunk after all?). - // This can make debugging a bit harder, because when you look - // at just the file size you might think it's fully transferred. - file.set_len(self.total_len)?; - - self.file_created = true; - - file - } else { - File::options().append(true).open(path)? + Some(f) => f, }; self.pending.insert(chunk.offset, chunk); @@ -737,11 +769,11 @@ mod tests { #[test] fn test_sends_20_thousand_files() { let (events_tx, events_rx) = std::sync::mpsc::channel::(); + env::set_current_dir("/tmp/").unwrap(); let cwd = env::current_dir().unwrap(); thread::spawn(|| { let td = TempDir::new_in(".").unwrap(); let tmp_path = td.path().strip_prefix(cwd).unwrap(); - println!("{tmp_path:?}"); let mut fnames = Vec::new(); for i in 0..20_000 { let path = tmp_path.join(i.to_string());