From 9714a851eb8172caf9a023bd577ce8a5a28c245b Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Wed, 27 Sep 2023 18:07:10 +0200 Subject: [PATCH] feat(hermes): Check price ids exist before each request This check will make rejects faster (and block invalid requests to benchmarks). The other benefit is that we can log the errors from the get_price_feeds_with_update_data since it should not fail anymore. --- hermes/Cargo.lock | 2 +- hermes/Cargo.toml | 2 +- hermes/src/aggregate.rs | 16 ++++---- hermes/src/api/rest.rs | 47 ++++++++++++++++++++--- hermes/src/api/rest/get_price_feed.rs | 14 ++++++- hermes/src/api/rest/get_vaa.rs | 14 ++++++- hermes/src/api/rest/get_vaa_ccip.rs | 14 ++++++- hermes/src/api/rest/latest_price_feeds.rs | 15 +++++++- hermes/src/api/rest/latest_vaas.rs | 15 +++++++- hermes/src/api/ws.rs | 18 +++++++-- hermes/src/state/benchmarks.rs | 4 +- 11 files changed, 131 insertions(+), 30 deletions(-) diff --git a/hermes/Cargo.lock b/hermes/Cargo.lock index d5512973e8..7f3a173615 100644 --- a/hermes/Cargo.lock +++ b/hermes/Cargo.lock @@ -1858,7 +1858,7 @@ checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" [[package]] name = "hermes" -version = "0.1.21" +version = "0.1.22" dependencies = [ "anyhow", "async-trait", diff --git a/hermes/Cargo.toml b/hermes/Cargo.toml index 2b97d06280..6b4a0eb742 100644 --- a/hermes/Cargo.toml +++ b/hermes/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.1.21" +version = "0.1.22" description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle." edition = "2021" diff --git a/hermes/src/aggregate.rs b/hermes/src/aggregate.rs index 24b59de102..05c8a27666 100644 --- a/hermes/src/aggregate.rs +++ b/hermes/src/aggregate.rs @@ -313,7 +313,7 @@ async fn build_message_states( async fn get_verified_price_feeds( state: &S, - price_ids: Vec, + price_ids: &[PriceIdentifier], request_time: RequestTime, ) -> Result where @@ -373,14 +373,14 @@ where pub async fn get_price_feeds_with_update_data( state: &S, - price_ids: Vec, + price_ids: &[PriceIdentifier], request_time: RequestTime, ) -> Result where S: AggregateCache, S: Benchmarks, { - match get_verified_price_feeds(state, price_ids.clone(), request_time.clone()).await { + match get_verified_price_feeds(state, price_ids, request_time.clone()).await { Ok(price_feeds_with_update_data) => Ok(price_feeds_with_update_data), Err(e) => { if let RequestTime::FirstAfter(publish_time) = request_time { @@ -567,7 +567,7 @@ mod test { // price feed with correct update data. let price_feeds_with_update_data = get_price_feeds_with_update_data( &*state, - vec![PriceIdentifier::new([100; 32])], + &[PriceIdentifier::new([100; 32])], RequestTime::Latest, ) .await @@ -688,7 +688,7 @@ mod test { // Get the price feeds with update data let price_feeds_with_update_data = get_price_feeds_with_update_data( &*state, - vec![PriceIdentifier::new([100; 32])], + &[PriceIdentifier::new([100; 32])], RequestTime::Latest, ) .await @@ -753,7 +753,7 @@ mod test { for slot in 900..1000 { let price_feeds_with_update_data = get_price_feeds_with_update_data( &*state, - vec![ + &[ PriceIdentifier::new([100; 32]), PriceIdentifier::new([200; 32]), ], @@ -770,9 +770,9 @@ mod test { for slot in 0..900 { assert!(get_price_feeds_with_update_data( &*state, - vec![ + &[ PriceIdentifier::new([100; 32]), - PriceIdentifier::new([200; 32]), + PriceIdentifier::new([200; 32]) ], RequestTime::FirstAfter(slot as i64), ) diff --git a/hermes/src/api/rest.rs b/hermes/src/api/rest.rs index 69ce369193..6e80fcb909 100644 --- a/hermes/src/api/rest.rs +++ b/hermes/src/api/rest.rs @@ -1,9 +1,13 @@ -use axum::{ - http::StatusCode, - response::{ - IntoResponse, - Response, +use { + super::ApiState, + axum::{ + http::StatusCode, + response::{ + IntoResponse, + Response, + }, }, + pyth_sdk::PriceIdentifier, }; mod get_price_feed; @@ -32,6 +36,7 @@ pub enum RestError { UpdateDataNotFound, CcipUpdateDataNotFound, InvalidCCIPInput, + PriceIdsNotFound { missing_ids: Vec }, } impl IntoResponse for RestError { @@ -53,6 +58,38 @@ impl IntoResponse for RestError { RestError::InvalidCCIPInput => { (StatusCode::BAD_REQUEST, "Invalid CCIP input").into_response() } + RestError::PriceIdsNotFound { missing_ids } => { + let missing_ids = missing_ids + .into_iter() + .map(|id| id.to_string()) + .collect::>() + .join(", "); + + ( + StatusCode::NOT_FOUND, + format!("Price ids not found: {}", missing_ids), + ) + .into_response() + } } } } + +/// Verify that the price ids exist in the aggregate state. +pub async fn verify_price_ids_exist( + state: &ApiState, + price_ids: &[PriceIdentifier], +) -> Result<(), RestError> { + let all_ids = crate::aggregate::get_price_feed_ids(&*state.state).await; + let missing_ids = price_ids + .iter() + .filter(|id| !all_ids.contains(id)) + .cloned() + .collect::>(); + + if !missing_ids.is_empty() { + return Err(RestError::PriceIdsNotFound { missing_ids }); + } + + Ok(()) +} diff --git a/hermes/src/api/rest/get_price_feed.rs b/hermes/src/api/rest/get_price_feed.rs index 79c86a9073..0fbfa608b0 100644 --- a/hermes/src/api/rest/get_price_feed.rs +++ b/hermes/src/api/rest/get_price_feed.rs @@ -1,4 +1,5 @@ use { + super::verify_price_ids_exist, crate::{ aggregate::{ RequestTime, @@ -65,13 +66,22 @@ pub async fn get_price_feed( ) -> Result, RestError> { let price_id: PriceIdentifier = params.id.into(); + verify_price_ids_exist(&state, &[price_id]).await?; + let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( &*state.state, - vec![price_id], + &[price_id], RequestTime::FirstAfter(params.publish_time), ) .await - .map_err(|_| RestError::UpdateDataNotFound)?; + .map_err(|e| { + tracing::warn!( + "Error getting price feed {:?} with update data: {:?}", + price_id, + e + ); + RestError::UpdateDataNotFound + })?; let mut price_feed = price_feeds_with_update_data .price_feeds diff --git a/hermes/src/api/rest/get_vaa.rs b/hermes/src/api/rest/get_vaa.rs index f5cf697342..cae52125e0 100644 --- a/hermes/src/api/rest/get_vaa.rs +++ b/hermes/src/api/rest/get_vaa.rs @@ -1,4 +1,5 @@ use { + super::verify_price_ids_exist, crate::{ aggregate::{ get_price_feeds_with_update_data, @@ -73,13 +74,22 @@ pub async fn get_vaa( ) -> Result, RestError> { let price_id: PriceIdentifier = params.id.into(); + verify_price_ids_exist(&state, &[price_id]).await?; + let price_feeds_with_update_data = get_price_feeds_with_update_data( &*state.state, - vec![price_id], + &[price_id], RequestTime::FirstAfter(params.publish_time), ) .await - .map_err(|_| RestError::UpdateDataNotFound)?; + .map_err(|e| { + tracing::warn!( + "Error getting price feed {:?} with update data: {:?}", + price_id, + e + ); + RestError::UpdateDataNotFound + })?; let vaa = price_feeds_with_update_data .update_data diff --git a/hermes/src/api/rest/get_vaa_ccip.rs b/hermes/src/api/rest/get_vaa_ccip.rs index 88c9550770..c1a67ff941 100644 --- a/hermes/src/api/rest/get_vaa_ccip.rs +++ b/hermes/src/api/rest/get_vaa_ccip.rs @@ -1,4 +1,5 @@ use { + super::verify_price_ids_exist, crate::{ aggregate::{ RequestTime, @@ -70,13 +71,22 @@ pub async fn get_vaa_ccip( .map_err(|_| RestError::InvalidCCIPInput)?, ); + verify_price_ids_exist(&state, &[price_id]).await?; + let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( &*state.state, - vec![price_id], + &[price_id], RequestTime::FirstAfter(publish_time), ) .await - .map_err(|_| RestError::CcipUpdateDataNotFound)?; + .map_err(|e| { + tracing::warn!( + "Error getting price feed {:?} with update data: {:?}", + price_id, + e + ); + RestError::CcipUpdateDataNotFound + })?; let bytes = price_feeds_with_update_data .update_data diff --git a/hermes/src/api/rest/latest_price_feeds.rs b/hermes/src/api/rest/latest_price_feeds.rs index daa7eea00d..134f145704 100644 --- a/hermes/src/api/rest/latest_price_feeds.rs +++ b/hermes/src/api/rest/latest_price_feeds.rs @@ -1,4 +1,5 @@ use { + super::verify_price_ids_exist, crate::{ aggregate::RequestTime, api::{ @@ -63,13 +64,23 @@ pub async fn latest_price_feeds( QsQuery(params): QsQuery, ) -> Result>, RestError> { let price_ids: Vec = params.ids.into_iter().map(|id| id.into()).collect(); + + verify_price_ids_exist(&state, &price_ids).await?; + let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( &*state.state, - price_ids, + &price_ids, RequestTime::Latest, ) .await - .map_err(|_| RestError::UpdateDataNotFound)?; + .map_err(|e| { + tracing::warn!( + "Error getting price feeds {:?} with update data: {:?}", + price_ids, + e + ); + RestError::UpdateDataNotFound + })?; Ok(Json( price_feeds_with_update_data diff --git a/hermes/src/api/rest/latest_vaas.rs b/hermes/src/api/rest/latest_vaas.rs index 572dd7e3c8..72dc296dca 100644 --- a/hermes/src/api/rest/latest_vaas.rs +++ b/hermes/src/api/rest/latest_vaas.rs @@ -1,4 +1,5 @@ use { + super::verify_price_ids_exist, crate::{ aggregate::RequestTime, api::{ @@ -58,13 +59,23 @@ pub async fn latest_vaas( QsQuery(params): QsQuery, ) -> Result>, RestError> { let price_ids: Vec = params.ids.into_iter().map(|id| id.into()).collect(); + + verify_price_ids_exist(&state, &price_ids).await?; + let price_feeds_with_update_data = crate::aggregate::get_price_feeds_with_update_data( &*state.state, - price_ids, + &price_ids, RequestTime::Latest, ) .await - .map_err(|_| RestError::UpdateDataNotFound)?; + .map_err(|e| { + tracing::warn!( + "Error getting price feeds {:?} with update data: {:?}", + price_ids, + e + ); + RestError::UpdateDataNotFound + })?; Ok(Json( price_feeds_with_update_data diff --git a/hermes/src/api/ws.rs b/hermes/src/api/ws.rs index 71b37144c2..e71dcb3354 100644 --- a/hermes/src/api/ws.rs +++ b/hermes/src/api/ws.rs @@ -209,13 +209,25 @@ impl Subscriber { } async fn handle_price_feeds_update(&mut self, event: AggregationEvent) -> Result<()> { - let price_feed_ids = self.price_feeds_with_config.keys().cloned().collect(); + let price_feed_ids = self + .price_feeds_with_config + .keys() + .cloned() + .collect::>(); for update in crate::aggregate::get_price_feeds_with_update_data( &*self.store, - price_feed_ids, + &price_feed_ids, RequestTime::AtSlot(event.slot()), ) - .await? + .await + .map_err(|e| { + tracing::warn!( + "Failed to get price feeds {:?} with update data: {:?}", + price_feed_ids, + e + ); + e + })? .price_feeds { let config = self diff --git a/hermes/src/state/benchmarks.rs b/hermes/src/state/benchmarks.rs index 14969e7f1c..55f170698f 100644 --- a/hermes/src/state/benchmarks.rs +++ b/hermes/src/state/benchmarks.rs @@ -80,7 +80,7 @@ impl TryFrom for PriceFeedsWithUpdateData { pub trait Benchmarks { async fn get_verified_price_feeds( &self, - price_ids: Vec, + price_ids: &[PriceIdentifier], publish_time: UnixTimestamp, ) -> Result; } @@ -89,7 +89,7 @@ pub trait Benchmarks { impl Benchmarks for crate::state::State { async fn get_verified_price_feeds( &self, - price_ids: Vec, + price_ids: &[PriceIdentifier], publish_time: UnixTimestamp, ) -> Result { let endpoint = self