Skip to content

Commit

Permalink
fix(fortuna): retry blocks (#1579)
Browse files Browse the repository at this point in the history
* add retry blocks

* add comment

* add comment

* add processed
  • Loading branch information
Dev Kalra authored May 15, 2024
1 parent 0e62490 commit 23f8d02
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 16 deletions.
2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/fortuna/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fortuna"
version = "5.3.0"
version = "5.3.1"
edition = "2021"

[dependencies]
Expand Down
140 changes: 126 additions & 14 deletions apps/fortuna/src/keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,12 @@ use {
},
registry::Registry,
},
std::sync::{
atomic::AtomicU64,
Arc,
std::{
collections::HashMap,
sync::{
atomic::AtomicU64,
Arc,
},
},
tokio::{
spawn,
Expand Down Expand Up @@ -75,6 +78,8 @@ const BLOCK_BATCH_SIZE: u64 = 100;
const POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Track metrics in this interval
const TRACK_INTERVAL: Duration = Duration::from_secs(10);
/// Retry last N blocks
const RETRY_PREVIOUS_BLOCKS: u64 = 100;

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct AccountLabel {
Expand All @@ -91,6 +96,7 @@ pub struct KeeperMetrics {
pub total_gas_spent: Family<AccountLabel, Gauge<f64, AtomicU64>>,
pub requests: Family<AccountLabel, Counter>,
pub requests_processed: Family<AccountLabel, Counter>,
pub requests_reprocessed: Family<AccountLabel, Counter>,
pub reveals: Family<AccountLabel, Counter>,
}

Expand Down Expand Up @@ -147,6 +153,12 @@ impl KeeperMetrics {
keeper_metrics.total_gas_spent.clone(),
);

writable_registry.register(
"requests_reprocessed",
"Number of requests reprocessed",
keeper_metrics.requests_reprocessed.clone(),
);

keeper_metrics
}
}
Expand All @@ -157,6 +169,16 @@ pub struct BlockRange {
pub to: BlockNumber,
}

/// Outcome of a keeper's attempt to handle a randomness request, cached per
/// sequence number so later block scans can decide whether to retry the event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequestState {
    /// Fulfilled means that the request was either revealed or we are sure we
    /// will not be able to reveal it. Terminal state: scans skip these requests.
    Fulfilled,
    /// We have already processed the request but couldn't fulfill it and we are
    /// unsure if we can fulfill it or not. Non-terminal: scans retry these.
    Processed,
}

