From 826a3a99afcd1ef58bf412e4d87e406c9168d2b0 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Fri, 1 Mar 2024 16:41:32 +0100 Subject: [PATCH] changed query struct, optimization by Threated --- Cargo.toml | 2 +- src/beam.rs | 3 ++- src/config.rs | 14 ++++++++---- src/errors.rs | 4 +++- src/main.rs | 63 ++++++++++++++++++++++++++++++--------------------- 5 files changed, 52 insertions(+), 34 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3794974..397639a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ tracing-subscriber = { version = "0.3.11", default_features = false, features = once_cell = "1.18" # Command Line Interface -clap = { version = "4.0", default_features = false, features = ["std", "env", "derive", "help"] } +clap = { version = "4.0", features = ["std", "env", "derive", "help"] } [dev-dependencies] pretty_assertions = "1.4.0" diff --git a/src/beam.rs b/src/beam.rs index 5c621a8..1c9ee5c 100644 --- a/src/beam.rs +++ b/src/beam.rs @@ -2,6 +2,7 @@ use crate::config::CONFIG; use beam_lib::{AppId, MsgId, RawString, TaskRequest}; pub fn create_beam_task(target_sites: Vec) -> TaskRequest { + let target = &CONFIG.target; let id = MsgId::new(); let proxy_id = &CONFIG.beam_app_id_long.proxy_id(); let broker_id = proxy_id @@ -11,7 +12,7 @@ pub fn create_beam_task(target_sites: Vec) -> TaskRequest { .1; let to = target_sites .iter() - .map(|site| AppId::new_unchecked(format!("focus.{site}.{broker_id}"))) + .map(|site| AppId::new_unchecked(format!("{target}.{site}.{broker_id}"))) .collect(); let metadata = { serde_json::json!({ diff --git a/src/config.rs b/src/config.rs index 090107a..0268df4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -66,6 +66,9 @@ struct CliArgs { #[clap(long, env, value_parser)] auth_header: Option, + /// Target_application_name + #[clap(long, env, value_parser, default_value = "focus")] + target: String, } #[derive(Debug)] @@ -79,8 +82,8 @@ pub(crate) struct Config { pub project: String, pub bind_addr: SocketAddr, pub auth_header: Option, - pub query: String - + pub query: String, + pub target: String, } impl Config { @@ -98,17 +101,18 @@ impl Config { bind_addr: cli_args.bind_addr, auth_header: cli_args.auth_header, query: get_query(), + target: cli_args.target, }; Ok(config) } } fn get_query() -> String { - let query_file_name = format!("../resources/query_{}.encoded", CliArgs::parse().project); - fs::read_to_string(&query_file_name).unwrap_or_else(|_| panic!("File {} can't be read", &query_file_name)) + let query_file_name = format!("resources/query_{}.encoded", CliArgs::parse().project); + fs::read_to_string(&query_file_name) + .unwrap_or_else(|_| panic!("File {} can't be read", &query_file_name)) } - fn parse_cors(v: &str) -> Result { if v == "*" || v.to_lowercase() == "any" { Ok(AllowOrigin::any()) diff --git a/src/errors.rs b/src/errors.rs index aa3412c..a6b6d3e 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -11,11 +11,13 @@ pub enum PrismError { #[error("Beam error: {0}")] BeamError(String), #[error("CQL tampered with: {0}")] - DeserializationError(String), + DeserializationError(serde_json::Error), #[error("Serialization error: {0}")] SerializationError(String), #[error("Invalid Header Value: {0}")] InvalidHeaderValue(http::header::InvalidHeaderValue), #[error("Decode error: {0}")] DecodeError(base64::DecodeError), + #[error("Unexpected WorkStatus: {0:?}")] + UnexpectedWorkStatusError(beam_lib::WorkStatus), } diff --git a/src/main.rs b/src/main.rs index 2ec2c6f..0ca03a0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -29,15 +29,13 @@ use base64::Engine as _; use once_cell::sync::Lazy; use reqwest::Method; use serde::{Deserialize, Serialize}; -use tracing_subscriber::util::SubscriberInitExt; use beam::create_beam_task; use beam_lib::{AppId, BeamClient, MsgId}; use criteria::{combine_groups_of_criteria_groups, CriteriaGroups}; use std::{collections::HashMap, time::Duration}; use tower_http::cors::CorsLayer; -use tracing::{error, info, warn, Level}; -use tracing_subscriber::EnvFilter; +use tracing::{error, info, warn}; use beam_lib::{RawString, TaskResult}; @@ -51,10 +49,7 @@ static BEAM_CLIENT: Lazy = Lazy::new(|| { #[derive(Serialize, Deserialize, Clone, Debug)] struct LensQuery { - // kept it the same as in spot, but only sites needed - id: MsgId, // prism ignores this and creates its own - sites: Vec, //TODO: coordinate with Lens team to introduce a new type of lens query with sites only - query: String, // prism ignores this and uses the default query for the project + sites: Vec, } type Site = String; @@ -77,7 +72,7 @@ pub async fn main() { /* 🏳️‍🌈⃤ Prism returns cumulative positive numbers of each of individual criteria defined in CQL queries and Measures at sites it is queried about. - Prism doesn't return all the search criteria in the search tree and is not a replacement for MDR. + Prism doesn't return all the search criteria in the search tree and is not a replacement for MDR. It doesn't return criteria for which there are no results. It can't return results for range types. It is not crucial that the counts are current or that they include all the BHs, speed of drawing lens is more important. @@ -93,7 +88,7 @@ pub async fn main() { cache: HashMap::new(), }; - let sites_to_query: HashSet = HashSet::new(); + let sites_to_query: HashSet = HashSet::new(); //accumulates sites to query, those for which Lens asked for criteria, and they either weren't cached or the cache had expired, emptied when task to sites sent let shared_state = SharedState { @@ -105,18 +100,12 @@ pub async fn main() { error!("Cannot initialize logger: {}", e); exit(1); }; - tracing_subscriber::FmtSubscriber::builder() - .with_max_level(Level::DEBUG) - .with_env_filter(EnvFilter::from_default_env()) - .finish() - .init(); - info!("{:#?}", &CONFIG); if let Err(e) = wait_for_beam_proxy().await { error!("Beam doesn't work, it doesn't make sense that I run: {}", e); exit(2); } - + spawn_site_querying(shared_state.clone()); let cors = CorsLayer::new() @@ -236,8 +225,6 @@ async fn query_sites( } async fn get_results(shared_state: SharedState, task_id: MsgId) -> Result<(), PrismError> { - let criteria_cache: &mut tokio::sync::MutexGuard<'_, CriteriaCache> = - &mut shared_state.criteria_cache.lock().await; let resp = BEAM_CLIENT .raw_beam_request( Method::GET, @@ -268,6 +255,16 @@ async fn get_results(shared_state: SharedState, task_id: MsgId) -> Result<(), Pr while let Some(Ok(async_sse::Event::Message(msg))) = stream.next().await { let (from, measure_report) = match decode_result(&msg) { Ok(v) => v, + Err(PrismError::UnexpectedWorkStatusError(beam_lib::WorkStatus::Claimed)) => { + info!("Task claimed:) {msg:?}"); + continue; + } + Err(PrismError::UnexpectedWorkStatusError( + beam_lib::WorkStatus::PermFailed | beam_lib::WorkStatus::TempFailed, + )) => { + warn!("WorkStatus PermFailed: {msg:?}"); + continue; + } Err(e) => { warn!("Failed to deserialize message {msg:?} into a result: {e}"); continue; @@ -280,26 +277,40 @@ async fn get_results(shared_state: SharedState, task_id: MsgId) -> Result<(), Pr continue; } }; - criteria_cache.cache.insert( + shared_state.criteria_cache.lock().await.cache.insert( //if successful caching the criteria - from.app_name().into(), // extracting site name from app long name + from.as_ref().split('.').nth(1).unwrap().to_string(), // extracting site name from app long name (criteria, std::time::SystemTime::now()), ); } Ok(()) } -fn decode_result(msg: &async_sse::Message) -> anyhow::Result<(AppId, MeasureReport)> { - let result: TaskResult = serde_json::from_slice(msg.data())?; - let decoded = BASE64.decode(result.body.0)?; - Ok((result.from, serde_json::from_slice(&decoded)?)) +fn decode_result(msg: &async_sse::Message) -> Result<(AppId, MeasureReport), PrismError> { + let result: TaskResult = + serde_json::from_slice(msg.data()).map_err(PrismError::DeserializationError)?; + match result.status { + beam_lib::WorkStatus::Succeeded => {} + yep => { + // claimed not an error!!!! + return Err(PrismError::UnexpectedWorkStatusError(yep)); + } + } + let decoded = BASE64 + .decode(result.body.0) + .map_err(PrismError::DecodeError)?; + Ok(( + result.from, + serde_json::from_slice(&decoded).map_err(PrismError::DeserializationError)?, + )) } async fn wait_for_beam_proxy() -> beam_lib::Result<()> { - const MAX_RETRIES: u8 = 32; + const MAX_RETRIES: u8 = 3; let mut tries = 1; loop { - match reqwest::get(format!("{}/v1/health", CONFIG.beam_proxy_url)).await { + match reqwest::get(format!("http://localhost:8082/v1/health")).await { + //FIXME why doesn't it work with url from config Ok(res) if res.status() == StatusCode::OK => return Ok(()), _ if tries <= MAX_RETRIES => tries += 1, Err(e) => return Err(e.into()),