Skip to content

Commit

Permalink
process updates in order of size
Browse files Browse the repository at this point in the history
  • Loading branch information
djugei committed Mar 13, 2024
1 parent 1af8a4c commit e2b8937
Showing 1 changed file with 123 additions and 113 deletions.
236 changes: 123 additions & 113 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,34 @@ fn main() {

fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::Result<()> {
let upgrades = Command::new("pacman").args(["-Sup"]).output()?.stdout;
let lines: Vec<_> = upgrades
let mut lines: Vec<_> = upgrades
.lines()
.map(|l| l.unwrap())
.filter(|l| !l.starts_with("file"))
.inspect(|l| debug!("{}", l))
.filter_map(|line| {
let (_, filename) = line.rsplit_once('/').unwrap();
if let Some((oldname, oldpath)) = newest_cached(filename).expect("io error on local disk") {
// try to find the decompressed size for better progress monitoring
use memmap2::Mmap;
let orig = std::fs::File::open(oldpath).expect("io error on local disk");
// i promise to not open the same file as writable at the same time
let orig = unsafe { Mmap::map(&orig).expect("mmap failed") };
// 16 megabytes seems like an ok average size
// todo: find the actual average size of a decompressed package
let default_size = 16 * 1024 * 1024;
let dec_size = zstd::bulk::Decompressor::upper_bound(&orig).unwrap_or_else(|| {
debug!("using default size for {oldname}");
default_size
});
Some((line, oldname, orig, (dec_size as u64)))
} else {
info!("no cached package found, leaving {} for pacman", filename);
None
}
})
.collect();
lines.sort_unstable_by_key(|(_, _, _, s)| std::cmp::Reverse(*s));

info!("downloading {} updates", lines.len());

Expand All @@ -133,7 +155,7 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R
let total_pg = ProgressBar::new(0)
.with_style(
ProgressStyle::with_template(
"#### {msg} [{wide_bar}] {bytes}/{total_bytes} elapsed: {elapsed} eta: {eta} ####",
"#### {msg} [{wide_bar}] ~{bytes}/{total_bytes} elapsed: {elapsed} eta: {eta} ####",
)
.unwrap(),
)
Expand All @@ -143,7 +165,7 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R
total_pg.tick();

let mut set = JoinSet::new();
for line in lines {
for (line, oldname, orig, mut dec_size) in lines {
let client = client.clone();
let maxpar = maxpar.clone();
let maxpar_req = maxpar_req.clone();
Expand All @@ -152,126 +174,114 @@ fn upgrade(server: Url, delta_cache: PathBuf, multi: MultiProgress) -> anyhow::R
let delta_cache = delta_cache.clone();
let total_pg = total_pg.clone();
set.spawn(async move {
debug!("spawned thread for {}", &line);
debug!("spawned task for {}", &line);
let (_, filename) = line.rsplit_once('/').unwrap();
let filefut = async {
if let Some((oldname, oldpath)) = newest_cached(filename)? {
// try to find the decompressed size for better progress monitoring
use memmap2::Mmap;
let orig = std::fs::File::open(oldpath)?;
// i promise to not open the same file as writable at the same time
let orig = unsafe { Mmap::map(&orig)? };
// 16 megabytes seems like an ok average size
// todo: find the actual average size of a decompressed package
let default_size = 16 * 1024 * 1024;
let dec_size = zstd::bulk::Decompressor::upper_bound(&orig).unwrap_or_else(|| {
debug!("using default size for {oldname}");
default_size
});
total_pg.inc_length(dec_size.try_into().unwrap());

// delta download
let mut file_name = delta_cache.clone();
file_name.push(filename);
let mut deltafile_name = file_name.clone();
deltafile_name.as_mut_os_string().push(".delta");

let mut deltafile = tokio::fs::File::create(deltafile_name.clone()).await?;

let style = ProgressStyle::with_template(
"{prefix}{msg} [{wide_bar}] {bytes}/{total_bytes} {binary_bytes_per_sec} {eta}",
)
.unwrap()
.progress_chars("█▇▆▅▄▃▂▁ ");

let guard = maxpar_req.acquire().await;
let pg = ProgressBar::new(0)
.with_style(style)
.with_prefix(format!("deltadownload {}", filename))
.with_message(": waiting for server, this may take up to a few minutes");
let pg = multi.add(pg);
pg.tick();

let mut url: String = server.into();
url.push('/');
url.push_str(&oldname);
url.push('/');
url.push_str(filename);
let mut delta = {
loop {
// catch both client and server timeouts and simply retry
match client.get(&url).send().await {
Ok(d) => match d.status() {
StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => {
debug!("timeout; retrying {}", url)
}
status if !status.is_success() => panic!("request failed with {}", status),
_ => break d,
},
Err(e) => {
if !e.is_timeout() {
panic!("{}", e);
}
total_pg.inc_length(dec_size);

// delta download
let mut file_name = delta_cache.clone();
file_name.push(filename);
let mut deltafile_name = file_name.clone();
deltafile_name.as_mut_os_string().push(".delta");

let mut deltafile = tokio::fs::File::create(deltafile_name.clone()).await?;

let style = ProgressStyle::with_template(
"{prefix}{msg} [{wide_bar}] {bytes}/{total_bytes} {binary_bytes_per_sec} {eta}",
)
.unwrap()
.progress_chars("█▇▆▅▄▃▂▁ ");

let guard = maxpar_req.acquire().await;
let pg = ProgressBar::new(0)
.with_style(style)
.with_prefix(format!("deltadownload {}", filename))
.with_message(": waiting for server, this may take up to a few minutes");
let pg = multi.add(pg);
pg.tick();

let mut url: String = server.into();
url.push('/');
url.push_str(&oldname);
url.push('/');
url.push_str(filename);
let mut delta = {
loop {
// catch both client and server timeouts and simply retry
match client.get(&url).send().await {
Ok(d) => match d.status() {
StatusCode::REQUEST_TIMEOUT | StatusCode::GATEWAY_TIMEOUT => {
debug!("timeout; retrying {}", url)
}
status if !status.is_success() => panic!("request failed with {}", status),
_ => break d,
},
Err(e) => {
if !e.is_timeout() {
panic!("{}", e);
}
}
}
};
std::mem::drop(guard);
pg.set_length(delta.content_length().unwrap_or(0));
pg.set_message("");
pg.tick();

// acquire guard after sending request but before using the body
// so the deltas can get generated on the server as parallel as possible
// but the download does not get fragmented/overwhelmed
let guard = maxpar.acquire().await;
let mut tries = 0;
'retry: loop {
loop {
match delta.chunk().await {
Ok(Some(chunk)) => {
let len = chunk.len();
deltafile.write_all(&chunk).await?;
pg.inc(len.try_into().unwrap());
}
Ok(None) => break 'retry,
Err(e) => {
if tries < 3 {
deltafile.seek(std::io::SeekFrom::Start(0)).await?;
pg.set_position(0);
debug!("download failed {tries}/3");
tries += 1;
continue 'retry;
} else {
anyhow::bail!(e);
}
}
};
std::mem::drop(guard);
pg.set_length(delta.content_length().unwrap_or(0));
pg.set_message("");
pg.tick();
total_pg.inc(dec_size / 3);
dec_size -= dec_size / 3;

// acquire guard after sending request but before using the body
// so the deltas can get generated on the server as parallel as possible
// but the download does not get fragmented/overwhelmed
let guard = maxpar.acquire().await;
let mut tries = 0;
'retry: loop {
loop {
match delta.chunk().await {
Ok(Some(chunk)) => {
let len = chunk.len();
deltafile.write_all(&chunk).await?;
pg.inc(len.try_into().unwrap());
}
Ok(None) => break 'retry,
Err(e) => {
if tries < 3 {
deltafile.seek(std::io::SeekFrom::Start(0)).await?;
pg.set_position(0);
debug!("download failed {tries}/3");
tries += 1;
continue 'retry;
} else {
anyhow::bail!(e);
}
}
}
}
drop(guard);
info!(
"downloaded {} in {} seconds",
deltafile_name.display(),
pg.elapsed().as_secs_f64()
);

{
let p_pg = ProgressBar::new(0);
let p_pg = multi.insert_after(&pg, p_pg);
pg.finish_and_clear();
multi.remove(&pg);
tokio::task::spawn_blocking(move || -> Result<_, _> {
let orig = Cursor::new(orig);
let orig = zstd::decode_all(orig).unwrap();
apply_patch(&orig, &deltafile_name, &file_name, p_pg)
})
.await??;
}
total_pg.inc(dec_size.try_into().unwrap());
} else {
info!("no cached package found, leaving {} for pacman", filename);
}
drop(guard);
info!(
"downloaded {} in {} seconds",
deltafile_name.display(),
pg.elapsed().as_secs_f64()
);
total_pg.inc(dec_size / 2);
dec_size -= dec_size / 2;

{
let p_pg = ProgressBar::new(0);
let p_pg = multi.insert_after(&pg, p_pg);
pg.finish_and_clear();
multi.remove(&pg);
tokio::task::spawn_blocking(move || -> Result<_, _> {
let orig = Cursor::new(orig);
let orig = zstd::decode_all(orig).unwrap();
apply_patch(&orig, &deltafile_name, &file_name, p_pg)
})
.await??;
}
total_pg.inc(dec_size);
Ok::<(), anyhow::Error>(())
};

Expand Down

0 comments on commit e2b8937

Please sign in to comment.