diff --git a/async_file_cache/Cargo.toml b/async_file_cache/Cargo.toml index fb390d2..8a88133 100644 --- a/async_file_cache/Cargo.toml +++ b/async_file_cache/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] tokio = { version = "*", features = ["sync", "fs", "io-util", "macros"] } -log = "*" +tracing = "*" [dev-dependencies] tempfile = "*" diff --git a/async_file_cache/src/lib.rs b/async_file_cache/src/lib.rs index 5e188ff..f6c4a32 100644 --- a/async_file_cache/src/lib.rs +++ b/async_file_cache/src/lib.rs @@ -1,7 +1,7 @@ use core::future::Future; -use log::{debug, trace}; use std::ffi::OsString; use std::{collections::HashMap, hash::Hash, io::ErrorKind, path::PathBuf, pin::pin}; +use tracing::{debug, trace}; use tokio::io::AsyncSeekExt; use tokio::{fs::File, sync::Mutex, sync::Semaphore}; diff --git a/server/Cargo.toml b/server/Cargo.toml index f661b10..9f8bd67 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -6,18 +6,23 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -axum = { version = "*", features = [] } -hyper = { version = "*", features = ["client", "http1", "stream", "runtime"] } -tokio = { version = "*", features = ["rt", "rt-multi-thread", "sync", "fs", "io-util", "macros"] } +thiserror = "*" +anyhow = "*" + +tracing = "*" +console-subscriber = "*" + +tokio = { version = "*", features = ["rt", "rt-multi-thread", "sync", "fs", "io-util", "macros", "tracing"] } tokio-util = { version = "*", features = ["io"] } futures-util = "*" -log = "*" -env_logger = "*" -ddelta = "*" -zstd = { version = "*", features = ["zstdmt"] } + +axum = { version = "*", features = [] } +hyper = { version = "*", features = ["client", "http1", "stream", "runtime"] } reqwest = { version = "*", default-features = false, features = ["rustls-tls"]} + +ddelta = { path = "../../ddelta-rs"} +zstd = { version = "*", features = ["zstdmt"] } +strsim = "*" + parsing = {path = "../parsing"} -thiserror = "*" -anyhow = "*" async_file_cache = {path = "../async_file_cache"} -strsim = "*" diff --git a/server/src/main.rs b/server/src/main.rs index 2a08942..d0d46b6 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -7,10 +7,10 @@ use axum::{ Router, }; use core::future::Future; -use log::{debug, error, info}; use std::{panic, path::PathBuf, sync::Arc}; use thiserror::Error; use tokio::fs::File; +use tracing::{debug, error, info, Instrument}; use async_file_cache::FileCache; @@ -25,7 +25,7 @@ type Str = Box; use reqwest::Client; fn main() { - env_logger::init(); + console_subscriber::init(); let mut path = PathBuf::from(LOCAL); path.push("pkg"); @@ -41,20 +41,18 @@ fn main() { path.push(p.to_string()); path }; + #[tracing::instrument(level = "info", skip(client, file), "Downloading")] async fn inner_f(client: Client, key: Package, mut file: File) -> Result { use tokio::io::AsyncWriteExt; - let mut uri = String::new(); - uri.push_str(MIRROR); - uri.push_str(&key.to_string()); + let uri = format!("{MIRROR}{key}"); + debug!(key = key.to_string(), uri, "getting from primary"); let mut response = client.get(uri).send().await?; if response.status() == reqwest::StatusCode::NOT_FOUND { // fall back to archive mirror - info!("using fallback mirror for {key}"); - let mut uri = String::new(); - uri.push_str(FALLBACK_MIRROR); - uri.push_str(&key.to_string()); + info!(key = key.to_string(), "using fallback mirror"); + let uri = format!("{FALLBACK_MIRROR}{key}"); response = client.get(uri).send().await?; } if !response.status().is_success() { @@ -91,6 +89,7 @@ fn main() { F: Send + Fn(S, Package, File) -> FF, FF: Future> + Send, { + let keystring = key.to_string(); let old = state.get_or_generate(key.clone().get_old()); let new = state.get_or_generate(key.get_new()); let (old, new) = tokio::join!(old, new); @@ -101,6 +100,7 @@ fn main() { let mut old = zstd::Decoder::new(old)?; let new = new.into_std().await; let mut new = zstd::Decoder::new(new)?; + let span = tracing::info_span!("delta request", key = keystring); let f: tokio::task::JoinHandle> = tokio::task::spawn_blocking(move || { let mut zpatch = zstd::Encoder::new(patch, 22)?; @@ -110,19 +110,19 @@ fn main() { } let mut last_report = 0; ddelta::generate_chunked(&mut old, &mut new, &mut zpatch, None, |s| match s { - ddelta::State::Reading => debug!("reading"), - ddelta::State::Sorting => debug!("sorting"), + ddelta::State::Reading => debug!(key = keystring, "reading"), + ddelta::State::Sorting => debug!(key = keystring, "sorting"), ddelta::State::Working(p) => { const MB: u64 = 1024 * 1024; if p > last_report + (8 * MB) { - debug!("working: {}MB done", p / MB); + debug!(key = keystring, "working: {}MB done", p / MB); last_report = p; } } })?; Ok(zpatch.finish()?) }); - let f = f.await.expect("threading error")?; + let f = f.instrument(span).await.expect("threading error")?; let f = File::from_std(f); @@ -182,6 +182,7 @@ where F: Send + Sync + 'static + Fn(S, Delta, File) -> FF, FF: Send + 'static + Future>, { + let c_span = tracing::info_span!("delta request", from, to); let c = || async { let from = Package::try_from(&*from)?; let to = Package::try_from(&*to)?; @@ -217,7 +218,7 @@ where ]; anyhow::Ok((StatusCode::OK, headers, body)) }; - c().await.map_err(|e| { + c().instrument(c_span).await.map_err(|e| { error!("{:?}", e); ( axum::http::status::StatusCode::INTERNAL_SERVER_ERROR,