implemented starting block logic inside the chain scraper itself
jstuczyn committed Dec 9, 2024
1 parent d468038 commit 6204014
Showing 11 changed files with 117 additions and 21 deletions.
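For orientation before the per-file hunks: a minimal sketch of how a consumer opts into the new behaviour via the `StartingBlockOpts` struct and the `Config::start_block` field introduced in this commit. The URLs, database path, and height are placeholder values, and `anyhow` is only assumed for error plumbing; this is an editor's sketch, not code from the commit.

use nyxd_scraper::{NyxdScraper, PruningOptions, StartingBlockOpts};

async fn start_scraper() -> anyhow::Result<()> {
    let scraper = NyxdScraper::builder(nyxd_scraper::Config {
        websocket_url: "wss://rpc.nymtech.net/websocket".parse()?,
        rpc_url: "https://rpc.nymtech.net".parse()?,
        database_path: "scraper.sqlite".into(),
        pruning_options: PruningOptions::nothing(),
        store_precommits: false,
        // request history from an explicit height; if the node has already
        // pruned it, fall back to the earliest block that is still available
        start_block: StartingBlockOpts {
            start_block_height: Some(1_000_000),
            use_best_effort_start_height: true,
        },
    });
    let _instance = scraper.build_and_start().await?;
    Ok(())
}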
2 changes: 1 addition & 1 deletion Cargo.lock

(Generated lockfile; changes not rendered.)

49 changes: 48 additions & 1 deletion common/nyxd-scraper/src/block_processor/mod.rs
@@ -46,22 +46,33 @@ impl PendingSync {
pub struct BlockProcessorConfig {
pub pruning_options: PruningOptions,
pub store_precommits: bool,
pub explicit_starting_block_height: Option<u32>,
pub use_best_effort_start_height: bool,
}

impl Default for BlockProcessorConfig {
fn default() -> Self {
Self {
pruning_options: PruningOptions::nothing(),
store_precommits: true,
explicit_starting_block_height: None,
use_best_effort_start_height: false,
}
}
}

impl BlockProcessorConfig {
pub fn new(pruning_options: PruningOptions, store_precommits: bool) -> Self {
pub fn new(
pruning_options: PruningOptions,
store_precommits: bool,
explicit_starting_block_height: Option<u32>,
use_best_effort_start_height: bool,
) -> Self {
Self {
pruning_options,
store_precommits,
explicit_starting_block_height,
use_best_effort_start_height,
}
}
}
@@ -403,6 +414,42 @@ impl BlockProcessor {
let request_range = self.last_processed_height + 1..latest_block + 1;
info!("we need to request {request_range:?} to resync");
self.request_missing_blocks(request_range).await?;
return Ok(());
}

// this is the first time starting up
if self.last_processed_height == 0 {
let Some(starting_height) = self.config.explicit_starting_block_height else {
// nothing to do
return Ok(());
};

info!("attempting to start the scraper from block {starting_height}");
let earliest_available =
self.rpc_client.earliest_available_block_height().await? as u32;
info!("earliest available block height: {earliest_available}");

// without the best-effort fallback, refuse to start when the node has already pruned the requested height
if earliest_available > starting_height && !self.config.use_best_effort_start_height {
error!("the earliest available block is higher than the desired starting height");
return Err(ScraperError::BlocksUnavailable {
height: starting_height,
});
}

let starting_height = if earliest_available > starting_height {
// add a few extra blocks to account for the startup waiting,
// since the node might have pruned a few more blocks in the meantime
earliest_available + 10
} else {
starting_height
};

let request_range = starting_height..latest_block + 1;

info!("going to start the scraper from block {starting_height}");
info!("we need to request {request_range:?} before properly starting up");

self.request_missing_blocks(request_range).await?;
}

Ok(())
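The branching in the hunk above can be hard to follow inline, so here is the same first-startup height resolution distilled into a standalone function. This is an illustration only: the function name and the string error are hypothetical, while the best-effort fallback and the `+ 10` pruning margin mirror the logic in the hunk above.

/// Decide which height a fresh scraper (last_processed_height == 0) should
/// start requesting blocks from. Sketch for illustration only.
fn resolve_start_height(
    explicit_start: Option<u32>,
    earliest_available: u32,
    best_effort: bool,
) -> Result<Option<u32>, String> {
    let Some(desired) = explicit_start else {
        // no explicit starting height configured: nothing to backfill
        return Ok(None);
    };
    if earliest_available > desired {
        if !best_effort {
            // the node has pruned the desired height and falling back is not allowed
            return Err(format!("blocks at height {desired} are unavailable"));
        }
        // fall back to the earliest block still on the node, plus a small margin
        // in case it prunes a few more blocks while the scraper is starting up
        return Ok(Some(earliest_available + 10));
    }
    Ok(Some(desired))
}

A caller would then turn `Some(height)` into the `height..latest_block + 1` request range shown above.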
3 changes: 3 additions & 0 deletions common/nyxd-scraper/src/error.rs
Expand Up @@ -19,6 +19,9 @@ pub enum ScraperError {
#[error("the block scraper is already running")]
ScraperAlreadyRunning,

#[error("block information for height {height} is not available on the provided rpc endpoint")]
BlocksUnavailable { height: u32 },

#[error("failed to establish websocket connection to {url}: {source}")]
WebSocketConnectionFailure {
url: String,
2 changes: 1 addition & 1 deletion common/nyxd-scraper/src/lib.rs
Expand Up @@ -16,5 +16,5 @@ pub mod storage;

pub use block_processor::pruning::{PruningOptions, PruningStrategy};
pub use modules::{BlockModule, MsgModule, TxModule};
pub use scraper::{Config, NyxdScraper};
pub use scraper::{Config, NyxdScraper, StartingBlockOpts};
pub use storage::models;
11 changes: 11 additions & 0 deletions common/nyxd-scraper/src/rpc_client.rs
Expand Up @@ -117,6 +117,17 @@ impl RpcClient {
Ok(info.last_block_height.value())
}

pub(crate) async fn earliest_available_block_height(&self) -> Result<u64, ScraperError> {
debug!("getting earliest available block height");

let status = self
.inner
.status()
.await
.map_err(|source| ScraperError::AbciInfoQueryFailure { source })?;
Ok(status.sync_info.earliest_block_height.value())
}

async fn get_transaction_results(
&self,
raw: &[Vec<u8>],
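For reference, the new helper is a thin wrapper over the Tendermint `/status` endpoint, whose `sync_info.earliest_block_height` reports the oldest block a (possibly pruning) node still serves. A standalone sketch using `tendermint-rpc` directly; the `HttpClient` usage and `anyhow` error type are assumptions, since the scraper's own `inner` client setup is not shown in this diff.

use tendermint_rpc::{Client, HttpClient};

async fn earliest_height(rpc_url: &str) -> anyhow::Result<u64> {
    let client = HttpClient::new(rpc_url)?;
    // /status carries sync_info, including the earliest block height still available
    let status = client.status().await?;
    Ok(status.sync_info.earliest_block_height.value())
}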
38 changes: 26 additions & 12 deletions common/nyxd-scraper/src/scraper/mod.rs
Expand Up @@ -24,6 +24,15 @@ use url::Url;

mod subscriber;

#[derive(Default, Clone, Copy)]
pub struct StartingBlockOpts {
pub start_block_height: Option<u32>,

/// If the scraper cannot start from the desired height, attempt to use
/// the next available height rather than failing.
pub use_best_effort_start_height: bool,
}

pub struct Config {
/// Url to the websocket endpoint of a validator, for example `wss://rpc.nymtech.net/websocket`
pub websocket_url: Url,
@@ -37,7 +46,7 @@ pub struct Config {

pub store_precommits: bool,

pub start_block_height: Option<u32>,
pub start_block: StartingBlockOpts,
}

pub struct NyxdScraperBuilder {
@@ -50,8 +59,6 @@ pub struct NyxdScraperBuilder {

impl NyxdScraperBuilder {
pub async fn build_and_start(self) -> Result<NyxdScraper, ScraperError> {
let start_block_height = self.config.start_block_height.clone();

let scraper = NyxdScraper::new(self.config).await?;

let (processing_tx, processing_rx) = unbounded_channel();
@@ -70,6 +77,8 @@ impl NyxdScraperBuilder {
let block_processor_config = BlockProcessorConfig::new(
scraper.config.pruning_options,
scraper.config.store_precommits,
scraper.config.start_block.start_block_height,
scraper.config.start_block.use_best_effort_start_height,
);

let mut block_processor = BlockProcessor::new(
@@ -94,11 +103,6 @@ impl NyxdScraperBuilder {
)
.await?;

// TODO: decide if this should be removed?
if let Some(height) = start_block_height {
scraper.process_block_range(Some(height), None).await?;
}

scraper.start_tasks(block_requester, block_processor, chain_subscriber);

Ok(scraper)
@@ -175,7 +179,10 @@ impl NyxdScraper {
self.task_tracker.close();
}

pub async fn process_single_block(&self, height: u32) -> Result<(), ScraperError> {
// DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
// AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
// YOU WILL BE FIRED IF YOU USE IT : )
pub async fn unsafe_process_single_block(&self, height: u32) -> Result<(), ScraperError> {
info!(height = height, "attempting to process a single block");
if !self.task_tracker.is_empty() {
return Err(ScraperError::ScraperAlreadyRunning);
@@ -194,7 +201,10 @@ impl NyxdScraper {
block_processor.process_block(block.into()).await
}

pub async fn process_block_range(
// DO NOT USE UNLESS YOU KNOW EXACTLY WHAT YOU'RE DOING
// AS THIS WILL NOT USE ANY OF YOUR REGISTERED MODULES
// YOU WILL BE FIRED IF YOU USE IT : )
pub async fn unsafe_process_block_range(
&self,
starting_height: Option<u32>,
end_height: Option<u32>,
@@ -323,8 +333,12 @@ impl NyxdScraper {
req_tx: Sender<BlockRequest>,
processing_rx: UnboundedReceiver<BlockToProcess>,
) -> Result<BlockProcessor, ScraperError> {
let block_processor_config =
BlockProcessorConfig::new(self.config.pruning_options, self.config.store_precommits);
let block_processor_config = BlockProcessorConfig::new(
self.config.pruning_options,
self.config.store_precommits,
self.config.start_block.start_block_height,
self.config.start_block.use_best_effort_start_height,
);

BlockProcessor::new(
block_processor_config,
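One property worth noting from the `#[derive(Default)]` on `StartingBlockOpts`: callers that do not set anything keep the previous behaviour of simply following the chain from its current tip. A tiny sketch; the function is illustrative only.

use nyxd_scraper::StartingBlockOpts;

fn default_opts_preserve_old_behaviour() {
    // Default: no explicit starting height and no best-effort fallback,
    // so the scraper just processes new blocks as they arrive, as before.
    let opts = StartingBlockOpts::default();
    assert_eq!(opts.start_block_height, None);
    assert!(!opts.use_best_effort_start_height);
}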
2 changes: 1 addition & 1 deletion nym-validator-rewarder/src/cli/process_block.rs
@@ -26,7 +26,7 @@ pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {

NyxdScraper::new(config.scraper_config())
.await?
.process_single_block(args.height)
.unsafe_process_single_block(args.height)
.await?;
Ok(())
}
2 changes: 1 addition & 1 deletion nym-validator-rewarder/src/cli/process_until.rs
@@ -39,7 +39,7 @@ pub(crate) async fn execute(args: Args) -> Result<(), NymRewarderError> {

NyxdScraper::new(config.scraper_config())
.await?
.process_block_range(args.start_height, args.stop_height)
.unsafe_process_block_range(args.start_height, args.stop_height)
.await?;
Ok(())
}
22 changes: 18 additions & 4 deletions nyx-chain-watcher/src/chain_scraper/mod.rs
@@ -1,3 +1,4 @@
use crate::env::vars::{NYXD_SCRAPER_START_HEIGHT, NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT};
use nyxd_scraper::{storage::ScraperStorage, NyxdScraper, PruningOptions};

pub(crate) async fn run_chain_scraper(
@@ -9,17 +10,30 @@ pub(crate) async fn run_chain_scraper(
let websocket_url = reqwest::Url::parse(&websocket_url)?;
let rpc_url = reqwest::Url::parse(&rpc_url)?;

let start_block_height = std::env::var("NYXD_SCRAPER_START_HEIGHT")
.ok()
.and_then(|value| value.parse::<u32>().ok());
// why are these not part of the CLI? : (
let start_block_height = match std::env::var(NYXD_SCRAPER_START_HEIGHT).ok() {
None => None,
// blow up if passed a malformed env value
Some(raw) => Some(raw.parse()?),
};

let use_best_effort_start_height =
match std::env::var(NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT).ok() {
None => false,
// blow up if passed a malformed env value
Some(raw) => raw.parse()?,
};

let scraper = NyxdScraper::builder(nyxd_scraper::Config {
websocket_url,
rpc_url,
database_path: config.chain_scraper_database_path().into(),
pruning_options: PruningOptions::nothing(),
store_precommits: false,
start_block_height,
start_block: nyxd_scraper::StartingBlockOpts {
start_block_height,
use_best_effort_start_height,
},
});

let instance = scraper.build_and_start().await?;
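The `match` blocks above deliberately surface a parse error when one of the environment variables holds a malformed value, instead of the previous `.ok().and_then(...)` pattern that silently fell back to `None`. The same intent can be written a little more compactly with `Option::transpose`; this is only an editor's sketch, not part of the commit.

use std::str::FromStr;

/// Read an optional env var, propagating a parse error instead of swallowing it.
fn optional_env<T: FromStr>(name: &str) -> Result<Option<T>, T::Err> {
    std::env::var(name)
        .ok()
        .map(|raw| raw.parse::<T>())
        .transpose()
}

A caller would then write something like `let start_block_height: Option<u32> = optional_env(NYXD_SCRAPER_START_HEIGHT)?;`, provided the surrounding error type can absorb the parse error.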
2 changes: 2 additions & 0 deletions nyx-chain-watcher/src/env.rs
@@ -11,6 +11,8 @@ pub mod vars {
"NYX_CHAIN_WATCHER_HISTORY_DATABASE_PATH";

pub const NYXD_SCRAPER_START_HEIGHT: &str = "NYXD_SCRAPER_START_HEIGHT";
pub const NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT: &str =
"NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT";

pub const NYX_CHAIN_WATCHER_ID_ARG: &str = "NYX_CHAIN_WATCHER_ID";
pub const NYX_CHAIN_WATCHER_OUTPUT_ARG: &str = "NYX_CHAIN_WATCHER_OUTPUT";
5 changes: 5 additions & 0 deletions nyx-chain-watcher/src/main.rs
@@ -1,6 +1,8 @@
use clap::{crate_name, crate_version, Parser};
use nym_bin_common::bin_info_owned;
use nym_bin_common::logging::maybe_print_banner;
use nym_network_defaults::setup_env;
use tracing::info;

mod chain_scraper;
mod cli;
@@ -24,6 +26,9 @@ async fn main() -> anyhow::Result<()> {
maybe_print_banner(crate_name!(), crate_version!());
}

let bin_info = bin_info_owned!();
info!("using the following version: {bin_info}");

cli.execute().await?;

Ok(())