/// Get the latest safe block number for the chain. Retry internally if there is an error.
async fn get_latest_safe_block(chain_state: &BlockchainState) -> BlockNumber {
loop {
Expand Down Expand Up @@ -203,6 +225,8 @@ pub async fn run_keeper_threads(
);
let keeper_address = contract.client().inner().inner().signer().address();

let fulfilled_requests_cache = Arc::new(RwLock::new(HashMap::<u64, RequestState>::new()));

// Spawn a thread to handle the events from last BACKLOG_RANGE blocks.
spawn(
process_backlog(
Expand All @@ -214,6 +238,7 @@ pub async fn run_keeper_threads(
chain_eth_config.gas_limit,
chain_state.clone(),
keeper_metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span(),
);
Expand All @@ -237,6 +262,7 @@ pub async fn run_keeper_threads(
Arc::clone(&contract),
chain_eth_config.gas_limit,
keeper_metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span(),
);
Expand Down Expand Up @@ -290,8 +316,13 @@ pub async fn process_event(
contract: &Arc<SignablePythContract>,
gas_limit: U256,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
) -> Result<()> {
if chain_config.provider_address != event.provider_address {
fulfilled_requests_cache
.write()
.await
.insert(event.sequence_number, RequestState::Fulfilled);
return Ok(());
}
let provider_revelation = match chain_config.state.reveal(event.sequence_number) {
Expand All @@ -302,6 +333,10 @@ pub async fn process_event(
"Error while revealing with error: {:?}",
e
);
fulfilled_requests_cache
.write()
.await
.insert(event.sequence_number, RequestState::Fulfilled);
return Ok(());
}
};
Expand Down Expand Up @@ -330,6 +365,11 @@ pub async fn process_event(
sequence_number = &event.sequence_number,
"Gas estimate for reveal with callback is higher than the gas limit"
);

fulfilled_requests_cache
.write()
.await
.insert(event.sequence_number, RequestState::Fulfilled);
return Ok(());
}

Expand All @@ -354,6 +394,10 @@ pub async fn process_event(
// ever. We will return an Ok(()) to signal that we have processed this reveal
// and concluded that its Ok to not reveal.
_ => {
fulfilled_requests_cache
.write()
.await
.insert(event.sequence_number, RequestState::Processed);
tracing::error!(
sequence_number = &event.sequence_number,
"Error while revealing with error: {:?}",
Expand Down Expand Up @@ -399,6 +443,11 @@ pub async fn process_event(
address: chain_config.provider_address.to_string(),
})
.inc();

fulfilled_requests_cache
.write()
.await
.insert(event.sequence_number, RequestState::Fulfilled);
Ok(())
}
None => {
Expand All @@ -425,8 +474,12 @@ pub async fn process_event(
None => {
tracing::info!(
sequence_number = &event.sequence_number,
"Not processing event"
"Not fulfilling event"
);
fulfilled_requests_cache
.write()
.await
.insert(event.sequence_number, RequestState::Processed);
Ok(())
}
},
Expand All @@ -450,6 +503,7 @@ pub async fn process_block_range(
gas_limit: U256,
chain_state: api::BlockchainState,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
) {
let BlockRange {
from: first_block,
Expand All @@ -462,6 +516,7 @@ pub async fn process_block_range(
to_block = last_block;
}

// TODO: this is handling all blocks sequentially we might want to handle them in parallel in future.
process_single_block_batch(
BlockRange {
from: current_block,
Expand All @@ -471,6 +526,7 @@ pub async fn process_block_range(
gas_limit,
chain_state.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span()
.await;
Expand All @@ -480,14 +536,17 @@ pub async fn process_block_range(
}

/// Process a batch of blocks for a chain. It will fetch events for all the blocks in a single call for the provided batch
/// and then try to process them one by one. If the process fails, it will retry indefinitely.
/// and then try to process them one by one. It checks the `fulfilled_requests_cache`: if the request was already fulfilled,
/// it won't reprocess it; if the request was only processed (not fulfilled), it will reprocess it.
/// If the process fails, it will retry indefinitely.
#[tracing::instrument(name="batch", skip_all, fields(batch_from_block=block_range.from, batch_to_block=block_range.to))]
pub async fn process_single_block_batch(
block_range: BlockRange,
contract: Arc<SignablePythContract>,
gas_limit: U256,
chain_state: api::BlockchainState,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
) {
loop {
let events_res = chain_state
Expand All @@ -499,6 +558,34 @@ pub async fn process_single_block_batch(
Ok(events) => {
tracing::info!(num_of_events = &events.len(), "Processing",);
for event in &events {
if let Some(state) = fulfilled_requests_cache
.read()
.await
.get(&event.sequence_number)
{
match state {
RequestState::Fulfilled => {
tracing::info!(
sequence_number = &event.sequence_number,
"Skipping already fulfilled request",
);
continue;
}
RequestState::Processed => {
tracing::info!(
sequence_number = &event.sequence_number,
"Reprocessing already processed request",
);
metrics
.requests_reprocessed
.get_or_create(&AccountLabel {
chain_id: chain_state.id.clone(),
address: chain_state.provider_address.to_string(),
})
.inc();
}
}
}
metrics
.requests
.get_or_create(&AccountLabel {
Expand All @@ -513,6 +600,7 @@ pub async fn process_single_block_batch(
&contract,
gas_limit,
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span()
.await
Expand Down Expand Up @@ -627,26 +715,40 @@ pub async fn watch_blocks(

let latest_safe_block = get_latest_safe_block(&chain_state).in_current_span().await;
if latest_safe_block > *last_safe_block_processed {
let mut from = latest_safe_block
.checked_sub(RETRY_PREVIOUS_BLOCKS)
.unwrap_or(0);

// In a normal situation, the difference between the latest and last safe block should not be more than 2-3 (for Arbitrum it can be 10).
// TODO: add a metric for this in a separate PR. We need alerts.
// But in an extreme situation, where we were unable to send the block range multiple times, the difference between latest_safe_block and
// last_safe_block_processed can grow. It is fine to not have the retry mechanism for those earliest blocks, as we expect the RPC
// to have become consistent after this much time.
if from > *last_safe_block_processed {
from = *last_safe_block_processed;
}
match tx
.send(BlockRange {
from: *last_safe_block_processed + 1,
to: latest_safe_block,
from,
to: latest_safe_block,
})
.await
{
Ok(_) => {
tracing::info!(
from_block = *last_safe_block_processed + 1,
from_block = from,
to_block = &latest_safe_block,
"Block range sent to handle events",
);
*last_safe_block_processed = latest_safe_block;
}
Err(e) => {
tracing::error!(
"Error while sending block range to handle events. These will be handled in next call. error: {:?}",
e
);
from_block = from,
to_block = &latest_safe_block,
"Error while sending block range to handle events. These will be handled in next call. error: {:?}",
e
);
}
};
}
Expand All @@ -661,6 +763,7 @@ pub async fn process_new_blocks(
contract: Arc<SignablePythContract>,
gas_limit: U256,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
) {
tracing::info!("Waiting for new block ranges to process");
loop {
Expand All @@ -671,6 +774,7 @@ pub async fn process_new_blocks(
gas_limit,
chain_state.clone(),
metrics.clone(),
fulfilled_requests_cache.clone(),
)
.in_current_span()
.await;
Expand All @@ -686,11 +790,19 @@ pub async fn process_backlog(
gas_limit: U256,
chain_state: BlockchainState,
metrics: Arc<KeeperMetrics>,
fulfilled_requests_cache: Arc<RwLock<HashMap<u64, RequestState>>>,
) {
tracing::info!("Processing backlog");
process_block_range(backlog_range, contract, gas_limit, chain_state, metrics)
.in_current_span()
.await;
process_block_range(
backlog_range,
contract,
gas_limit,
chain_state,
metrics,
fulfilled_requests_cache,
)
.in_current_span()
.await;
tracing::info!("Backlog processed");
}

Expand Down

0 comments on commit 23f8d02

Please sign in to comment.