Skip to content

Commit

Permalink
Improve Keeper Reliability (#19)
Browse files Browse the repository at this point in the history
Improves keeper reliability for landing transactions by:
* no longer skipping transactions that initially fail with a
BlockhashNotFound error
* Merging vote accounts with all validator history accounts for epoch
credit cranking, so offline validators that no longer showing up in
getVoteAccounts still get cranked (necessary for Steward program
scoring)

The first part has already been running on mainnet for a week, and for
the last 3 epochs, all vote accounts have been updated each epoch (as
measured by: same number of commissions tracked as stakes).
  • Loading branch information
ebatsell authored Feb 7, 2024
1 parent 6236581 commit 3d34220
Show file tree
Hide file tree
Showing 8 changed files with 449 additions and 417 deletions.
381 changes: 168 additions & 213 deletions keepers/keeper-core/src/lib.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion keepers/validator-keeper/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub async fn update_cluster_info(
client: Arc<RpcClient>,
keypair: Arc<Keypair>,
program_id: &Pubkey,
) -> Result<SubmitStats, (TransactionExecutionError, SubmitStats)> {
) -> Result<SubmitStats, TransactionExecutionError> {
let (cluster_history_account, _) =
Pubkey::find_program_address(&[ClusterHistory::SEED], program_id);

Expand Down
36 changes: 8 additions & 28 deletions keepers/validator-keeper/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ pub async fn upload_gossip_values(
keypair: Arc<Keypair>,
entrypoint: SocketAddr,
program_id: &Pubkey,
) -> Result<CreateUpdateStats, (Box<dyn std::error::Error>, CreateUpdateStats)> {
) -> Result<CreateUpdateStats, Box<dyn std::error::Error>> {
let gossip_port = 0;

let spy_socket_addr = SocketAddr::new(
Expand All @@ -261,19 +261,13 @@ pub async fn upload_gossip_values(
let (_gossip_service, cluster_info) =
start_spy_server(entrypoint, gossip_port, spy_socket_addr, &keypair, &exit);

let vote_accounts = get_vote_accounts_with_retry(&client, MIN_VOTE_EPOCHS, None)
.await
.map_err(|e| (e.into(), CreateUpdateStats::default()))?;
let vote_accounts = get_vote_accounts_with_retry(&client, MIN_VOTE_EPOCHS, None).await?;

// Wait for all active validators to be received
sleep(Duration::from_secs(30)).await;

let gossip_entries = {
let crds = cluster_info
.gossip
.crds
.read()
.map_err(|e| (e.to_string().into(), CreateUpdateStats::default()))?;
let crds = cluster_info.gossip.crds.read().map_err(|e| e.to_string())?;

vote_accounts
.iter()
Expand All @@ -290,9 +284,7 @@ pub async fn upload_gossip_values(
.iter()
.map(|a| a.address())
.collect::<Vec<Pubkey>>();
let existing_accounts_response = get_multiple_accounts_batched(&addresses, &client)
.await
.map_err(|e| (e.into(), CreateUpdateStats::default()))?;
let existing_accounts_response = get_multiple_accounts_batched(&addresses, &client).await?;

let create_transactions = existing_accounts_response
.iter()
Expand All @@ -311,22 +303,10 @@ pub async fn upload_gossip_values(
.map(|entry| entry.build_update_tx())
.collect::<Vec<_>>();

let mut stats = CreateUpdateStats::default();
stats.creates = submit_transactions(&client, create_transactions, &keypair)
.await
.map_err(|(e, submit_stats)| {
stats.creates = submit_stats;
(e.into(), stats)
})?;

stats.updates = submit_transactions(&client, update_transactions, &keypair)
.await
.map_err(|(e, submit_stats)| {
stats.updates = submit_stats;
(e.into(), stats)
})?;

Ok(stats)
Ok(CreateUpdateStats {
creates: submit_transactions(&client, create_transactions, &keypair).await?,
updates: submit_transactions(&client, update_transactions, &keypair).await?,
})
}

// CODE BELOW SLIGHTLY MODIFIED FROM
Expand Down
96 changes: 72 additions & 24 deletions keepers/validator-keeper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ use std::{
};

use anchor_lang::{AccountDeserialize, Discriminator};
use keeper_core::{CreateUpdateStats, SubmitStats};
use keeper_core::{
get_vote_accounts_with_retry, CreateUpdateStats, MultipleAccountsError, SubmitStats,
TransactionExecutionError,
};
use log::error;
use solana_account_decoder::UiDataSliceConfig;
use solana_client::{
client_error::ClientError,
nonblocking::rpc_client::RpcClient,
rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig},
rpc_filter::{Memcmp, RpcFilterType},
Expand All @@ -25,7 +29,10 @@ use solana_sdk::{
use solana_streamer::socket::SocketAddrSpace;

use jito_tip_distribution::state::TipDistributionAccount;
use validator_history::{ClusterHistory, ValidatorHistory, ValidatorHistoryEntry};
use thiserror::Error as ThisError;
use validator_history::{
constants::MIN_VOTE_EPOCHS, ClusterHistory, ValidatorHistory, ValidatorHistoryEntry,
};

pub mod cluster_info;
pub mod gossip;
Expand All @@ -35,6 +42,18 @@ pub mod vote_account;

pub type Error = Box<dyn std::error::Error>;

#[derive(ThisError, Debug)]
pub enum KeeperError {
#[error(transparent)]
ClientError(#[from] ClientError),
#[error(transparent)]
TransactionExecutionError(#[from] TransactionExecutionError),
#[error(transparent)]
MultipleAccountsError(#[from] MultipleAccountsError),
#[error("Custom: {0}")]
Custom(String),
}

pub async fn get_tip_distribution_accounts(
rpc_client: &RpcClient,
tip_distribution_program: &Pubkey,
Expand Down Expand Up @@ -116,28 +135,7 @@ pub async fn emit_validator_history_metrics(
) -> Result<(), Box<dyn std::error::Error>> {
let epoch = client.get_epoch_info().await?;

// Fetch every ValidatorHistory account
let gpa_config = RpcProgramAccountsConfig {
filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
0,
ValidatorHistory::discriminator().into(),
))]),
account_config: RpcAccountInfoConfig {
encoding: Some(solana_account_decoder::UiAccountEncoding::Base64),
..RpcAccountInfoConfig::default()
},
..RpcProgramAccountsConfig::default()
};
let mut validator_history_accounts = client
.get_program_accounts_with_config(&program_id, gpa_config)
.await?;

let validator_histories = validator_history_accounts
.iter_mut()
.filter_map(|(_, account)| {
ValidatorHistory::try_deserialize(&mut account.data.as_slice()).ok()
})
.collect::<Vec<_>>();
let validator_histories = get_validator_history_accounts(client, program_id).await?;

let mut ips = 0;
let mut versions = 0;
Expand Down Expand Up @@ -195,6 +193,10 @@ pub async fn emit_validator_history_metrics(
}
}

let get_vote_accounts_count = get_vote_accounts_with_retry(client, MIN_VOTE_EPOCHS, None)
.await?
.len();

datapoint_info!(
"validator-history-stats",
("num_validator_histories", num_validators, i64),
Expand All @@ -207,11 +209,57 @@ pub async fn emit_validator_history_metrics(
("num_stakes", stakes, i64),
("cluster_history_blocks", cluster_history_blocks, i64),
("slot_index", epoch.slot_index, i64),
(
"num_get_vote_accounts_responses",
get_vote_accounts_count,
i64
),
);

Ok(())
}

pub async fn get_validator_history_accounts(
client: &RpcClient,
program_id: Pubkey,
) -> Result<Vec<ValidatorHistory>, ClientError> {
let gpa_config = RpcProgramAccountsConfig {
filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes(
0,
ValidatorHistory::discriminator().into(),
))]),
account_config: RpcAccountInfoConfig {
encoding: Some(solana_account_decoder::UiAccountEncoding::Base64),
..RpcAccountInfoConfig::default()
},
..RpcProgramAccountsConfig::default()
};
let mut validator_history_accounts = client
.get_program_accounts_with_config(&program_id, gpa_config)
.await?;

let validator_histories = validator_history_accounts
.iter_mut()
.filter_map(|(_, account)| {
ValidatorHistory::try_deserialize(&mut account.data.as_slice()).ok()
})
.collect::<Vec<_>>();

Ok(validator_histories)
}

pub async fn get_validator_history_accounts_with_retry(
client: &RpcClient,
program_id: Pubkey,
) -> Result<Vec<ValidatorHistory>, ClientError> {
for _ in 0..4 {
if let Ok(validator_histories) = get_validator_history_accounts(client, program_id).await {
return Ok(validator_histories);
}
}
get_validator_history_accounts(client, program_id).await
}

pub fn start_spy_server(
cluster_entrypoint: SocketAddr,
gossip_port: u16,
Expand Down
Loading

0 comments on commit 3d34220

Please sign in to comment.