diff --git a/src/main.rs b/src/main.rs index ed841b0..710da1e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,7 +12,7 @@ use std::fs::File; use std::io::{Error, ErrorKind, Read, Result, Write}; use std::net::{SocketAddr, TcpStream}; use std::os::fd::AsRawFd; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{mpsc, Arc, Mutex}; @@ -56,7 +56,7 @@ Receiver options: to 32 is probably overkill. "; -const WIRE_PROTO_VERSION: u16 = 1; +const WIRE_PROTO_VERSION: u16 = 2; const MAX_CHUNK_LEN: u64 = 4096 * 64; /// Metadata about all the files we want to transfer. @@ -113,11 +113,11 @@ impl TransferPlan { /// The index of a file in the transfer plan. #[derive(BorshDeserialize, BorshSerialize, Copy, Clone, Debug, Eq, Hash, PartialEq)] -struct FileId(u16); +struct FileId(u64); impl FileId { fn from_usize(i: usize) -> FileId { - assert!(i < u16::MAX as usize, "Can transfer at most 2^16 files."); + assert!(i < u64::MAX as usize, "Can transfer at most 2^64 files."); FileId(i as _) } } @@ -194,17 +194,18 @@ struct SendState { id: FileId, len: u64, offset: AtomicU64, - in_file: File, + in_file_path: PathBuf, } enum SendResult { Done, + FileVanished, Progress { bytes_sent: u64 }, } /// Metadata about a chunk of data that follows. /// -/// The Borsh-generated representation of this is zero-overhead (14 bytes). +/// The Borsh-generated representation of this is zero-overhead (20 bytes). #[derive(BorshDeserialize, BorshSerialize, Debug)] struct ChunkHeader { /// Which file is the chunk from? @@ -218,8 +219,8 @@ struct ChunkHeader { } impl ChunkHeader { - fn to_bytes(&self) -> [u8; 14] { - let mut buffer = [0_u8; 14]; + fn to_bytes(&self) -> [u8; 20] { + let mut buffer = [0_u8; 20]; let mut cursor = std::io::Cursor::new(&mut buffer[..]); self.serialize(&mut cursor) .expect("Writing to memory never fails."); @@ -229,6 +230,11 @@ impl ChunkHeader { impl SendState { pub fn send_one(&self, start_time: Instant, out: &mut TcpStream) -> Result { + // By deferring the opening of the file descriptor to this point, + // we effectively limit the amount of open files to the amount of send threads. + // However, this now introduces the possibility of files getting deleted between + // getting listed and their turn for transfer. + // A vanishing file is expected, it is not a transfer-terminating event. let offset = self.offset.fetch_add(MAX_CHUNK_LEN, Ordering::SeqCst); let end = self.len.min(offset + MAX_CHUNK_LEN); @@ -236,6 +242,13 @@ impl SendState { return Ok(SendResult::Done); } + let res = std::fs::File::open(&self.in_file_path); + if let Err(e) = std::fs::File::open(&self.in_file_path) { + if e.kind() == std::io::ErrorKind::NotFound { + return Ok(SendResult::FileVanished); + } + } + let in_file = res?; print_progress(offset, self.len, start_time); let header = ChunkHeader { @@ -253,7 +266,7 @@ impl SendState { let end = end as i64; let mut off = offset as i64; let out_fd = out.as_raw_fd(); - let in_fd = self.in_file.as_raw_fd(); + let in_fd = in_file.as_raw_fd(); let mut total_written: u64 = 0; while off < end { let count = (end - off) as usize; @@ -318,7 +331,7 @@ fn main_send( id: FileId::from_usize(i), len: metadata.len(), offset: AtomicU64::new(0), - in_file: file, + in_file_path: fname.into(), }; plan.files.push(file_plan); send_states.push(state); @@ -389,6 +402,9 @@ fn main_send( std::thread::sleep(to_wait.unwrap()); } match file.send_one(start_time, &mut stream) { + Ok(SendResult::FileVanished) => { + println!("File {:?} vanished", file.in_file_path); + } Ok(SendResult::Progress { bytes_sent: bytes_written, }) => { @@ -427,12 +443,10 @@ struct Chunk { struct FileReceiver { fname: String, - /// The file we’re writing to, if we have started writing. - /// /// We don’t open the file immediately so we don’t create a zero-sized file /// when a transfer fails. We only open the file after we have at least some /// data for it. - out_file: Option, + file_created: bool, /// Chunks that we cannot yet write because a preceding chunk has not yet arrived. pending: HashMap, @@ -448,7 +462,7 @@ impl FileReceiver { fn new(plan: FilePlan) -> FileReceiver { FileReceiver { fname: plan.name, - out_file: None, + file_created: false, pending: HashMap::new(), offset: 0, total_len: plan.len, @@ -457,28 +471,29 @@ impl FileReceiver { /// Write or buffer a chunk that we received for this file. fn handle_chunk(&mut self, chunk: Chunk) -> Result<()> { - let mut out_file = match self.out_file.take() { - None => { - let path: &Path = self.fname.as_ref(); - if let Some(dir) = path.parent() { - std::fs::create_dir_all(dir)?; - } - let file = File::create(path)?; - - // Resize the file to its final size already: - // * So that the file system can do a better job of allocating - // a single extent for it, and it doesn't have to fragment - // the file. - // * If we run out of space, we learn about that before we waste - // time on the transfer (although then maybe we should do it - // before we receive a chunk after all?). - // This can make debugging a bit harder, because when you look - // at just the file size you might think it's fully transferred. - file.set_len(self.total_len)?; - - file + let path: &Path = self.fname.as_ref(); + let mut out_file = if !self.file_created { + if let Some(dir) = path.parent() { + std::fs::create_dir_all(dir)?; } - Some(f) => f, + let file = File::create(path)?; + + // Resize the file to its final size already: + // * So that the file system can do a better job of allocating + // a single extent for it, and it doesn't have to fragment + // the file. + // * If we run out of space, we learn about that before we waste + // time on the transfer (although then maybe we should do it + // before we receive a chunk after all?). + // This can make debugging a bit harder, because when you look + // at just the file size you might think it's fully transferred. + file.set_len(self.total_len)?; + + self.file_created = true; + + file + } else { + File::options().append(true).open(path)? }; self.pending.insert(chunk.offset, chunk); @@ -488,7 +503,7 @@ impl FileReceiver { self.offset += chunk.data.len() as u64; } - self.out_file = Some(out_file); + //self.out_file = Some(out_file); Ok(()) } } @@ -571,7 +586,7 @@ fn main_recv( // Read a chunk header. If we hit EOF, that is not an error, it // means that the sender has nothing more to send so we can just // exit here. - let mut buf = [0u8; 14]; + let mut buf = [0u8; 20]; match stream.read_exact(&mut buf) { Ok(..) => {} Err(err) if err.kind() == ErrorKind::UnexpectedEof => break, @@ -637,10 +652,12 @@ fn main_recv( #[cfg(test)] mod tests { use super::*; + use std::env; use std::{ net::{IpAddr, Ipv4Addr}, thread, }; + use tempfile::TempDir; #[test] fn test_accepts_valid_protocol() { @@ -717,4 +734,41 @@ mod tests { ["0", "a/1", "a/b/2"].map(|f| base_path.join(f).to_str().unwrap().to_owned()) ); } + + #[test] + fn test_sends_20_thousand_files() { + let (events_tx, events_rx) = std::sync::mpsc::channel::(); + let cwd = env::current_dir().unwrap(); + thread::spawn(|| { + let td = TempDir::new_in(".").unwrap(); + let tmp_path = td.path().strip_prefix(cwd).unwrap(); + println!("{tmp_path:?}"); + let mut fnames = Vec::new(); + for i in 0..20_000 { + let path = tmp_path.join(i.to_string()); + fnames.push(path.clone().into_os_string().into_string().unwrap()); + let mut f = std::fs::File::create(path).unwrap(); + f.write(&[1, 2, 3]).unwrap(); + } + main_send( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), + &fnames, + 1, + events_tx, + None, + ) + .unwrap(); + }); + match events_rx.recv().unwrap() { + SenderEvent::Listening(port) => { + main_recv( + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port), + "1", + WriteMode::Force, + 1, + ) + .unwrap(); + } + } + } }