Skip to content

Commit

Permalink
Merge pull request #25 from samply/catalogue-extender
Browse files Browse the repository at this point in the history
Catalogue extender
  • Loading branch information
lablans committed Apr 4, 2024
2 parents 9021d2a + 9714a59 commit e1ca059
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 57 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rustyspot"
version = "0.2.0"
version = "0.2.1"
edition = "2021"
license = "Apache-2.0"
documentation = "https://github.com/samply/spot"
Expand Down
65 changes: 24 additions & 41 deletions src/catalogue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::{collections::BTreeMap, time::Duration};

use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use tracing::{debug, info};

Expand All @@ -11,18 +10,13 @@ pub type CriteriaGroup = BTreeMap<String, Criteria>;

pub type CriteriaGroups = BTreeMap<String, CriteriaGroup>;

fn get_element<'a>(
count: &'a CriteriaGroups,
key1: &'a str,
key2: &'a str,
key3: &'a str,
) -> Option<&'a u64> {
count
.get(key1)
fn get_element<'a>(count: &'a CriteriaGroups, key1: &'a str, key2: &'a str, key3: &'a str) -> Option<&'a u64> {
count.get(key1)
.and_then(|group| group.get(key2))
.and_then(|criteria| criteria.get(key3))
}


pub async fn get_extended_json(catalogue_url: Url, prism_url: Url) -> Value {
debug!("Fetching catalogue from {catalogue_url} ...");

Expand All @@ -33,11 +27,11 @@ pub async fn get_extended_json(catalogue_url: Url, prism_url: Url) -> Value {
.await
.expect("Unable to fetch catalogue from upstream; please check URL specified in config.");

let mut json: Value = resp
.json()
.await
let mut json: Value = resp.json().await
.expect("Unable to parse catalogue from upstream; please check URL specified in config.");

// tokio::time::sleep(Duration::from_secs(10)).await;

let prism_resp = reqwest::Client::new()
.post(format!("{}criteria", prism_url))
.header("Content-Type", "application/json")
Expand All @@ -47,9 +41,7 @@ pub async fn get_extended_json(catalogue_url: Url, prism_url: Url) -> Value {
.await
.expect("Unable to fetch response from Prism; please check it's running.");

let mut counts: CriteriaGroups = prism_resp
.json()
.await
let mut counts: CriteriaGroups = prism_resp.json().await
.expect("Unable to parse response from Prism into CriteriaGroups");

recurse(&mut json, &mut counts); //TODO remove from counts once copied into catalogue to make it O(n log n)
Expand All @@ -68,9 +60,10 @@ fn recurse(json: &mut Value, counts: &mut CriteriaGroups) {
for ele in arr {
recurse(ele, counts);
}
}
},
Value::Object(obj) => {
if !obj.contains_key("childCategories") {

if ! obj.contains_key("childCategories") {
for (_key, child_val) in obj.iter_mut() {
recurse(child_val, counts);
}
Expand All @@ -79,15 +72,10 @@ fn recurse(json: &mut Value, counts: &mut CriteriaGroups) {
.expect("Got JSON where a criterion key was not a string. Please check json.").to_owned();

//TODO consolidate catalogue and MeasureReport group names
let group_key = if group_key == "patient" {
"patients"
} else if group_key == "tumor_classification" {
"diagnosis"
} else if group_key == "biosamples" {
"specimen"
} else {
&group_key
};
let group_key = if group_key == "patient" {"patients"}
else if group_key == "tumor_classification" {"diagnosis"}
else if group_key == "biosamples" {"specimen"}
else {&group_key};

let children_cats = obj
.get_mut("childCategories")
Expand All @@ -96,7 +84,7 @@ fn recurse(json: &mut Value, counts: &mut CriteriaGroups) {
.unwrap()
.iter_mut()
.filter(|item| item.get("type").unwrap_or(&Value::Null) == "EQUALS");

for child_cat in children_cats {
let stratifier_key = child_cat.get("key").expect("Got JSON element with childCategory that does not contain a (stratifier) key. Please check json.").as_str()
.expect("Got JSON where a criterion key was not a string. Please check json.").to_owned();
Expand All @@ -108,32 +96,27 @@ fn recurse(json: &mut Value, counts: &mut CriteriaGroups) {
.expect("Got JSON element with childCategory with criteria that are not an array. Please check json.");

for criterion in criteria {
let criterion = criterion.as_object_mut().expect(
"Got JSON where a criterion was not an object. Please check json.",
);
let criterion = criterion.as_object_mut()
.expect("Got JSON where a criterion was not an object. Please check json.");
let stratum_key = criterion.get("key")
.expect("Got JSON where a criterion did not have a key. Please check json.")
.as_str()
.expect("Got JSON where a criterion key was not a string. Please check json.");

let count_from_prism =
get_element(counts, &group_key, &stratifier_key, stratum_key);

let count_from_prism = get_element(counts, &group_key, &stratifier_key, stratum_key);

match count_from_prism {
Some(count) => {
criterion.insert("count".into(), json!(count));
}
},
None => {
debug!(
"No count from Prism for {}, {}, {}",
group_key, stratifier_key, stratum_key
);
debug!("No count from Prism for {}, {}, {}", group_key, stratifier_key, stratum_key);
}
}
}
}
}
}
}
},
_ => {}
}
}
}
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub struct Config {

/// URL to catalogue.json file
#[clap(long, env)]
pub catalogue_url: Url,
pub catalogue_url: Option<Url>,

/// URL to prism
#[clap(long, env, default_value= "http://localhost:8066")]
Expand Down
39 changes: 25 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,40 @@ async fn main() {

info!("{:#?}", Lazy::force(&CONFIG));

let extended_json =
catalogue::get_extended_json(CONFIG.catalogue_url.clone(), CONFIG.prism_url.clone()).await;
let state = SharedState { extended_json };

// TODO: Add check for reachability of beam-proxy

let cors = CorsLayer::new()
.allow_methods([Method::GET, Method::POST])
.allow_origin(CONFIG.cors_origin.clone())
.allow_headers([header::CONTENT_TYPE]);

let app = Router::new()
.route("/beam", post(handle_create_beam_task))
.route("/beam/:task_id", get(handle_listen_to_beam_tasks))
.route("/catalogue", get(handle_get_catalogue))
.with_state(state)
.layer(axum::middleware::map_response(banner::set_server_header))
.layer(cors);
let make_service = if let Some(url) = CONFIG.catalogue_url.clone() {
let extended_json = catalogue::get_extended_json(url, CONFIG.prism_url.clone()).await;
let state = SharedState { extended_json };

let app = Router::new()
.route("/beam", post(handle_create_beam_task))
.route("/beam/:task_id", get(handle_listen_to_beam_tasks))
.route("/catalogue", get(handle_get_catalogue))
.with_state(state)
.layer(axum::middleware::map_response(banner::set_server_header))
.layer(cors);

app.into_make_service()
} else {
let app = Router::new()
.route("/beam", post(handle_create_beam_task))
.route("/beam/:task_id", get(handle_listen_to_beam_tasks))
.layer(axum::middleware::map_response(banner::set_server_header))
.layer(cors);

app.into_make_service()
};

// TODO: Add check for reachability of beam-proxy

banner::print_banner();

axum::Server::bind(&CONFIG.bind_addr)
.serve(app.into_make_service())
.serve(make_service)
.await
.unwrap();
}
Expand Down

0 comments on commit e1ca059

Please sign in to comment.