From 066d081a98c562973c89cb8bf4d5c18b750301b0 Mon Sep 17 00:00:00 2001 From: David Ventura Date: Tue, 26 Nov 2024 18:21:36 +0100 Subject: [PATCH 1/7] Remove max file limit on transfer --- src/main.rs | 128 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 91 insertions(+), 37 deletions(-) diff --git a/src/main.rs b/src/main.rs index ed841b0..710da1e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use std::fs::File; use std::io::{Error, ErrorKind, Read, Result, Write}; use std::net::{SocketAddr, TcpStream}; use std::os::fd::AsRawFd; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{mpsc, Arc, Mutex}; @@ -56,7 +56,7 @@ Receiver options: to 32 is probably overkill. "; -const WIRE_PROTO_VERSION: u16 = 1; +const WIRE_PROTO_VERSION: u16 = 2; const MAX_CHUNK_LEN: u64 = 4096 * 64; /// Metadata about all the files we want to transfer. @@ -113,11 +113,11 @@ impl TransferPlan { /// The index of a file in the transfer plan. #[derive(BorshDeserialize, BorshSerialize, Copy, Clone, Debug, Eq, Hash, PartialEq)] -struct FileId(u16); +struct FileId(u64); impl FileId { fn from_usize(i: usize) -> FileId { - assert!(i < u16::MAX as usize, "Can transfer at most 2^16 files."); + assert!(i < u64::MAX as usize, "Can transfer at most 2^64 files."); FileId(i as _) } } @@ -194,17 +194,18 @@ struct SendState { id: FileId, len: u64, offset: AtomicU64, - in_file: File, + in_file_path: PathBuf, } enum SendResult { Done, + FileVanished, Progress { bytes_sent: u64 }, } /// Metadata about a chunk of data that follows. /// -/// The Borsh-generated representation of this is zero-overhead (14 bytes). +/// The Borsh-generated representation of this is zero-overhead (20 bytes). #[derive(BorshDeserialize, BorshSerialize, Debug)] struct ChunkHeader { /// Which file is the chunk from? @@ -218,8 +219,8 @@ struct ChunkHeader { } impl ChunkHeader { - fn to_bytes(&self) -> [u8; 14] { - let mut buffer = [0_u8; 14]; + fn to_bytes(&self) -> [u8; 20] { + let mut buffer = [0_u8; 20]; let mut cursor = std::io::Cursor::new(&mut buffer[..]); self.serialize(&mut cursor) .expect("Writing to memory never fails."); @@ -229,6 +230,11 @@ impl ChunkHeader { impl SendState { pub fn send_one(&self, start_time: Instant, out: &mut TcpStream) -> Result { + // By deferring the opening of the file descriptor to this point, + // we effectively limit the amount of open files to the amount of send threads. + // However, this now introduces the possibility of files getting deleted between + // getting listed and their turn for transfer. + // A vanishing file is expected, it is not a transfer-terminating event. let offset = self.offset.fetch_add(MAX_CHUNK_LEN, Ordering::SeqCst); let end = self.len.min(offset + MAX_CHUNK_LEN); @@ -236,6 +242,13 @@ impl SendState { return Ok(SendResult::Done); } + let res = std::fs::File::open(&self.in_file_path); + if let Err(e) = std::fs::File::open(&self.in_file_path) { + if e.kind() == std::io::ErrorKind::NotFound { + return Ok(SendResult::FileVanished); + } + } + let in_file = res?; print_progress(offset, self.len, start_time); let header = ChunkHeader { @@ -253,7 +266,7 @@ impl SendState { let end = end as i64; let mut off = offset as i64; let out_fd = out.as_raw_fd(); - let in_fd = self.in_file.as_raw_fd(); + let in_fd = in_file.as_raw_fd(); let mut total_written: u64 = 0; while off < end { let count = (end - off) as usize; @@ -318,7 +331,7 @@ fn main_send( id: FileId::from_usize(i), len: metadata.len(), offset: AtomicU64::new(0), - in_file: file, + in_file_path: fname.into(), }; plan.files.push(file_plan); send_states.push(state); @@ -389,6 +402,9 @@ fn main_send( std::thread::sleep(to_wait.unwrap()); } match file.send_one(start_time, &mut stream) { + Ok(SendResult::FileVanished) => { + println!("File {:?} vanished", file.in_file_path); + } Ok(SendResult::Progress { bytes_sent: bytes_written, }) => { @@ -427,12 +443,10 @@ struct Chunk { struct FileReceiver { fname: String, - /// The file we’re writing to, if we have started writing. - /// /// We don’t open the file immediately so we don’t create a zero-sized file /// when a transfer fails. We only open the file after we have at least some /// data for it. - out_file: Option, + file_created: bool, /// Chunks that we cannot yet write because a preceding chunk has not yet arrived. pending: HashMap, @@ -448,7 +462,7 @@ impl FileReceiver { fn new(plan: FilePlan) -> FileReceiver { FileReceiver { fname: plan.name, - out_file: None, + file_created: false, pending: HashMap::new(), offset: 0, total_len: plan.len, @@ -457,28 +471,29 @@ impl FileReceiver { /// Write or buffer a chunk that we received for this file. fn handle_chunk(&mut self, chunk: Chunk) -> Result<()> { - let mut out_file = match self.out_file.take() { - None => { - let path: &Path = self.fname.as_ref(); - if let Some(dir) = path.parent() { - std::fs::create_dir_all(dir)?; - } - let file = File::create(path)?; - - // Resize the file to its final size already: - // * So that the file system can do a better job of allocating - // a single extent for it, and it doesn't have to fragment - // the file. - // * If we run out of space, we learn about that before we waste - // time on the transfer (although then maybe we should do it - // before we receive a chunk after all?). - // This can make debugging a bit harder, because when you look - // at just the file size you might think it's fully transferred. - file.set_len(self.total_len)?; - - file + let path: &Path = self.fname.as_ref(); + let mut out_file = if !self.file_created { + if let Some(dir) = path.parent() { + std::fs::create_dir_all(dir)?; } - Some(f) => f, + let file = File::create(path)?; + + // Resize the file to its final size already: + // * So that the file system can do a better job of allocating + // a single extent for it, and it doesn't have to fragment + // the file. + // * If we run out of space, we learn about that before we waste + // time on the transfer (although then maybe we should do it + // before we receive a chunk after all?). + // This can make debugging a bit harder, because when you look + // at just the file size you might think it's fully transferred. + file.set_len(self.total_len)?; + + self.file_created = true; + + file + } else { + File::options().append(true).open(path)? }; self.pending.insert(chunk.offset, chunk); @@ -488,7 +503,7 @@ impl FileReceiver { self.offset += chunk.data.len() as u64; } - self.out_file = Some(out_file); + //self.out_file = Some(out_file); Ok(()) } } @@ -571,7 +586,7 @@ fn main_recv( // Read a chunk header. If we hit EOF, that is not an error, it // means that the sender has nothing more to send so we can just // exit here. - let mut buf = [0u8; 14]; + let mut buf = [0u8; 20]; match stream.read_exact(&mut buf) { Ok(..) => {} Err(err) if err.kind() == ErrorKind::UnexpectedEof => break, @@ -637,10 +652,12 @@ fn main_recv( #[cfg(test)] mod tests { use super::*; + use std::env; use std::{ net::{IpAddr, Ipv4Addr}, thread, }; + use tempfile::TempDir; #[test] fn test_accepts_valid_protocol() { @@ -717,4 +734,41 @@ mod tests { ["0", "a/1", "a/b/2"].map(|f| base_path.join(f).to_str().unwrap().to_owned()) ); } + + #[test] + fn test_sends_20_thousand_files() { + let (events_tx, events_rx) = std::sync::mpsc::channel::(); + let cwd = env::current_dir().unwrap(); + thread::spawn(|| { + let td = TempDir::new_in(".").unwrap(); + let tmp_path = td.path().strip_prefix(cwd).unwrap(); + println!("{tmp_path:?}"); + let mut fnames = Vec::new(); + for i in 0..20_000 { + let path = tmp_path.join(i.to_string()); + fnames.push(path.clone().into_os_string().into_string().unwrap()); + let mut f = std::fs::File::create(path).unwrap(); + f.write(&[1, 2, 3]).unwrap(); + } + main_send( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), + &fnames, + 1, + events_tx, + None, + ) + .unwrap(); + }); + match events_rx.recv().unwrap() { + SenderEvent::Listening(port) => { + main_recv( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), + "1", + WriteMode::Force, + 1, + ) + .unwrap(); + } + } + } } From 034809ad1a502dd14cdd1761788ea52539aacc43 Mon Sep 17 00:00:00 2001 From: David Ventura Date: Thu, 28 Nov 2024 14:38:55 +0100 Subject: [PATCH 2/7] Lower the file-id to 32bit --- src/main.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index 710da1e..65dd28d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -113,11 +113,11 @@ impl TransferPlan { /// The index of a file in the transfer plan. #[derive(BorshDeserialize, BorshSerialize, Copy, Clone, Debug, Eq, Hash, PartialEq)] -struct FileId(u64); +struct FileId(u32); impl FileId { fn from_usize(i: usize) -> FileId { - assert!(i < u64::MAX as usize, "Can transfer at most 2^64 files."); + assert!(i < u32::MAX as usize, "Can transfer at most 2^32 files."); FileId(i as _) } } @@ -205,7 +205,7 @@ enum SendResult { /// Metadata about a chunk of data that follows. /// -/// The Borsh-generated representation of this is zero-overhead (20 bytes). +/// The Borsh-generated representation of this is zero-overhead (16 bytes). #[derive(BorshDeserialize, BorshSerialize, Debug)] struct ChunkHeader { /// Which file is the chunk from? @@ -219,8 +219,8 @@ struct ChunkHeader { } impl ChunkHeader { - fn to_bytes(&self) -> [u8; 20] { - let mut buffer = [0_u8; 20]; + fn to_bytes(&self) -> [u8; 16] { + let mut buffer = [0_u8; 16]; let mut cursor = std::io::Cursor::new(&mut buffer[..]); self.serialize(&mut cursor) .expect("Writing to memory never fails."); @@ -503,7 +503,6 @@ impl FileReceiver { self.offset += chunk.data.len() as u64; } - //self.out_file = Some(out_file); Ok(()) } } @@ -586,7 +585,7 @@ fn main_recv( // Read a chunk header. If we hit EOF, that is not an error, it // means that the sender has nothing more to send so we can just // exit here. - let mut buf = [0u8; 20]; + let mut buf = [0u8; 16]; match stream.read_exact(&mut buf) { Ok(..) => {} Err(err) if err.kind() == ErrorKind::UnexpectedEof => break, From 76156abacc6858f8599f85b1ea64f10d4be67167 Mon Sep 17 00:00:00 2001 From: David Ventura Date: Thu, 28 Nov 2024 15:46:34 +0100 Subject: [PATCH 3/7] Open files only once, instead of per chunk --- Cargo.lock | 61 ++++++++++++++++++++++++++++ Cargo.toml | 1 + src/main.rs | 114 +++++++++++++++++++++++++++++++--------------------- 3 files changed, 131 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 506173a..e943159 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + [[package]] name = "bitflags" version = "2.6.0" @@ -72,6 +78,7 @@ version = "1.0.0" dependencies = [ "borsh", "libc", + "parking_lot", "tempfile", "walkdir", ] @@ -104,6 +111,16 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "memchr" version = "2.7.1" @@ -116,6 +133,29 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "proc-macro-crate" version = "3.1.0" @@ -166,6 +206,15 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "redox_syscall" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" +dependencies = [ + "bitflags", +] + [[package]] name = "rustix" version = "0.38.34" @@ -188,6 +237,18 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + [[package]] name = "syn" version = "2.0.49" diff --git a/Cargo.toml b/Cargo.toml index 58b59f5..af1c76f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ publish = false libc = "0.2.153" borsh = { version = "1.3.1", features = ["derive"] } walkdir = "2.5.0" +parking_lot = "0.12.3" [profile.dev] panic = "abort" diff --git a/src/main.rs b/src/main.rs index 65dd28d..6bedb02 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,7 +14,6 @@ use std::net::{SocketAddr, TcpStream}; use std::os::fd::AsRawFd; use std::path::{Path, PathBuf}; use std::str::FromStr; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{mpsc, Arc, Mutex}; use std::time::Instant; use walkdir::WalkDir; @@ -104,7 +103,7 @@ impl TransferPlan { fn assert_paths_relative(&self) { for file in &self.files { assert!( - !file.name.starts_with("/"), + !file.name.starts_with('/'), "Transferring files with an absolute path name is not allowed.", ); } @@ -190,11 +189,16 @@ fn print_progress(offset: u64, len: u64, start_time: Instant) { ); } +enum SendStateInner { + Pending { fname: PathBuf }, + InProgress { file: File, offset: u64 }, + Done, +} + struct SendState { id: FileId, len: u64, - offset: AtomicU64, - in_file_path: PathBuf, + state: parking_lot::Mutex, } enum SendResult { @@ -235,20 +239,42 @@ impl SendState { // However, this now introduces the possibility of files getting deleted between // getting listed and their turn for transfer. // A vanishing file is expected, it is not a transfer-terminating event. - let offset = self.offset.fetch_add(MAX_CHUNK_LEN, Ordering::SeqCst); + let mut state = self.state.lock(); + let (offset, in_fd) = match *state { + SendStateInner::Pending { ref fname } => { + let res = std::fs::File::open(fname); + if let Err(ref e) = res { + if e.kind() == std::io::ErrorKind::NotFound { + return Ok(SendResult::FileVanished); + } + } + let res = res?; + let fd = res.as_raw_fd(); + *state = SendStateInner::InProgress { + file: res, + offset: 0, + }; + (0, fd) + } + SendStateInner::Done => { + return Ok(SendResult::Done); + } + SendStateInner::InProgress { + ref file, + mut offset, + } => { + offset += MAX_CHUNK_LEN; + (offset, file.as_raw_fd()) + } + }; + let end = self.len.min(offset + MAX_CHUNK_LEN); if offset >= self.len || offset >= end { + *state = SendStateInner::Done; return Ok(SendResult::Done); } - let res = std::fs::File::open(&self.in_file_path); - if let Err(e) = std::fs::File::open(&self.in_file_path) { - if e.kind() == std::io::ErrorKind::NotFound { - return Ok(SendResult::FileVanished); - } - } - let in_file = res?; print_progress(offset, self.len, start_time); let header = ChunkHeader { @@ -265,9 +291,8 @@ impl SendState { let end = end as i64; let mut off = offset as i64; - let out_fd = out.as_raw_fd(); - let in_fd = in_file.as_raw_fd(); let mut total_written: u64 = 0; + let out_fd = out.as_raw_fd(); while off < end { let count = (end - off) as usize; // Note, sendfile advances the offset by the number of bytes written @@ -321,8 +346,7 @@ fn main_send( let mut send_states = Vec::new(); for (i, fname) in all_filenames_from_path_names(fnames)?.iter().enumerate() { - let file = std::fs::File::open(&fname)?; - let metadata = file.metadata()?; + let metadata = std::fs::metadata(fname)?; let file_plan = FilePlan { name: fname.clone(), len: metadata.len(), @@ -330,8 +354,9 @@ fn main_send( let state = SendState { id: FileId::from_usize(i), len: metadata.len(), - offset: AtomicU64::new(0), - in_file_path: fname.into(), + state: parking_lot::Mutex::new(SendStateInner::Pending { + fname: fname.into(), + }), }; plan.files.push(file_plan); send_states.push(state); @@ -376,7 +401,7 @@ fn main_send( // Stop the listener, don't send anything over our new connection. let is_done = state_arc .iter() - .all(|f| f.offset.load(Ordering::SeqCst) >= f.len); + .all(|f| matches!(*f.state.lock(), SendStateInner::Done)); if is_done { break; } @@ -403,7 +428,7 @@ fn main_send( } match file.send_one(start_time, &mut stream) { Ok(SendResult::FileVanished) => { - println!("File {:?} vanished", file.in_file_path); + println!("File {:?} vanished", file.id); } Ok(SendResult::Progress { bytes_sent: bytes_written, @@ -446,7 +471,7 @@ struct FileReceiver { /// We don’t open the file immediately so we don’t create a zero-sized file /// when a transfer fails. We only open the file after we have at least some /// data for it. - file_created: bool, + out_file: Option, /// Chunks that we cannot yet write because a preceding chunk has not yet arrived. pending: HashMap, @@ -462,7 +487,7 @@ impl FileReceiver { fn new(plan: FilePlan) -> FileReceiver { FileReceiver { fname: plan.name, - file_created: false, + out_file: None, pending: HashMap::new(), offset: 0, total_len: plan.len, @@ -471,29 +496,28 @@ impl FileReceiver { /// Write or buffer a chunk that we received for this file. fn handle_chunk(&mut self, chunk: Chunk) -> Result<()> { - let path: &Path = self.fname.as_ref(); - let mut out_file = if !self.file_created { - if let Some(dir) = path.parent() { - std::fs::create_dir_all(dir)?; + let mut out_file = match self.out_file.take() { + None => { + let path: &Path = self.fname.as_ref(); + if let Some(dir) = path.parent() { + std::fs::create_dir_all(dir)?; + } + let file = File::create(path)?; + + // Resize the file to its final size already: + // * So that the file system can do a better job of allocating + // a single extent for it, and it doesn't have to fragment + // the file. + // * If we run out of space, we learn about that before we waste + // time on the transfer (although then maybe we should do it + // before we receive a chunk after all?). + // This can make debugging a bit harder, because when you look + // at just the file size you might think it's fully transferred. + file.set_len(self.total_len)?; + + file } - let file = File::create(path)?; - - // Resize the file to its final size already: - // * So that the file system can do a better job of allocating - // a single extent for it, and it doesn't have to fragment - // the file. - // * If we run out of space, we learn about that before we waste - // time on the transfer (although then maybe we should do it - // before we receive a chunk after all?). - // This can make debugging a bit harder, because when you look - // at just the file size you might think it's fully transferred. - file.set_len(self.total_len)?; - - self.file_created = true; - - file - } else { - File::options().append(true).open(path)? + Some(f) => f, }; self.pending.insert(chunk.offset, chunk); @@ -737,11 +761,11 @@ mod tests { #[test] fn test_sends_20_thousand_files() { let (events_tx, events_rx) = std::sync::mpsc::channel::(); + env::set_current_dir("/tmp/").unwrap(); let cwd = env::current_dir().unwrap(); thread::spawn(|| { let td = TempDir::new_in(".").unwrap(); let tmp_path = td.path().strip_prefix(cwd).unwrap(); - println!("{tmp_path:?}"); let mut fnames = Vec::new(); for i in 0..20_000 { let path = tmp_path.join(i.to_string()); From e96a31517a45b4d48f1d64a7a2580577811e5dbe Mon Sep 17 00:00:00 2001 From: David Ventura Date: Tue, 7 Jan 2025 12:02:13 +0100 Subject: [PATCH 4/7] Keep the receiver file open if there is pending data --- src/main.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main.rs b/src/main.rs index 6bedb02..aa47af2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -527,6 +527,11 @@ impl FileReceiver { self.offset += chunk.data.len() as u64; } + if self.offset < self.total_len { + self.out_file = Some(out_file); + // Only keep the file open as long as there is more to write + } + Ok(()) } } From 6fb78506857c43c80daf0231adcdc1df442cd7c0 Mon Sep 17 00:00:00 2001 From: David Ventura Date: Tue, 7 Jan 2025 12:07:05 +0100 Subject: [PATCH 5/7] drop lock while transmitting --- src/main.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main.rs b/src/main.rs index aa47af2..ec6e5c5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -274,6 +274,9 @@ impl SendState { *state = SendStateInner::Done; return Ok(SendResult::Done); } + // Drop the lock while sending -- so that multiple threads can + // send data from the same file at once + std::mem::drop(state); print_progress(offset, self.len, start_time); From 5265d1dd834e36d057b3693ea010cbb1bb31a3c9 Mon Sep 17 00:00:00 2001 From: David Ventura Date: Tue, 7 Jan 2025 12:07:56 +0100 Subject: [PATCH 6/7] Tidy up handling of vanished files --- src/main.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main.rs b/src/main.rs index ec6e5c5..84562ee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -242,13 +242,13 @@ impl SendState { let mut state = self.state.lock(); let (offset, in_fd) = match *state { SendStateInner::Pending { ref fname } => { - let res = std::fs::File::open(fname); - if let Err(ref e) = res { - if e.kind() == std::io::ErrorKind::NotFound { - return Ok(SendResult::FileVanished); + let res = match std::fs::File::open(fname) { + Ok(f) => f, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + return Ok(SendResult::FileVanished) } - } - let res = res?; + Err(e) => return Err(e), + }; let fd = res.as_raw_fd(); *state = SendStateInner::InProgress { file: res, From 22de9c7717fdfa5186861ae7ab7250f26d37aa7b Mon Sep 17 00:00:00 2001 From: David Ventura Date: Tue, 7 Jan 2025 13:19:57 +0100 Subject: [PATCH 7/7] add test for sending large file & fix offset calculation on sender --- src/main.rs | 44 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index 84562ee..dc7dd87 100644 --- a/src/main.rs +++ b/src/main.rs @@ -261,10 +261,10 @@ impl SendState { } SendStateInner::InProgress { ref file, - mut offset, + ref mut offset, } => { - offset += MAX_CHUNK_LEN; - (offset, file.as_raw_fd()) + *offset += MAX_CHUNK_LEN; + (*offset, file.as_raw_fd()) } }; @@ -766,6 +766,44 @@ mod tests { ); } + #[test] + fn test_sends_large_file() { + let (events_tx, events_rx) = std::sync::mpsc::channel::(); + env::set_current_dir("/tmp/").unwrap(); + let cwd = env::current_dir().unwrap(); + thread::spawn(|| { + let td = TempDir::new_in(".").unwrap(); + let tmp_path = td.path().strip_prefix(cwd).unwrap(); + let path = tmp_path.join("large"); + let fnames = &[path.clone().into_os_string().into_string().unwrap()]; + + { + let mut f = std::fs::File::create(path).unwrap(); + f.write_all(&vec![0u8; MAX_CHUNK_LEN as usize * 100]) + .unwrap(); + } + + main_send( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), + fnames, + 1, + events_tx, + None, + ) + .unwrap(); + }); + match events_rx.recv().unwrap() { + SenderEvent::Listening(port) => { + main_recv( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), + "1", + WriteMode::Force, + 1, + ) + .unwrap(); + } + } + } #[test] fn test_sends_20_thousand_files() { let (events_tx, events_rx) = std::sync::mpsc::channel::();