From 1605df1585defd3e045cb4e727084367a47b0a0e Mon Sep 17 00:00:00 2001 From: janskiba Date: Thu, 4 Apr 2024 15:27:21 +0200 Subject: [PATCH 1/2] Retry getting catalogue from prism --- src/catalogue.rs | 46 ++++++++++++++++++++++++++++++++-------------- src/main.rs | 10 +++++++--- 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/src/catalogue.rs b/src/catalogue.rs index d618fa5..888eb9b 100644 --- a/src/catalogue.rs +++ b/src/catalogue.rs @@ -1,8 +1,9 @@ -use std::{collections::BTreeMap, time::Duration}; +use std::{collections::BTreeMap, sync::Arc, time::Duration}; use reqwest::Url; use serde_json::{json, Value}; -use tracing::{debug, info}; +use tokio::sync::Mutex; +use tracing::{debug, info, warn}; pub type Criteria = BTreeMap; @@ -16,21 +17,40 @@ fn get_element<'a>(count: &'a CriteriaGroups, key1: &'a str, key2: &'a str, key3 .and_then(|criteria| criteria.get(key3)) } +pub fn spawn_thing(catalogue_url: Url, prism_url: Url) -> Arc> { + let thing: Arc> = Arc::default(); + let thing1 = thing.clone(); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(10)).await; + loop { + match get_extended_json(catalogue_url.clone(), prism_url.clone()).await { + Ok(new_value) => { + *thing1.lock().await = new_value; + info!("Updated Catalogue!"); + tokio::time::sleep(Duration::from_secs(60 * 60)).await; + }, + Err(err) => { + warn!("Failed to get thing: {err}.\n Retrying in 5s."); + tokio::time::sleep(Duration::from_secs(5)).await; + }, + } + } + }); + + thing +} -pub async fn get_extended_json(catalogue_url: Url, prism_url: Url) -> Value { +pub async fn get_extended_json(catalogue_url: Url, prism_url: Url) -> Result { debug!("Fetching catalogue from {catalogue_url} ..."); let resp = reqwest::Client::new() .get(catalogue_url) .timeout(Duration::from_secs(30)) .send() - .await - .expect("Unable to fetch catalogue from upstream; please check URL specified in config."); + .await?; - let mut json: Value = resp.json().await - .expect("Unable to parse catalogue from upstream; please check URL specified in config."); + let mut json: Value = resp.json().await?; -// tokio::time::sleep(Duration::from_secs(10)).await; let prism_resp = reqwest::Client::new() .post(format!("{}criteria", prism_url)) @@ -38,17 +58,15 @@ pub async fn get_extended_json(catalogue_url: Url, prism_url: Url) -> Value { .body("{\"sites\": []}") .timeout(Duration::from_secs(300)) .send() - .await - .expect("Unable to fetch response from Prism; please check it's running."); + .await?; - let mut counts: CriteriaGroups = prism_resp.json().await - .expect("Unable to parse response from Prism into CriteriaGroups"); + let mut counts: CriteriaGroups = prism_resp.json().await?; recurse(&mut json, &mut counts); //TODO remove from counts once copied into catalogue to make it O(n log n) info!("Catalogue built successfully."); - json + Ok(json) } /// Key order: group key (e.g. patient) @@ -119,4 +137,4 @@ fn recurse(json: &mut Value, counts: &mut CriteriaGroups) { }, _ => {} } -} \ No newline at end of file +} diff --git a/src/main.rs b/src/main.rs index ace0b20..1815d78 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use axum::{ extract::{Json, Path, Query, State}, http::HeaderValue, @@ -13,6 +15,7 @@ use once_cell::sync::Lazy; use reqwest::{header, Method, StatusCode}; use serde::{Deserialize, Serialize}; use serde_json::Value; +use tokio::sync::Mutex; use tower_http::cors::CorsLayer; use tracing::{info, warn, Level}; use tracing_subscriber::{util::SubscriberInitExt, EnvFilter}; @@ -34,7 +37,7 @@ static BEAM_CLIENT: Lazy = Lazy::new(|| { #[derive(Clone)] struct SharedState { - extended_json: Value, + extended_json: Arc>, } #[tokio::main] @@ -53,7 +56,7 @@ async fn main() { .allow_headers([header::CONTENT_TYPE]); let make_service = if let Some(url) = CONFIG.catalogue_url.clone() { - let extended_json = catalogue::get_extended_json(url, CONFIG.prism_url.clone()).await; + let extended_json = catalogue::spawn_thing(url, CONFIG.prism_url.clone()); let state = SharedState { extended_json }; let app = Router::new() @@ -161,5 +164,6 @@ fn convert_response(response: reqwest::Response) -> axum::response::Response { } async fn handle_get_catalogue(State(state): State) -> Json { - Json(state.extended_json) + // TODO: We can totally avoid this clone by using axum_extra ErasedJson + Json(state.extended_json.lock().await.clone()) } From 870c2a16900c15d4b6cc0198c0fc00d7677c9085 Mon Sep 17 00:00:00 2001 From: lablans Date: Thu, 4 Apr 2024 13:36:09 +0000 Subject: [PATCH 2/2] Fix clap choking on empty URL --- src/main.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main.rs b/src/main.rs index 1815d78..c97c69a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -48,6 +48,13 @@ async fn main() { .finish() .init(); + // TODO: Remove this workaround once clap manages to not choke on URL "". + if let Ok(var) = std::env::var("CATALOGUE_URL") { + if var.is_empty() { + std::env::remove_var("CATALOGUE_URL"); + } + } + info!("{:#?}", Lazy::force(&CONFIG)); let cors = CorsLayer::new()