Skip to content

Commit

Permalink
Remove max file limit on transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidVentura committed Nov 26, 2024
1 parent ad9023d commit d932b8a
Showing 1 changed file with 61 additions and 7 deletions.
68 changes: 61 additions & 7 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::fs::File;
use std::io::{Error, ErrorKind, Read, Result, Write};
use std::net::{SocketAddr, TcpStream};
use std::os::fd::AsRawFd;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc, Mutex};
Expand Down Expand Up @@ -56,7 +56,7 @@ Receiver options:
to 32 is probably overkill.
";

const WIRE_PROTO_VERSION: u16 = 1;
const WIRE_PROTO_VERSION: u16 = 2;
const MAX_CHUNK_LEN: u64 = 4096 * 64;

/// Metadata about all the files we want to transfer.
Expand Down Expand Up @@ -113,11 +113,11 @@ impl TransferPlan {

/// The index of a file in the transfer plan.
#[derive(BorshDeserialize, BorshSerialize, Copy, Clone, Debug, Eq, Hash, PartialEq)]
struct FileId(u16);
struct FileId(u64);

impl FileId {
fn from_usize(i: usize) -> FileId {
assert!(i < u16::MAX as usize, "Can transfer at most 2^16 files.");
assert!(i < u64::MAX as usize, "Can transfer at most 2^64 files.");
FileId(i as _)
}
}
Expand Down Expand Up @@ -194,11 +194,12 @@ struct SendState {
id: FileId,
len: u64,
offset: AtomicU64,
in_file: File,
in_file_path: PathBuf,
}

enum SendResult {
Done,
FileVanished,
Progress { bytes_sent: u64 },
}

Expand Down Expand Up @@ -229,13 +230,25 @@ impl ChunkHeader {

impl SendState {
pub fn send_one(&self, start_time: Instant, out: &mut TcpStream) -> Result<SendResult> {
// By deferring the opening of the file descriptor to this point,
// we effectively limit the amount of open files to the amount of send threads.
// However, this now introduces the possibility of files getting deleted between
// getting listed and their turn for transfer.
// A vanishing file is expected, it is not a transfer-terminating event.
let offset = self.offset.fetch_add(MAX_CHUNK_LEN, Ordering::SeqCst);
let end = self.len.min(offset + MAX_CHUNK_LEN);

if offset >= self.len || offset >= end {
return Ok(SendResult::Done);
}

let res = std::fs::File::open(&self.in_file_path);
if let Err(e) = std::fs::File::open(&self.in_file_path) {
if e.kind() == std::io::ErrorKind::NotFound {
return Ok(SendResult::FileVanished);
}
}
let in_file = res?;
print_progress(offset, self.len, start_time);

let header = ChunkHeader {
Expand All @@ -253,7 +266,7 @@ impl SendState {
let end = end as i64;
let mut off = offset as i64;
let out_fd = out.as_raw_fd();
let in_fd = self.in_file.as_raw_fd();
let in_fd = in_file.as_raw_fd();
let mut total_written: u64 = 0;
while off < end {
let count = (end - off) as usize;
Expand Down Expand Up @@ -318,7 +331,7 @@ fn main_send(
id: FileId::from_usize(i),
len: metadata.len(),
offset: AtomicU64::new(0),
in_file: file,
in_file_path: fname.into(),
};
plan.files.push(file_plan);
send_states.push(state);
Expand Down Expand Up @@ -389,6 +402,9 @@ fn main_send(
std::thread::sleep(to_wait.unwrap());
}
match file.send_one(start_time, &mut stream) {
Ok(SendResult::FileVanished) => {
println!("File {:?} vanished", file.in_file_path);
}
Ok(SendResult::Progress {
bytes_sent: bytes_written,
}) => {
Expand Down Expand Up @@ -637,10 +653,12 @@ fn main_recv(
#[cfg(test)]
mod tests {
use super::*;
use std::env;
use std::{
net::{IpAddr, Ipv4Addr},
thread,
};
use tempfile::TempDir;

#[test]
fn test_accepts_valid_protocol() {
Expand Down Expand Up @@ -717,4 +735,40 @@ mod tests {
["0", "a/1", "a/b/2"].map(|f| base_path.join(f).to_str().unwrap().to_owned())
);
}

#[test]
fn test_sends_20_thousand_files() {
let (events_tx, events_rx) = std::sync::mpsc::channel::<SenderEvent>();
let cwd = env::current_dir().unwrap();
thread::spawn(|| {
let td = TempDir::new_in(".").unwrap();
let tmp_path = td.path().strip_prefix(cwd).unwrap();
println!("{tmp_path:?}");
let mut fnames = Vec::new();
for i in 0..20_000 {
let path = tmp_path.join(i.to_string());
fnames.push(path.clone().into_os_string().into_string().unwrap());
std::fs::File::create(path).unwrap();
}
main_send(
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0),
&fnames,
1,
events_tx,
None,
)
.unwrap();
});
match events_rx.recv().unwrap() {
SenderEvent::Listening(port) => {
main_recv(
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port),
"1",
WriteMode::Force,
1,
)
.unwrap();
}
}
}
}

0 comments on commit d932b8a

Please sign in to comment.