Skip to content

Commit

Permalink
Merge pull request #26 from samply/catalogue-extender
Browse files Browse the repository at this point in the history
Catalogue extender
  • Loading branch information
lablans committed Apr 4, 2024
2 parents e1ca059 + 870c2a1 commit 3a32d34
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 17 deletions.
46 changes: 32 additions & 14 deletions src/catalogue.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::{collections::BTreeMap, time::Duration};
use std::{collections::BTreeMap, sync::Arc, time::Duration};

use reqwest::Url;
use serde_json::{json, Value};
use tracing::{debug, info};
use tokio::sync::Mutex;
use tracing::{debug, info, warn};

pub type Criteria = BTreeMap<String, u64>;

Expand All @@ -16,39 +17,56 @@ fn get_element<'a>(count: &'a CriteriaGroups, key1: &'a str, key2: &'a str, key3
.and_then(|criteria| criteria.get(key3))
}

pub fn spawn_thing(catalogue_url: Url, prism_url: Url) -> Arc<Mutex<Value>> {
let thing: Arc<Mutex<Value>> = Arc::default();
let thing1 = thing.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
loop {
match get_extended_json(catalogue_url.clone(), prism_url.clone()).await {
Ok(new_value) => {
*thing1.lock().await = new_value;
info!("Updated Catalogue!");
tokio::time::sleep(Duration::from_secs(60 * 60)).await;
},
Err(err) => {
warn!("Failed to get thing: {err}.\n Retrying in 5s.");
tokio::time::sleep(Duration::from_secs(5)).await;
},
}
}
});

thing
}

pub async fn get_extended_json(catalogue_url: Url, prism_url: Url) -> Value {
pub async fn get_extended_json(catalogue_url: Url, prism_url: Url) -> Result<Value, reqwest::Error> {
debug!("Fetching catalogue from {catalogue_url} ...");

let resp = reqwest::Client::new()
.get(catalogue_url)
.timeout(Duration::from_secs(30))
.send()
.await
.expect("Unable to fetch catalogue from upstream; please check URL specified in config.");
.await?;

let mut json: Value = resp.json().await
.expect("Unable to parse catalogue from upstream; please check URL specified in config.");
let mut json: Value = resp.json().await?;

// tokio::time::sleep(Duration::from_secs(10)).await;

let prism_resp = reqwest::Client::new()
.post(format!("{}criteria", prism_url))
.header("Content-Type", "application/json")
.body("{\"sites\": []}")
.timeout(Duration::from_secs(300))
.send()
.await
.expect("Unable to fetch response from Prism; please check it's running.");
.await?;

let mut counts: CriteriaGroups = prism_resp.json().await
.expect("Unable to parse response from Prism into CriteriaGroups");
let mut counts: CriteriaGroups = prism_resp.json().await?;

recurse(&mut json, &mut counts); //TODO remove from counts once copied into catalogue to make it O(n log n)

info!("Catalogue built successfully.");

json
Ok(json)
}

/// Key order: group key (e.g. patient)
Expand Down Expand Up @@ -119,4 +137,4 @@ fn recurse(json: &mut Value, counts: &mut CriteriaGroups) {
},
_ => {}
}
}
}
17 changes: 14 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use axum::{
extract::{Json, Path, Query, State},
http::HeaderValue,
Expand All @@ -13,6 +15,7 @@ use once_cell::sync::Lazy;
use reqwest::{header, Method, StatusCode};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::Mutex;
use tower_http::cors::CorsLayer;
use tracing::{info, warn, Level};
use tracing_subscriber::{util::SubscriberInitExt, EnvFilter};
Expand All @@ -34,7 +37,7 @@ static BEAM_CLIENT: Lazy<BeamClient> = Lazy::new(|| {

#[derive(Clone)]
struct SharedState {
extended_json: Value,
extended_json: Arc<Mutex<Value>>,
}

#[tokio::main]
Expand All @@ -45,6 +48,13 @@ async fn main() {
.finish()
.init();

// TODO: Remove this workaround once clap manages to not choke on URL "".
if let Ok(var) = std::env::var("CATALOGUE_URL") {
if var.is_empty() {
std::env::remove_var("CATALOGUE_URL");
}
}

info!("{:#?}", Lazy::force(&CONFIG));

let cors = CorsLayer::new()
Expand All @@ -53,7 +63,7 @@ async fn main() {
.allow_headers([header::CONTENT_TYPE]);

let make_service = if let Some(url) = CONFIG.catalogue_url.clone() {
let extended_json = catalogue::get_extended_json(url, CONFIG.prism_url.clone()).await;
let extended_json = catalogue::spawn_thing(url, CONFIG.prism_url.clone());
let state = SharedState { extended_json };

let app = Router::new()
Expand Down Expand Up @@ -161,5 +171,6 @@ fn convert_response(response: reqwest::Response) -> axum::response::Response {
}

async fn handle_get_catalogue(State(state): State<SharedState>) -> Json<Value> {
Json(state.extended_json)
// TODO: We can totally avoid this clone by using axum_extra ErasedJson
Json(state.extended_json.lock().await.clone())
}

0 comments on commit 3a32d34

Please sign in to comment.