Skip to content

Commit

Permalink
changed query struct, optimization by Threated
Browse files Browse the repository at this point in the history
  • Loading branch information
enola-dkfz committed Mar 1, 2024
1 parent 0720c57 commit 826a3a9
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 34 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ tracing-subscriber = { version = "0.3.11", default_features = false, features =
once_cell = "1.18"

# Command Line Interface
clap = { version = "4.0", default_features = false, features = ["std", "env", "derive", "help"] }
clap = { version = "4.0", features = ["std", "env", "derive", "help"] }

[dev-dependencies]
pretty_assertions = "1.4.0"
Expand Down
3 changes: 2 additions & 1 deletion src/beam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::config::CONFIG;
use beam_lib::{AppId, MsgId, RawString, TaskRequest};

pub fn create_beam_task(target_sites: Vec<String>) -> TaskRequest<RawString> {
let target = &CONFIG.target;
let id = MsgId::new();
let proxy_id = &CONFIG.beam_app_id_long.proxy_id();
let broker_id = proxy_id
Expand All @@ -11,7 +12,7 @@ pub fn create_beam_task(target_sites: Vec<String>) -> TaskRequest<RawString> {
.1;
let to = target_sites
.iter()
.map(|site| AppId::new_unchecked(format!("focus.{site}.{broker_id}")))
.map(|site| AppId::new_unchecked(format!("{target}.{site}.{broker_id}")))
.collect();
let metadata = {
serde_json::json!({
Expand Down
14 changes: 9 additions & 5 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ struct CliArgs {
#[clap(long, env, value_parser)]
auth_header: Option<String>,

/// Target_application_name
#[clap(long, env, value_parser, default_value = "focus")]
target: String,
}

#[derive(Debug)]
Expand All @@ -79,8 +82,8 @@ pub(crate) struct Config {
pub project: String,
pub bind_addr: SocketAddr,
pub auth_header: Option<String>,
pub query: String

pub query: String,
pub target: String,
}

impl Config {
Expand All @@ -98,17 +101,18 @@ impl Config {
bind_addr: cli_args.bind_addr,
auth_header: cli_args.auth_header,
query: get_query(),
target: cli_args.target,
};
Ok(config)
}
}

fn get_query() -> String {
let query_file_name = format!("../resources/query_{}.encoded", CliArgs::parse().project);
fs::read_to_string(&query_file_name).unwrap_or_else(|_| panic!("File {} can't be read", &query_file_name))
let query_file_name = format!("resources/query_{}.encoded", CliArgs::parse().project);
fs::read_to_string(&query_file_name)
.unwrap_or_else(|_| panic!("File {} can't be read", &query_file_name))
}


fn parse_cors(v: &str) -> Result<AllowOrigin, http::header::InvalidHeaderValue> {
if v == "*" || v.to_lowercase() == "any" {
Ok(AllowOrigin::any())
Expand Down
4 changes: 3 additions & 1 deletion src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ pub enum PrismError {
#[error("Beam error: {0}")]
BeamError(String),
#[error("CQL tampered with: {0}")]
DeserializationError(String),
DeserializationError(serde_json::Error),
#[error("Serialization error: {0}")]
SerializationError(String),
#[error("Invalid Header Value: {0}")]
InvalidHeaderValue(http::header::InvalidHeaderValue),
#[error("Decode error: {0}")]
DecodeError(base64::DecodeError),
#[error("Unexpected WorkStatus: {0:?}")]
UnexpectedWorkStatusError(beam_lib::WorkStatus),
}
63 changes: 37 additions & 26 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,13 @@ use base64::Engine as _;
use once_cell::sync::Lazy;
use reqwest::Method;
use serde::{Deserialize, Serialize};
use tracing_subscriber::util::SubscriberInitExt;

use beam::create_beam_task;
use beam_lib::{AppId, BeamClient, MsgId};
use criteria::{combine_groups_of_criteria_groups, CriteriaGroups};
use std::{collections::HashMap, time::Duration};
use tower_http::cors::CorsLayer;
use tracing::{error, info, warn, Level};
use tracing_subscriber::EnvFilter;
use tracing::{error, info, warn};

use beam_lib::{RawString, TaskResult};

Expand All @@ -51,10 +49,7 @@ static BEAM_CLIENT: Lazy<BeamClient> = Lazy::new(|| {

#[derive(Serialize, Deserialize, Clone, Debug)]
struct LensQuery {
// kept it the same as in spot, but only sites needed
id: MsgId, // prism ignores this and creates its own
sites: Vec<String>, //TODO: coordinate with Lens team to introduce a new type of lens query with sites only
query: String, // prism ignores this and uses the default query for the project
sites: Vec<String>,
}

type Site = String;
Expand All @@ -77,7 +72,7 @@ pub async fn main() {
/*
🏳️‍🌈⃤
Prism returns cumulative positive numbers of each of individual criteria defined in CQL queries and Measures at sites it is queried about.
Prism doesn't return all the search criteria in the search tree and is not a replacement for MDR.
Prism doesn't return all the search criteria in the search tree and is not a replacement for MDR.
It doesn't return criteria for which there are no results. It can't return results for range types.
It is not crucial that the counts are current or that they include all the BHs, speed of drawing lens is more important.
Expand All @@ -93,7 +88,7 @@ pub async fn main() {
cache: HashMap::new(),
};

let sites_to_query: HashSet<String> = HashSet::new();
let sites_to_query: HashSet<String> = HashSet::new();
//accumulates sites to query, those for which Lens asked for criteria, and they either weren't cached or the cache had expired, emptied when task to sites sent

let shared_state = SharedState {
Expand All @@ -105,18 +100,12 @@ pub async fn main() {
error!("Cannot initialize logger: {}", e);
exit(1);
};
tracing_subscriber::FmtSubscriber::builder()
.with_max_level(Level::DEBUG)
.with_env_filter(EnvFilter::from_default_env())
.finish()
.init();
info!("{:#?}", &CONFIG);

if let Err(e) = wait_for_beam_proxy().await {
error!("Beam doesn't work, it doesn't make sense that I run: {}", e);
exit(2);
}

spawn_site_querying(shared_state.clone());

let cors = CorsLayer::new()
Expand Down Expand Up @@ -236,8 +225,6 @@ async fn query_sites(
}

async fn get_results(shared_state: SharedState, task_id: MsgId) -> Result<(), PrismError> {
let criteria_cache: &mut tokio::sync::MutexGuard<'_, CriteriaCache> =
&mut shared_state.criteria_cache.lock().await;
let resp = BEAM_CLIENT
.raw_beam_request(
Method::GET,
Expand Down Expand Up @@ -268,6 +255,16 @@ async fn get_results(shared_state: SharedState, task_id: MsgId) -> Result<(), Pr
while let Some(Ok(async_sse::Event::Message(msg))) = stream.next().await {
let (from, measure_report) = match decode_result(&msg) {
Ok(v) => v,
Err(PrismError::UnexpectedWorkStatusError(beam_lib::WorkStatus::Claimed)) => {
info!("Task claimed:) {msg:?}");
continue;
}
Err(PrismError::UnexpectedWorkStatusError(
beam_lib::WorkStatus::PermFailed | beam_lib::WorkStatus::TempFailed,
)) => {
warn!("WorkStatus PermFailed: {msg:?}");
continue;
}
Err(e) => {
warn!("Failed to deserialize message {msg:?} into a result: {e}");
continue;
Expand All @@ -280,26 +277,40 @@ async fn get_results(shared_state: SharedState, task_id: MsgId) -> Result<(), Pr
continue;
}
};
criteria_cache.cache.insert(
shared_state.criteria_cache.lock().await.cache.insert(
//if successful caching the criteria
from.app_name().into(), // extracting site name from app long name
from.as_ref().split('.').nth(1).unwrap().to_string(), // extracting site name from app long name
(criteria, std::time::SystemTime::now()),
);
}
Ok(())
}

fn decode_result(msg: &async_sse::Message) -> anyhow::Result<(AppId, MeasureReport)> {
let result: TaskResult<RawString> = serde_json::from_slice(msg.data())?;
let decoded = BASE64.decode(result.body.0)?;
Ok((result.from, serde_json::from_slice(&decoded)?))
fn decode_result(msg: &async_sse::Message) -> Result<(AppId, MeasureReport), PrismError> {
let result: TaskResult<RawString> =
serde_json::from_slice(msg.data()).map_err(PrismError::DeserializationError)?;
match result.status {
beam_lib::WorkStatus::Succeeded => {}
yep => {
// claimed not an error!!!!
return Err(PrismError::UnexpectedWorkStatusError(yep));
}
}
let decoded = BASE64
.decode(result.body.0)
.map_err(PrismError::DecodeError)?;
Ok((
result.from,
serde_json::from_slice(&decoded).map_err(PrismError::DeserializationError)?,
))
}

async fn wait_for_beam_proxy() -> beam_lib::Result<()> {
const MAX_RETRIES: u8 = 32;
const MAX_RETRIES: u8 = 3;
let mut tries = 1;
loop {
match reqwest::get(format!("{}/v1/health", CONFIG.beam_proxy_url)).await {
match reqwest::get(format!("http://localhost:8082/v1/health")).await {
//FIXME why doesn't it work with url from config
Ok(res) if res.status() == StatusCode::OK => return Ok(()),
_ if tries <= MAX_RETRIES => tries += 1,
Err(e) => return Err(e.into()),
Expand Down

0 comments on commit 826a3a9

Please sign in to comment.