Skip to content

Commit

Permalink
Open files only once, instead of per chunk
Browse files Browse the repository at this point in the history
  • Loading branch information
DavidVentura committed Nov 28, 2024
1 parent 034809a commit 40320a4
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 44 deletions.
61 changes: 61 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ publish = false
libc = "0.2.153"
borsh = { version = "1.3.1", features = ["derive"] }
walkdir = "2.5.0"
parking_lot = "0.12.3"

[profile.dev]
panic = "abort"
Expand Down
120 changes: 76 additions & 44 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use std::net::{SocketAddr, TcpStream};
use std::os::fd::AsRawFd;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::time::Instant;
use walkdir::WalkDir;
Expand Down Expand Up @@ -104,7 +103,7 @@ impl TransferPlan {
fn assert_paths_relative(&self) {
for file in &self.files {
assert!(
!file.name.starts_with("/"),
!file.name.starts_with('/'),
"Transferring files with an absolute path name is not allowed.",
);
}
Expand Down Expand Up @@ -190,11 +189,16 @@ fn print_progress(offset: u64, len: u64, start_time: Instant) {
);
}

/// Per-file send progress, guarded by the mutex in `SendState`.
/// Models the transfer lifecycle so the source file is opened exactly
/// once (on the first chunk) instead of being reopened for every chunk.
enum SendStateInner {
/// Transfer not started; the file is opened lazily on the first chunk,
/// so a vanished file can be reported without terminating the transfer.
Pending { fname: PathBuf },
/// The file is open and chunks are being sent from `offset` onward.
// NOTE(review): whether `offset` means "claimed" or "acknowledged" bytes
// is not visible from this declaration — confirm against `send_one`.
InProgress { file: File, offset: u64 },
/// All bytes up to the file's length have been handed out.
Done,
}

/// Sender-side bookkeeping for a single file in the transfer plan.
// NOTE(review): this declaration appears to come from a diff rendering that
// interleaves removed and added lines — `offset`/`in_file_path` (the old
// reopen-per-chunk scheme) and `state` (the new open-once scheme) look like
// two alternative representations of the same progress data. Confirm against
// the actual committed source before relying on all fields coexisting.
struct SendState {
// Identifier embedded in each ChunkHeader so the receiver can route the
// chunk to the right file; presumably the index of the matching FilePlan
// — TODO confirm against main_send.
id: FileId,
// Total file length in bytes, captured when the plan was built.
len: u64,
// Next byte offset to claim for sending (old lock-free scheme; advanced
// with fetch_add elsewhere in this diff).
offset: AtomicU64,
// Path of the source file (old scheme: file reopened for every chunk).
in_file_path: PathBuf,
// Lifecycle state holding the open file handle, so the file is opened
// only once for the whole transfer (new scheme).
state: parking_lot::Mutex<SendStateInner>,
}

enum SendResult {
Expand Down Expand Up @@ -235,20 +239,40 @@ impl SendState {
// However, this now introduces the possibility of files getting deleted between
// getting listed and their turn for transfer.
// A vanishing file is expected, it is not a transfer-terminating event.
let offset = self.offset.fetch_add(MAX_CHUNK_LEN, Ordering::SeqCst);
let mut state = self.state.lock();
let offset = match *state {
SendStateInner::Pending { ref fname } => {
let res = std::fs::File::open(fname);
if let Err(ref e) = res {
if e.kind() == std::io::ErrorKind::NotFound {
return Ok(SendResult::FileVanished);
}
}
*state = SendStateInner::InProgress {
file: res?,
offset: 0,
};
0
}
SendStateInner::Done => {
return Ok(SendResult::Done);
}
SendStateInner::InProgress {
file: _,
mut offset,
} => {
offset += MAX_CHUNK_LEN;
offset
}
};

let end = self.len.min(offset + MAX_CHUNK_LEN);

if offset >= self.len || offset >= end {
*state = SendStateInner::Done;
return Ok(SendResult::Done);
}

let res = std::fs::File::open(&self.in_file_path);
if let Err(e) = std::fs::File::open(&self.in_file_path) {
if e.kind() == std::io::ErrorKind::NotFound {
return Ok(SendResult::FileVanished);
}
}
let in_file = res?;
print_progress(offset, self.len, start_time);

let header = ChunkHeader {
Expand All @@ -263,10 +287,19 @@ impl SendState {
borsh::to_vec(&header)?.len()
);

let in_fd = if let SendStateInner::InProgress {
ref file,
offset: _,
} = *state
{
file.as_raw_fd()
} else {
unreachable!();
};

let end = end as i64;
let mut off = offset as i64;
let out_fd = out.as_raw_fd();
let in_fd = in_file.as_raw_fd();
let mut total_written: u64 = 0;
while off < end {
let count = (end - off) as usize;
Expand Down Expand Up @@ -321,17 +354,17 @@ fn main_send(
let mut send_states = Vec::new();

for (i, fname) in all_filenames_from_path_names(fnames)?.iter().enumerate() {
let file = std::fs::File::open(&fname)?;
let metadata = file.metadata()?;
let metadata = std::fs::metadata(fname)?;
let file_plan = FilePlan {
name: fname.clone(),
len: metadata.len(),
};
let state = SendState {
id: FileId::from_usize(i),
len: metadata.len(),
offset: AtomicU64::new(0),
in_file_path: fname.into(),
state: parking_lot::Mutex::new(SendStateInner::Pending {
fname: fname.into(),
}),
};
plan.files.push(file_plan);
send_states.push(state);
Expand Down Expand Up @@ -376,7 +409,7 @@ fn main_send(
// Stop the listener, don't send anything over our new connection.
let is_done = state_arc
.iter()
.all(|f| f.offset.load(Ordering::SeqCst) >= f.len);
.all(|f| matches!(*f.state.lock(), SendStateInner::Done));
if is_done {
break;
}
Expand All @@ -403,7 +436,7 @@ fn main_send(
}
match file.send_one(start_time, &mut stream) {
Ok(SendResult::FileVanished) => {
println!("File {:?} vanished", file.in_file_path);
println!("File {:?} vanished", file.id);
}
Ok(SendResult::Progress {
bytes_sent: bytes_written,
Expand Down Expand Up @@ -446,7 +479,7 @@ struct FileReceiver {
/// We don’t open the file immediately so we don’t create a zero-sized file
/// when a transfer fails. We only open the file after we have at least some
/// data for it.
file_created: bool,
out_file: Option<File>,

/// Chunks that we cannot yet write because a preceding chunk has not yet arrived.
pending: HashMap<u64, Chunk>,
Expand All @@ -462,7 +495,7 @@ impl FileReceiver {
fn new(plan: FilePlan) -> FileReceiver {
FileReceiver {
fname: plan.name,
file_created: false,
out_file: None,
pending: HashMap::new(),
offset: 0,
total_len: plan.len,
Expand All @@ -471,29 +504,28 @@ impl FileReceiver {

/// Write or buffer a chunk that we received for this file.
fn handle_chunk(&mut self, chunk: Chunk) -> Result<()> {
let path: &Path = self.fname.as_ref();
let mut out_file = if !self.file_created {
if let Some(dir) = path.parent() {
std::fs::create_dir_all(dir)?;
let mut out_file = match self.out_file.take() {
None => {
let path: &Path = self.fname.as_ref();
if let Some(dir) = path.parent() {
std::fs::create_dir_all(dir)?;
}
let file = File::create(path)?;

// Resize the file to its final size already:
// * So that the file system can do a better job of allocating
// a single extent for it, and it doesn't have to fragment
// the file.
// * If we run out of space, we learn about that before we waste
// time on the transfer (although then maybe we should do it
// before we receive a chunk after all?).
// This can make debugging a bit harder, because when you look
// at just the file size you might think it's fully transferred.
file.set_len(self.total_len)?;

file
}
let file = File::create(path)?;

// Resize the file to its final size already:
// * So that the file system can do a better job of allocating
// a single extent for it, and it doesn't have to fragment
// the file.
// * If we run out of space, we learn about that before we waste
// time on the transfer (although then maybe we should do it
// before we receive a chunk after all?).
// This can make debugging a bit harder, because when you look
// at just the file size you might think it's fully transferred.
file.set_len(self.total_len)?;

self.file_created = true;

file
} else {
File::options().append(true).open(path)?
Some(f) => f,
};
self.pending.insert(chunk.offset, chunk);

Expand Down Expand Up @@ -737,11 +769,11 @@ mod tests {
#[test]
fn test_sends_20_thousand_files() {
let (events_tx, events_rx) = std::sync::mpsc::channel::<SenderEvent>();
env::set_current_dir("/tmp/").unwrap();
let cwd = env::current_dir().unwrap();
thread::spawn(|| {
let td = TempDir::new_in(".").unwrap();
let tmp_path = td.path().strip_prefix(cwd).unwrap();
println!("{tmp_path:?}");
let mut fnames = Vec::new();
for i in 0..20_000 {
let path = tmp_path.join(i.to_string());
Expand Down

0 comments on commit 40320a4

Please sign in to comment.