Skip to content

Commit

Permalink
feat: add tracing output
Browse files Browse the repository at this point in the history
  • Loading branch information
Cyrix126 committed Jun 14, 2024
1 parent c9b68b5 commit 3fe586e
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 17 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ Work in progress, not functional.
### TODO
- [x] allows to limit cache by size
- [x] organize code in modules
- [x] tracing
- [ ] remove allocation when possible
- [ ] tracing
- [ ] tests
- [ ] benchmarks
- [ ] documentation
Expand Down
35 changes: 26 additions & 9 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use reqwest::{
StatusCode,
};
use tokio::spawn;
use tracing::{debug, info, trace, warn};
use uuid::Uuid;

use crate::{
Expand All @@ -25,36 +26,46 @@ pub async fn delete_entry(
Path(path): Path<String>,
State(state): State<AppState>,
) -> impl IntoResponse {
debug!("new request to delete a cache entry");
if let Ok(uuid) = Uuid::from_str(&path) {
state.cache.invalidate(&uuid).await;
state.index_cache.lock().await.delete_uuid_from_index(&uuid);
debug!("cache entry removed");
return StatusCode::OK;
}
warn!("deletion request for invalid uuid");
StatusCode::NOT_FOUND
}
// handle delete_all endpoint
pub async fn delete_all(State(state): State<AppState>) -> impl IntoResponse {
debug!("new request to delete all cache entries");
state.cache.invalidate_all();
*state.index_cache.lock().await = IndexCache::new();
debug!("all cache cleared");
StatusCode::OK
}

// handle request
pub async fn handler(State(state): State<AppState>, request: Request) -> impl IntoResponse {
debug!("new request for backend");
trace!("{:?}", request);
// check if etag is present in headers
if state.cache.check_etag(request.headers()) {
// respond 304 if etag is present in cache
debug!("etag is valid, returning 304 status");
return StatusCode::NOT_MODIFIED.into_response();
}

// if response is in cache with valid header if any, return response from cache
let index = state.index_cache;
if let Some(uuid) = index.lock().await.request_to_uuid(&request) {
if let Some(rep) = state.cache.get(&uuid).await {
info!("cache entry is served");
return rep.into_response();
} else {
// present in index_cache but not in cache, it means it was automatically invalidated.
// must update index cache.
debug!("index was not updated, entry in cache was deleted automaticcaly");
index.lock().await.delete_uuid_from_index(&uuid);
}
}
Expand All @@ -63,12 +74,13 @@ pub async fn handler(State(state): State<AppState>, request: Request) -> impl In
let req_method = request.method().to_owned();
let req_headers = request.headers().to_owned();
let req_uri = request.uri().to_owned();
debug!("response was not cached, requesting backend service");
let url_backend = state.config.to_backend_uri(&req_uri);
debug!("Request URI retrieved: {req_uri}");
debug!("Request URL transmitted:{url_backend}");
match state
.client
.request(
request.method().to_owned(),
state.config.to_backend_uri(request.uri()),
)
.request(request.method().to_owned(), url_backend)
.headers(request.headers().to_owned())
.body(to_bytes(request.into_body(), usize::MAX).await.unwrap())
.send()
Expand All @@ -78,15 +90,16 @@ pub async fn handler(State(state): State<AppState>, request: Request) -> impl In
// first send Response and then cache so client wait as little as possible.
// need to add Etag headers to response
let uuid = Uuid::new_v4();

let cache = state.cache.clone();
rep.headers_mut()
.insert(ETAG, HeaderValue::from_str(&uuid.to_string()).unwrap());
let headers = rep.headers().to_owned();
let req_headers_match_vary = match headers_match_vary(&req_headers, headers.get(VARY)) {
Ok(h) => h,
Err(_err) => {
// seems backend service response contains malformated header value for Vary
Err(err) => {
warn!("backend service contains malformated header value for Vary");
debug!("{err}");
trace!("{:?}", rep);
HeaderMap::new()
}
};
Expand All @@ -98,17 +111,21 @@ pub async fn handler(State(state): State<AppState>, request: Request) -> impl In
);

spawn(enc!((uuid, axum_rep, index) async move {
debug!("adding the new response to the cache and indexing");
// add entry to index cache
index.lock().await.add_entry(uuid, req_method, req_uri, req_headers_match_vary);
// add response to cache
cache.insert(uuid, axum_rep).await;

}));
debug!("serving new response with added header Etag");
trace!("{:?}", axum_rep);
axum_rep.into_response()
}
Err(_err) => {
Err(err) => {
// the request to the backend failed

warn!("the request to the backend service failed");
debug!("{}", err);
StatusCode::INTERNAL_SERVER_ERROR.into_response()
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ impl Config {
.expect("could not parse to Url")
} else {
// no uri recognized, using fallback backend
self.fall_back_endpoint.to_owned()
Url::parse(&format!("{}{}", self.fall_back_endpoint, uri_request).replace("//", "/"))
.expect("could not parse to Url")
}
}
}
Expand Down
13 changes: 7 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use index_cache::IndexCache;
use reqwest::Client;
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::info;

/// Handlers
mod api;
Expand All @@ -28,26 +29,26 @@ struct AppState {

#[tokio::main]
async fn main() -> Result<()> {
// load config
let config = confy::load_path::<Config>("/etc/mnemosyne")?;
// create cache moka
// create state app
tracing_subscriber::fmt::init();
info!("loading configuration file");
let config = confy::load_path::<Config>("/etc/mnemosyne/config.toml")?;
let listen = config.listen_address;
info!("creating the cache and index...");
let state = AppState {
cache: Cache::new(&config),
config,
index_cache: Arc::new(Mutex::new(IndexCache::new())),
client: Client::new(),
};
info!("Done.");
// create route for cache API
let route = Router::new()
.route("/delete/:uuid", delete(api::delete_entry))
.route("/delete_all", delete(api::delete_all))
.fallback(api::handler)
.with_state(state);
info!("starting to listen on {listen}");
let listener = tokio::net::TcpListener::bind(listen).await?;
axum::serve(listener, route.into_make_service()).await?;
// create listener for all endpoints

Ok(())
}

0 comments on commit 3fe586e

Please sign in to comment.