From 1af8a4c51f06fd484ffef6e285de00b10022bd48 Mon Sep 17 00:00:00 2001 From: djugei Date: Wed, 13 Mar 2024 10:51:36 +0100 Subject: [PATCH] fixed and ran rustfmt --- client/src/main.rs | 233 +++++++++++++++++++++------------------------ rustfmt.toml | 1 + server/src/main.rs | 64 ++++++------- 3 files changed, 142 insertions(+), 156 deletions(-) create mode 100644 rustfmt.toml diff --git a/client/src/main.rs b/client/src/main.rs index 25d0441..8e87a38 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -57,8 +57,7 @@ enum Commands { fn main() { // set up a logger that does not conflict with progress bars - let logger = - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build(); + let logger = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build(); let multi = MultiProgress::new(); indicatif_log_bridge::LogWrapper::new(multi.clone(), logger) @@ -104,10 +103,7 @@ fn main() { panic!("pacman -Su failed, aborting. Fix errors and try again"); } } - Commands::Download { - server, - delta_cache, - } => { + Commands::Download { server, delta_cache } => { std::fs::create_dir_all(&delta_cache).unwrap(); upgrade(server, delta_cache, multi).unwrap() } @@ -135,14 +131,17 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R let client = reqwest::Client::new(); let total_pg = ProgressBar::new(0) - .with_style(ProgressStyle::with_template("#### {msg} [{wide_bar}] {bytes}/{total_bytes} elapsed: {elapsed} eta: {eta} ####").unwrap()) - .with_message("total progress:") - ; + .with_style( + ProgressStyle::with_template( + "#### {msg} [{wide_bar}] {bytes}/{total_bytes} elapsed: {elapsed} eta: {eta} ####", + ) + .unwrap(), + ) + .with_message("total progress:"); total_pg.enable_steady_tick(Duration::from_millis(100)); let total_pg = multi.add(total_pg); total_pg.tick(); - let mut set = JoinSet::new(); for line in lines { let client = client.clone(); @@ -156,76 +155,77 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R debug!("spawned thread for {}", &line); let (_, filename) = line.rsplit_once('/').unwrap(); let filefut = async { - if let Some((oldname, oldpath)) = newest_cached(filename)? { - // try to find the decompressed size for better progress monitoring - use memmap2::Mmap; - let orig = std::fs::File::open(oldpath)?; - // i promise to not open the same file as writable at the same time - let orig = unsafe { Mmap::map(&orig)?}; - // 16 megabytes seems like an ok average size - // todo: find the actual average size of a decompressed package - let default_size = 16*1024*1024; - let dec_size = zstd::bulk::Decompressor::upper_bound(&orig) - .unwrap_or_else(|| { + if let Some((oldname, oldpath)) = newest_cached(filename)? { + // try to find the decompressed size for better progress monitoring + use memmap2::Mmap; + let orig = std::fs::File::open(oldpath)?; + // i promise to not open the same file as writable at the same time + let orig = unsafe { Mmap::map(&orig)? 
}; + // 16 megabytes seems like an ok average size + // todo: find the actual average size of a decompressed package + let default_size = 16 * 1024 * 1024; + let dec_size = zstd::bulk::Decompressor::upper_bound(&orig).unwrap_or_else(|| { debug!("using default size for {oldname}"); default_size }); - total_pg.inc_length(dec_size.try_into().unwrap()); - - // delta download - let mut file_name = delta_cache.clone(); - file_name.push(filename); - let mut deltafile_name = file_name.clone(); - deltafile_name.as_mut_os_string().push(".delta"); - - let mut deltafile = tokio::fs::File::create(deltafile_name.clone()) - .await?; - - let style = ProgressStyle::with_template( - "{prefix}{msg} [{wide_bar}] {bytes}/{total_bytes} {binary_bytes_per_sec} {eta}").unwrap() - .progress_chars("█▇▆▅▄▃▂▁ "); - - let guard = maxpar_req.acquire().await; - let pg = ProgressBar::new(0) - .with_style(style) - .with_prefix(format!("deltadownload {}", filename)) - .with_message(": waiting for server, this may take up to a few minutes") - ; - let pg = multi.add(pg); - pg.tick(); - - let mut url: String = server.into(); - url.push('/'); - url.push_str(&oldname); - url.push('/'); - url.push_str(filename); - let mut delta = { - loop { - // catch both client and server timeouts and simply retry - match client.get(&url).send().await { - Ok(d) => { match d.status() { - StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => {debug!("retrying request {}", url)} - status if !status.is_success() => panic!("request failed with{}", status), - _=> break d, + total_pg.inc_length(dec_size.try_into().unwrap()); + + // delta download + let mut file_name = delta_cache.clone(); + file_name.push(filename); + let mut deltafile_name = file_name.clone(); + deltafile_name.as_mut_os_string().push(".delta"); + + let mut deltafile = tokio::fs::File::create(deltafile_name.clone()).await?; + + let style = ProgressStyle::with_template( + "{prefix}{msg} [{wide_bar}] {bytes}/{total_bytes} {binary_bytes_per_sec} {eta}", + ) + .unwrap() + .progress_chars("█▇▆▅▄▃▂▁ "); + + let guard = maxpar_req.acquire().await; + let pg = ProgressBar::new(0) + .with_style(style) + .with_prefix(format!("deltadownload {}", filename)) + .with_message(": waiting for server, this may take up to a few minutes"); + let pg = multi.add(pg); + pg.tick(); + + let mut url: String = server.into(); + url.push('/'); + url.push_str(&oldname); + url.push('/'); + url.push_str(filename); + let mut delta = { + loop { + // catch both client and server timeouts and simply retry + match client.get(&url).send().await { + Ok(d) => match d.status() { + StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => { + debug!("timeout; retrying {}", url) + } + status if !status.is_success() => panic!("request failed with {}", status), + _ => break d, + }, + Err(e) => { + if !e.is_timeout() { + panic!("{}", e); + } } } - Err(e) => if !e.is_timeout(){ - panic!("{}", e); - }, - } - debug!("timeout; retrying {}", &url); - } - }; - std::mem::drop(guard); - pg.set_length(delta.content_length().unwrap_or(0)); - pg.set_message(""); - pg.tick(); - - // acquire guard after sending request but before using the body - // so the deltas can get generated on the server as parallel as possible - // but the download does not get fragmented/overwhelmed - let guard = maxpar.acquire().await; - let mut tries = 0; + } + }; + std::mem::drop(guard); + pg.set_length(delta.content_length().unwrap_or(0)); + pg.set_message(""); + pg.tick(); + + // acquire guard after sending request but before using the body + // so the 
deltas can get generated on the server as parallel as possible + // but the download does not get fragmented/overwhelmed + let guard = maxpar.acquire().await; + let mut tries = 0; 'retry: loop { loop { match delta.chunk().await { @@ -233,42 +233,46 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R let len = chunk.len(); deltafile.write_all(&chunk).await?; pg.inc(len.try_into().unwrap()); - }, + } Ok(None) => break 'retry, Err(e) => { - if tries <3 { + if tries < 3 { deltafile.seek(std::io::SeekFrom::Start(0)).await?; pg.set_position(0); debug!("download failed {tries}/3"); - tries +=1; + tries += 1; continue 'retry; } else { anyhow::bail!(e); } - }, + } } } + } + drop(guard); + info!( + "downloaded {} in {} seconds", + deltafile_name.display(), + pg.elapsed().as_secs_f64() + ); + + { + let p_pg = ProgressBar::new(0); + let p_pg = multi.insert_after(&pg, p_pg); + pg.finish_and_clear(); + multi.remove(&pg); + tokio::task::spawn_blocking(move || -> Result<_, _> { + let orig = Cursor::new(orig); + let orig = zstd::decode_all(orig).unwrap(); + apply_patch(&orig, &deltafile_name, &file_name, p_pg) + }) + .await??; + } + total_pg.inc(dec_size.try_into().unwrap()); + } else { + info!("no cached package found, leaving {} for pacman", filename); } - drop(guard); - info!("downloaded {} in {} seconds", deltafile_name.display(), pg.elapsed().as_secs_f64()); - - { - let p_pg = ProgressBar::new(0); - let p_pg = multi.insert_after(&pg, p_pg); - pg.finish_and_clear(); - multi.remove(&pg); - tokio::task::spawn_blocking(move || -> Result<_,_> { - let orig = Cursor::new(orig); - let orig = zstd::decode_all(orig).unwrap(); - apply_patch(&orig, &deltafile_name, &file_name, p_pg) - }) - .await??; - } - total_pg.inc(dec_size.try_into().unwrap()); - } else { - info!("no cached package found, leaving {} for pacman", filename); - } - Ok::<(),anyhow::Error>(()) + Ok::<(), anyhow::Error>(()) }; let sigfut = async { @@ -284,13 +288,13 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R while let Some(chunk) = res.chunk().await? { sigfile.write_all(&chunk).await? 
} - Ok::<(),anyhow::Error>(()) + Ok::<(), anyhow::Error>(()) }; let (f, s) = tokio::join!(filefut, sigfut); f.context("creating delat file failed")?; s.context("creating signature file failed")?; - Ok::<(),anyhow::Error>(()) + Ok::<(), anyhow::Error>(()) }); } @@ -298,7 +302,6 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R if let Err(e) = res { error!("{}", e); error!("if the error is temporary, you can try running the command again"); - } } }); @@ -334,11 +337,7 @@ fn gen_delta(orig: &Path, new: &Path, patch: &Path) -> Result<(), std::io::Error let new = OpenOptions::new().read(true).open(new).unwrap(); let mut new = zstd::Decoder::new(new)?; - let patch = OpenOptions::new() - .write(true) - .create(true) - .open(patch) - .unwrap(); + let patch = OpenOptions::new().write(true).create(true).open(patch).unwrap(); let mut patch = zstd::Encoder::new(patch, 22)?; ddelta::generate_chunked(&mut orig, &mut new, &mut patch, None, |_| {}).unwrap(); @@ -347,23 +346,13 @@ fn gen_delta(orig: &Path, new: &Path, patch: &Path) -> Result<(), std::io::Error Ok(()) } -fn apply_patch( - orig: &[u8], - patch: &Path, - new: &Path, - pb: ProgressBar, -) -> Result<(), std::io::Error> { - let style = ProgressStyle::with_template( - "{prefix} {wide_bar} {bytes}/{total_bytes} {binary_bytes_per_sec} {eta}", - ) - .unwrap() - .progress_chars("█▇▆▅▄▃▂▁ "); +fn apply_patch(orig: &[u8], patch: &Path, new: &Path, pb: ProgressBar) -> Result<(), std::io::Error> { + let style = ProgressStyle::with_template("{prefix} {wide_bar} {bytes}/{total_bytes} {binary_bytes_per_sec} {eta}") + .unwrap() + .progress_chars("█▇▆▅▄▃▂▁ "); pb.set_style(style); - pb.set_prefix(format!( - "patching {}", - new.file_name().unwrap().to_string_lossy() - )); + pb.set_prefix(format!("patching {}", new.file_name().unwrap().to_string_lossy())); pb.set_length(orig.len().try_into().unwrap()); let orig = Cursor::new(orig); diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..7530651 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +max_width = 120 diff --git a/server/src/main.rs b/server/src/main.rs index 4dc1a39..728bcf0 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -40,11 +40,7 @@ fn main() { path.push(p.to_string()); path }; - async fn inner_f( - client: Client, - key: Package, - mut file: File, - ) -> Result { + async fn inner_f(client: Client, key: Package, mut file: File) -> Result { use tokio::io::AsyncWriteExt; let mut uri = String::new(); @@ -61,7 +57,10 @@ fn main() { response = client.get(uri).send().await?; } if !response.status().is_success() { - return Err(DownloadError::Status{ status: response.status(), url: response.url().clone()}) + return Err(DownloadError::Status { + status: response.status(), + url: response.url().clone(), + }); } while let Some(mut chunk) = response.chunk().await? 
{ @@ -102,27 +101,26 @@ fn main() { let new = new.into_std().await; let mut new = zstd::Decoder::new(new)?; - let f: tokio::task::JoinHandle> = - tokio::task::spawn_blocking(move || { - let mut zpatch = zstd::Encoder::new(patch, 22)?; - let e = zpatch.set_parameter(zstd::zstd_safe::CParameter::NbWorkers(4)); - if let Err(e) = e { - debug!("failed to make zstd multithread"); - } - let mut last_report = 0; - ddelta::generate_chunked(&mut old, &mut new, &mut zpatch, None, |s| match s { - ddelta::State::Reading => debug!("reading"), - ddelta::State::Sorting => debug!("sorting"), - ddelta::State::Working(p) => { - const MB: u64 = 1024 * 1024; - if p > last_report + (8 * MB) { - debug!("working: {}MB done", p / MB); - last_report = p; - } + let f: tokio::task::JoinHandle> = tokio::task::spawn_blocking(move || { + let mut zpatch = zstd::Encoder::new(patch, 22)?; + let e = zpatch.set_parameter(zstd::zstd_safe::CParameter::NbWorkers(4)); + if let Err(e) = e { + debug!("failed to make zstd multithread"); + } + let mut last_report = 0; + ddelta::generate_chunked(&mut old, &mut new, &mut zpatch, None, |s| match s { + ddelta::State::Reading => debug!("reading"), + ddelta::State::Sorting => debug!("sorting"), + ddelta::State::Working(p) => { + const MB: u64 = 1024 * 1024; + if p > last_report + (8 * MB) { + debug!("working: {}MB done", p / MB); + last_report = p; } - })?; - Ok(zpatch.finish()?) - }); + } + })?; + Ok(zpatch.finish()?) + }); let f = f.await.expect("threading error")?; let f = File::from_std(f); @@ -156,8 +154,10 @@ enum DownloadError { #[error("http request failed: {0}")] Connection(#[from] reqwest::Error), #[error("bad status code: {status} while fetching {url}")] - Status{ url: reqwest::Url, status: reqwest::StatusCode}, - + Status { + url: reqwest::Url, + status: reqwest::StatusCode, + }, } #[derive(Error, Debug)] enum DeltaError { @@ -192,8 +192,7 @@ where let f = s.get_or_generate(delta).await; snd.send(f).expect("request was probably canceled"); }); - rcv.await - .expect("uncaught error in FileCache, thats a bug")?? + rcv.await.expect("uncaught error in FileCache, thats a bug")?? }; let len = file.metadata().await?.len(); @@ -220,10 +219,7 @@ where } async fn root() -> (StatusCode, &'static str) { - ( - StatusCode::OK, - "welcome to the inofficial archlinux delta server", - ) + (StatusCode::OK, "welcome to the inofficial archlinux delta server") } async fn fallback() -> (StatusCode, &'static str) {
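The new rustfmt.toml raises max_width to 120 and sits at the repository root, where rustfmt picks it up automatically. A minimal sketch of how the formatting can be reproduced or checked, assuming the standard cargo/rustfmt toolchain and that the client and server crates share one cargo workspace:

    # reformat all workspace crates using the settings from rustfmt.toml
    cargo fmt --all

    # verify formatting without rewriting files; exits non-zero on any diff (e.g. for CI)
    cargo fmt --all -- --check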