From e2b8937bacca78aca7279e3871ec3f0344efb270 Mon Sep 17 00:00:00 2001
From: djugei
Date: Wed, 13 Mar 2024 11:36:57 +0100
Subject: [PATCH] process updates in order of size

---
 client/src/main.rs | 236 +++++++++++++++++++++++----------------------
 1 file changed, 123 insertions(+), 113 deletions(-)

diff --git a/client/src/main.rs b/client/src/main.rs
index 8e87a38..0fc2b24 100644
--- a/client/src/main.rs
+++ b/client/src/main.rs
@@ -112,12 +112,34 @@ fn main() {
 fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::Result<()> {
     let upgrades = Command::new("pacman").args(["-Sup"]).output()?.stdout;
 
-    let lines: Vec<_> = upgrades
+    let mut lines: Vec<_> = upgrades
         .lines()
         .map(|l| l.unwrap())
         .filter(|l| !l.starts_with("file"))
         .inspect(|l| debug!("{}", l))
+        .filter_map(|line| {
+            let (_, filename) = line.rsplit_once('/').unwrap();
+            if let Some((oldname, oldpath)) = newest_cached(filename).expect("io error on local disk") {
+                // try to find the decompressed size for better progress monitoring
+                use memmap2::Mmap;
+                let orig = std::fs::File::open(oldpath).expect("io error on local disk");
+                // i promise to not open the same file as writable at the same time
+                let orig = unsafe { Mmap::map(&orig).expect("mmap failed") };
+                // 16 megabytes seems like an ok average size
+                // todo: find the actual average size of a decompressed package
+                let default_size = 16 * 1024 * 1024;
+                let dec_size = zstd::bulk::Decompressor::upper_bound(&orig).unwrap_or_else(|| {
+                    debug!("using default size for {oldname}");
+                    default_size
+                });
+                Some((line, oldname, orig, (dec_size as u64)))
+            } else {
+                info!("no cached package found, leaving {} for pacman", filename);
+                None
+            }
+        })
         .collect();
+    lines.sort_unstable_by_key(|(_, _, _, s)| std::cmp::Reverse(*s));
 
     info!("downloading {} updates", lines.len());
 
@@ -133,7 +155,7 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R
     let total_pg = ProgressBar::new(0)
         .with_style(
             ProgressStyle::with_template(
-                "#### {msg} [{wide_bar}] {bytes}/{total_bytes} elapsed: {elapsed} eta: {eta} ####",
+                "#### {msg} [{wide_bar}] ~{bytes}/{total_bytes} elapsed: {elapsed} eta: {eta} ####",
            )
            .unwrap(),
        )
@@ -143,7 +165,7 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R
     total_pg.tick();
 
     let mut set = JoinSet::new();
-    for line in lines {
+    for (line, oldname, orig, mut dec_size) in lines {
         let client = client.clone();
         let maxpar = maxpar.clone();
         let maxpar_req = maxpar_req.clone();
@@ -152,126 +174,114 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R
         let delta_cache = delta_cache.clone();
         let total_pg = total_pg.clone();
         set.spawn(async move {
-            debug!("spawned thread for {}", &line);
+            debug!("spawned task for {}", &line);
             let (_, filename) = line.rsplit_once('/').unwrap();
             let filefut = async {
-                if let Some((oldname, oldpath)) = newest_cached(filename)? {
-                    // try to find the decompressed size for better progress monitoring
-                    use memmap2::Mmap;
-                    let orig = std::fs::File::open(oldpath)?;
-                    // i promise to not open the same file as writable at the same time
-                    let orig = unsafe { Mmap::map(&orig)? };
-                    // 16 megabytes seems like an ok average size
-                    // todo: find the actual average size of a decompressed package
-                    let default_size = 16 * 1024 * 1024;
-                    let dec_size = zstd::bulk::Decompressor::upper_bound(&orig).unwrap_or_else(|| {
-                        debug!("using default size for {oldname}");
-                        default_size
-                    });
-                    total_pg.inc_length(dec_size.try_into().unwrap());
-
-                    // delta download
-                    let mut file_name = delta_cache.clone();
-                    file_name.push(filename);
-                    let mut deltafile_name = file_name.clone();
-                    deltafile_name.as_mut_os_string().push(".delta");
-
-                    let mut deltafile = tokio::fs::File::create(deltafile_name.clone()).await?;
-
-                    let style = ProgressStyle::with_template(
-                        "{prefix}{msg} [{wide_bar}] {bytes}/{total_bytes} {binary_bytes_per_sec} {eta}",
-                    )
-                    .unwrap()
-                    .progress_chars("█▇▆▅▄▃▂▁ ");
-
-                    let guard = maxpar_req.acquire().await;
-                    let pg = ProgressBar::new(0)
-                        .with_style(style)
-                        .with_prefix(format!("deltadownload {}", filename))
-                        .with_message(": waiting for server, this may take up to a few minutes");
-                    let pg = multi.add(pg);
-                    pg.tick();
-
-                    let mut url: String = server.into();
-                    url.push('/');
-                    url.push_str(&oldname);
-                    url.push('/');
-                    url.push_str(filename);
-                    let mut delta = {
-                        loop {
-                            // catch both client and server timeouts and simply retry
-                            match client.get(&url).send().await {
-                                Ok(d) => match d.status() {
-                                    StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => {
-                                        debug!("timeout; retrying {}", url)
-                                    }
-                                    status if !status.is_success() => panic!("request failed with {}", status),
-                                    _ => break d,
-                                },
-                                Err(e) => {
-                                    if !e.is_timeout() {
-                                        panic!("{}", e);
-                                    }
-                                }
-                            }
-                        }
-                    };
-                    std::mem::drop(guard);
-                    pg.set_length(delta.content_length().unwrap_or(0));
-                    pg.set_message("");
-                    pg.tick();
-
-                    // acquire guard after sending request but before using the body
-                    // so the deltas can get generated on the server as parallel as possible
-                    // but the download does not get fragmented/overwhelmed
-                    let guard = maxpar.acquire().await;
-                    let mut tries = 0;
-                    'retry: loop {
-                        loop {
-                            match delta.chunk().await {
-                                Ok(Some(chunk)) => {
-                                    let len = chunk.len();
-                                    deltafile.write_all(&chunk).await?;
-                                    pg.inc(len.try_into().unwrap());
-                                }
-                                Ok(None) => break 'retry,
-                                Err(e) => {
-                                    if tries < 3 {
-                                        deltafile.seek(std::io::SeekFrom::Start(0)).await?;
-                                        pg.set_position(0);
-                                        debug!("download failed {tries}/3");
-                                        tries += 1;
-                                        continue 'retry;
-                                    } else {
-                                        anyhow::bail!(e);
-                                    }
-                                }
-                            }
-                        }
-                    }
-                    drop(guard);
-                    info!(
-                        "downloaded {} in {} seconds",
-                        deltafile_name.display(),
-                        pg.elapsed().as_secs_f64()
-                    );
-
-                    {
-                        let p_pg = ProgressBar::new(0);
-                        let p_pg = multi.insert_after(&pg, p_pg);
-                        pg.finish_and_clear();
-                        multi.remove(&pg);
-                        tokio::task::spawn_blocking(move || -> Result<_, _> {
-                            let orig = Cursor::new(orig);
-                            let orig = zstd::decode_all(orig).unwrap();
-                            apply_patch(&orig, &deltafile_name, &file_name, p_pg)
-                        })
-                        .await??;
-                    }
-                    total_pg.inc(dec_size.try_into().unwrap());
-                } else {
-                    info!("no cached package found, leaving {} for pacman", filename);
-                }
+                total_pg.inc_length(dec_size);
+
+                // delta download
+                let mut file_name = delta_cache.clone();
+                file_name.push(filename);
+                let mut deltafile_name = file_name.clone();
+                deltafile_name.as_mut_os_string().push(".delta");
+
+                let mut deltafile = tokio::fs::File::create(deltafile_name.clone()).await?;
+
+                let style = ProgressStyle::with_template(
+                    "{prefix}{msg} [{wide_bar}] {bytes}/{total_bytes} {binary_bytes_per_sec} {eta}",
+                )
+                .unwrap()
+                .progress_chars("█▇▆▅▄▃▂▁ ");
+
+                let guard = maxpar_req.acquire().await;
+                let pg = ProgressBar::new(0)
+                    .with_style(style)
+                    .with_prefix(format!("deltadownload {}", filename))
+                    .with_message(": waiting for server, this may take up to a few minutes");
+                let pg = multi.add(pg);
+                pg.tick();
+
+                let mut url: String = server.into();
+                url.push('/');
+                url.push_str(&oldname);
+                url.push('/');
+                url.push_str(filename);
+                let mut delta = {
+                    loop {
+                        // catch both client and server timeouts and simply retry
+                        match client.get(&url).send().await {
+                            Ok(d) => match d.status() {
+                                StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => {
+                                    debug!("timeout; retrying {}", url)
+                                }
+                                status if !status.is_success() => panic!("request failed with {}", status),
+                                _ => break d,
+                            },
+                            Err(e) => {
+                                if !e.is_timeout() {
+                                    panic!("{}", e);
+                                }
+                            }
+                        }
+                    }
+                };
+                std::mem::drop(guard);
+                pg.set_length(delta.content_length().unwrap_or(0));
+                pg.set_message("");
+                pg.tick();
+                total_pg.inc(dec_size / 3);
+                dec_size -= dec_size / 3;
+
+                // acquire guard after sending request but before using the body
+                // so the deltas can get generated on the server as parallel as possible
+                // but the download does not get fragmented/overwhelmed
+                let guard = maxpar.acquire().await;
+                let mut tries = 0;
+                'retry: loop {
+                    loop {
+                        match delta.chunk().await {
+                            Ok(Some(chunk)) => {
+                                let len = chunk.len();
+                                deltafile.write_all(&chunk).await?;
+                                pg.inc(len.try_into().unwrap());
+                            }
+                            Ok(None) => break 'retry,
+                            Err(e) => {
+                                if tries < 3 {
+                                    deltafile.seek(std::io::SeekFrom::Start(0)).await?;
+                                    pg.set_position(0);
+                                    debug!("download failed {tries}/3");
+                                    tries += 1;
+                                    continue 'retry;
+                                } else {
+                                    anyhow::bail!(e);
+                                }
+                            }
+                        }
+                    }
+                }
+                drop(guard);
+                info!(
+                    "downloaded {} in {} seconds",
+                    deltafile_name.display(),
+                    pg.elapsed().as_secs_f64()
+                );
+                total_pg.inc(dec_size / 2);
+                dec_size -= dec_size / 2;
+
+                {
+                    let p_pg = ProgressBar::new(0);
+                    let p_pg = multi.insert_after(&pg, p_pg);
+                    pg.finish_and_clear();
+                    multi.remove(&pg);
+                    tokio::task::spawn_blocking(move || -> Result<_, _> {
+                        let orig = Cursor::new(orig);
+                        let orig = zstd::decode_all(orig).unwrap();
+                        apply_patch(&orig, &deltafile_name, &file_name, p_pg)
+                    })
+                    .await??;
+                }
+                total_pg.inc(dec_size);
                 Ok::<(), anyhow::Error>(())
             };
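
Note for reviewers: the heart of this patch is deciding the processing order up front. Each pending update is paired with an estimate of its decompressed size, read from the zstd frame of the newest cached copy of the package, and the list is then sorted largest-first. Below is a minimal standalone sketch of just that ordering step; the helper name estimate_decompressed_size and the test data are illustrative and not part of the patch, and it assumes the same zstd crate API the patch already relies on.

use std::cmp::Reverse;

// Illustrative helper (not in the patch): estimate the decompressed size of a
// zstd-compressed package, falling back to the same flat 16 MiB guess the patch
// uses when the frame header carries no size information.
fn estimate_decompressed_size(compressed: &[u8]) -> u64 {
    const DEFAULT_SIZE: usize = 16 * 1024 * 1024;
    zstd::bulk::Decompressor::upper_bound(compressed).unwrap_or(DEFAULT_SIZE) as u64
}

fn main() {
    // Hypothetical pending updates: (pacman -Sup line, bytes of the cached old package).
    let updates: Vec<(String, Vec<u8>)> = Vec::new();

    // Pair each update with its size estimate, then sort descending, mirroring
    // `lines.sort_unstable_by_key(|(_, _, _, s)| std::cmp::Reverse(*s))` in the patch.
    let mut ordered: Vec<(String, u64)> = updates
        .into_iter()
        .map(|(line, old)| {
            let size = estimate_decompressed_size(&old);
            (line, size)
        })
        .collect();
    ordered.sort_unstable_by_key(|(_, size)| Reverse(*size));

    for (line, size) in &ordered {
        println!("{size:>12} bytes (estimated)  {line}");
    }
}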
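The other change worth calling out is how the shared total bar advances, which is why its template now shows ~{bytes}: the estimated decompressed size is added with inc_length up front, then credited in rough phases, about a third once the server has produced the delta, half of the remainder once the delta is downloaded, and the rest after the patch is applied. The following is only a rough sketch of that accounting, with the three phases collapsed into one hypothetical function for illustration.

use indicatif::ProgressBar;

// Illustrative only: credit one package's estimated decompressed size against a
// shared total bar in the same three rough phases the patch uses.
fn credit_phases(total_pg: &ProgressBar, mut dec_size: u64) {
    // Phase 1: the server has finished generating the delta (request returned).
    total_pg.inc(dec_size / 3);
    dec_size -= dec_size / 3;

    // Phase 2: the delta file has been downloaded.
    total_pg.inc(dec_size / 2);
    dec_size -= dec_size / 2;

    // Phase 3: the patch has been applied; the remainder keeps the bar in sync
    // with the length that was added via inc_length at the start.
    total_pg.inc(dec_size);
}

fn main() {
    let total_pg = ProgressBar::new(0);
    let estimate: u64 = 16 * 1024 * 1024; // the same 16 MiB fallback the patch uses
    total_pg.inc_length(estimate);
    credit_phases(&total_pg, estimate);
    total_pg.finish();
}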