Skip to content

Commit

Permalink
Remove max file limit on transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidVentura committed Nov 27, 2024
1 parent ad9023d commit 066d081
Showing 1 changed file with 91 additions and 37 deletions.
128 changes: 91 additions & 37 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::fs::File;
use std::io::{Error, ErrorKind, Read, Result, Write};
use std::net::{SocketAddr, TcpStream};
use std::os::fd::AsRawFd;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc, Mutex};
Expand Down Expand Up @@ -56,7 +56,7 @@ Receiver options:
to 32 is probably overkill.
";

/// Version of the wire protocol.
///
/// Bumped from 1 to 2 when the file-count limit was lifted: `FileId` widened
/// from u16 to u64, which grew the chunk header from 14 to 20 bytes.
const WIRE_PROTO_VERSION: u16 = 2;

/// Maximum number of bytes transferred per chunk (256 KiB).
const MAX_CHUNK_LEN: u64 = 4096 * 64;

/// Metadata about all the files we want to transfer.
Expand Down Expand Up @@ -113,11 +113,11 @@ impl TransferPlan {

/// The index of a file in the transfer plan.
#[derive(BorshDeserialize, BorshSerialize, Copy, Clone, Debug, Eq, Hash, PartialEq)]
struct FileId(u64);

impl FileId {
    /// Build a `FileId` from a vector index.
    ///
    /// A `usize` always fits in a `u64` on all supported (<= 64-bit) targets,
    /// so this cannot panic there; `try_from` guards hypothetical wider
    /// platforms instead of the previous near-vacuous `assert!` comparison.
    fn from_usize(i: usize) -> FileId {
        let id = u64::try_from(i).expect("file index must fit in u64");
        FileId(id)
    }
}
Expand Down Expand Up @@ -194,17 +194,18 @@ struct SendState {
id: FileId,
len: u64,
offset: AtomicU64,
in_file: File,
in_file_path: PathBuf,
}

/// Outcome of a single `SendState::send_one` call.
enum SendResult {
    /// No unsent chunk remains for this file.
    Done,
    /// The file disappeared between being listed and being opened for
    /// sending; this is expected and does not terminate the transfer.
    FileVanished,
    /// One chunk was transferred; `bytes_sent` is the number of payload
    /// bytes written to the socket.
    Progress { bytes_sent: u64 },
}

/// Metadata about a chunk of data that follows.
///
/// The Borsh-generated representation of this is zero-overhead (20 bytes).
#[derive(BorshDeserialize, BorshSerialize, Debug)]
struct ChunkHeader {
/// Which file is the chunk from?
Expand All @@ -218,8 +219,8 @@ struct ChunkHeader {
}

impl ChunkHeader {
fn to_bytes(&self) -> [u8; 14] {
let mut buffer = [0_u8; 14];
fn to_bytes(&self) -> [u8; 20] {
let mut buffer = [0_u8; 20];
let mut cursor = std::io::Cursor::new(&mut buffer[..]);
self.serialize(&mut cursor)
.expect("Writing to memory never fails.");
Expand All @@ -229,13 +230,25 @@ impl ChunkHeader {

impl SendState {
pub fn send_one(&self, start_time: Instant, out: &mut TcpStream) -> Result<SendResult> {
// By deferring the opening of the file descriptor to this point,
// we effectively limit the amount of open files to the amount of send threads.
// However, this now introduces the possibility of files getting deleted between
// getting listed and their turn for transfer.
// A vanishing file is expected, it is not a transfer-terminating event.
let offset = self.offset.fetch_add(MAX_CHUNK_LEN, Ordering::SeqCst);
let end = self.len.min(offset + MAX_CHUNK_LEN);

if offset >= self.len || offset >= end {
return Ok(SendResult::Done);
}

let res = std::fs::File::open(&self.in_file_path);
if let Err(e) = std::fs::File::open(&self.in_file_path) {
if e.kind() == std::io::ErrorKind::NotFound {
return Ok(SendResult::FileVanished);
}
}
let in_file = res?;
print_progress(offset, self.len, start_time);

let header = ChunkHeader {
Expand All @@ -253,7 +266,7 @@ impl SendState {
let end = end as i64;
let mut off = offset as i64;
let out_fd = out.as_raw_fd();
let in_fd = self.in_file.as_raw_fd();
let in_fd = in_file.as_raw_fd();
let mut total_written: u64 = 0;
while off < end {
let count = (end - off) as usize;
Expand Down Expand Up @@ -318,7 +331,7 @@ fn main_send(
id: FileId::from_usize(i),
len: metadata.len(),
offset: AtomicU64::new(0),
in_file: file,
in_file_path: fname.into(),
};
plan.files.push(file_plan);
send_states.push(state);
Expand Down Expand Up @@ -389,6 +402,9 @@ fn main_send(
std::thread::sleep(to_wait.unwrap());
}
match file.send_one(start_time, &mut stream) {
Ok(SendResult::FileVanished) => {
println!("File {:?} vanished", file.in_file_path);
}
Ok(SendResult::Progress {
bytes_sent: bytes_written,
}) => {
Expand Down Expand Up @@ -427,12 +443,10 @@ struct Chunk {
struct FileReceiver {
fname: String,

/// The file we’re writing to, if we have started writing.
///
/// We don’t open the file immediately so we don’t create a zero-sized file
/// when a transfer fails. We only open the file after we have at least some
/// data for it.
out_file: Option<File>,
file_created: bool,

/// Chunks that we cannot yet write because a preceding chunk has not yet arrived.
pending: HashMap<u64, Chunk>,
Expand All @@ -448,7 +462,7 @@ impl FileReceiver {
fn new(plan: FilePlan) -> FileReceiver {
FileReceiver {
fname: plan.name,
out_file: None,
file_created: false,
pending: HashMap::new(),
offset: 0,
total_len: plan.len,
Expand All @@ -457,28 +471,29 @@ impl FileReceiver {

/// Write or buffer a chunk that we received for this file.
fn handle_chunk(&mut self, chunk: Chunk) -> Result<()> {
let mut out_file = match self.out_file.take() {
None => {
let path: &Path = self.fname.as_ref();
if let Some(dir) = path.parent() {
std::fs::create_dir_all(dir)?;
}
let file = File::create(path)?;

// Resize the file to its final size already:
// * So that the file system can do a better job of allocating
// a single extent for it, and it doesn't have to fragment
// the file.
// * If we run out of space, we learn about that before we waste
// time on the transfer (although then maybe we should do it
// before we receive a chunk after all?).
// This can make debugging a bit harder, because when you look
// at just the file size you might think it's fully transferred.
file.set_len(self.total_len)?;

file
let path: &Path = self.fname.as_ref();
let mut out_file = if !self.file_created {
if let Some(dir) = path.parent() {
std::fs::create_dir_all(dir)?;
}
Some(f) => f,
let file = File::create(path)?;

// Resize the file to its final size already:
// * So that the file system can do a better job of allocating
// a single extent for it, and it doesn't have to fragment
// the file.
// * If we run out of space, we learn about that before we waste
// time on the transfer (although then maybe we should do it
// before we receive a chunk after all?).
// This can make debugging a bit harder, because when you look
// at just the file size you might think it's fully transferred.
file.set_len(self.total_len)?;

self.file_created = true;

file
} else {
File::options().append(true).open(path)?
};
self.pending.insert(chunk.offset, chunk);

Expand All @@ -488,7 +503,7 @@ impl FileReceiver {
self.offset += chunk.data.len() as u64;
}

self.out_file = Some(out_file);
//self.out_file = Some(out_file);
Ok(())
}
}
Expand Down Expand Up @@ -571,7 +586,7 @@ fn main_recv(
// Read a chunk header. If we hit EOF, that is not an error, it
// means that the sender has nothing more to send so we can just
// exit here.
let mut buf = [0u8; 14];
let mut buf = [0u8; 20];
match stream.read_exact(&mut buf) {
Ok(..) => {}
Err(err) if err.kind() == ErrorKind::UnexpectedEof => break,
Expand Down Expand Up @@ -637,10 +652,12 @@ fn main_recv(
#[cfg(test)]
mod tests {
use super::*;
use std::env;
use std::{
net::{IpAddr, Ipv4Addr},
thread,
};
use tempfile::TempDir;

#[test]
fn test_accepts_valid_protocol() {
Expand Down Expand Up @@ -717,4 +734,41 @@ mod tests {
["0", "a/1", "a/b/2"].map(|f| base_path.join(f).to_str().unwrap().to_owned())
);
}

#[test]
fn test_sends_20_thousand_files() {
    let (events_tx, events_rx) = std::sync::mpsc::channel::<SenderEvent>();
    let cwd = env::current_dir().unwrap();
    // The sender runs on its own thread; the receiver below blocks until the
    // sender reports which port it is listening on.
    thread::spawn(move || {
        // Create the files inside the working directory so the relative
        // names recorded in `fnames` resolve for both sender and receiver.
        let td = TempDir::new_in(".").unwrap();
        let tmp_path = td.path().strip_prefix(cwd).unwrap();
        println!("{tmp_path:?}");
        let mut fnames = Vec::new();
        // 20k files would blow a typical fd ulimit if every file were opened
        // up front; descriptors are now opened lazily, one per send thread.
        for i in 0..20_000 {
            let path = tmp_path.join(i.to_string());
            fnames.push(path.clone().into_os_string().into_string().unwrap());
            let mut f = std::fs::File::create(path).unwrap();
            // write_all: a bare `write` may report a short write and
            // silently drop bytes.
            f.write_all(&[1, 2, 3]).unwrap();
        }
        main_send(
            SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0),
            &fnames,
            1,
            events_tx,
            None,
        )
        .unwrap();
    });
    match events_rx.recv().unwrap() {
        SenderEvent::Listening(port) => {
            main_recv(
                SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port),
                "1",
                WriteMode::Force,
                1,
            )
            .unwrap();
        }
    }
}
}

0 comments on commit 066d081

Please sign in to comment.