Skip to content

Commit

Permalink
Merge pull request #5274 from nymtech/feature/nyx-chain-watcher
Browse files Browse the repository at this point in the history
Nyx Chain Watcher
  • Loading branch information
tommyv1987 authored Dec 19, 2024
2 parents 67976b1 + d0722e5 commit 2fab3f1
Show file tree
Hide file tree
Showing 85 changed files with 2,194 additions and 743 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,7 @@ nym-network-monitor/__pycache__
nym-network-monitor/*.key
nym-network-monitor/.envrc
nym-network-monitor/.envrc


*.sqlite
.build
60 changes: 35 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ members = [
"nym-credential-proxy/nym-credential-proxy",
"nym-credential-proxy/nym-credential-proxy-requests",
"nym-credential-proxy/vpn-api-lib-wasm",
"nym-data-observatory",
"nym-network-monitor",
"nyx-chain-watcher",
"nym-node",
"nym-node/nym-node-requests",
"nym-node/nym-node-metrics",
Expand Down Expand Up @@ -158,11 +158,11 @@ default-members = [
"explorer-api",
"nym-api",
"nym-credential-proxy/nym-credential-proxy",
"nym-data-observatory",
"nym-node",
"nym-node-status-api/nym-node-status-agent",
"nym-node-status-api/nym-node-status-api",
"nym-validator-rewarder",
"nyx-chain-watcher",
"service-providers/authenticator",
"service-providers/ip-packet-router",
"service-providers/network-requester",
Expand Down
3 changes: 2 additions & 1 deletion common/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ dirs = { workspace = true, optional = true }
handlebars = { workspace = true }
log = { workspace = true }
serde = { workspace = true, features = ["derive"] }
toml = { workspace = true }
thiserror = { workspace = true }
toml = { workspace = true, features = ["display"] }
url = { workspace = true }

nym-network-defaults = { path = "../network-defaults", features = ["utoipa"] }
Expand Down
10 changes: 10 additions & 0 deletions common/config/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use std::io;
use thiserror::Error;

/// Errors raised while persisting a config struct as a TOML file on disk.
#[derive(Debug, Error)]
pub enum NymConfigTomlError {
    /// Creating parent directories, creating/opening the target file,
    /// adjusting its permissions, or writing to it failed.
    #[error(transparent)]
    FileIoFailure(#[from] io::Error),
    /// The config value could not be serialized into a TOML string.
    #[error(transparent)]
    TomlSerializeFailure(#[from] toml::ser::Error),
}
37 changes: 37 additions & 0 deletions common/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub use helpers::{parse_urls, OptionalSet};
pub use toml::de::Error as TomlDeError;

pub mod defaults;
pub mod error;
pub mod helpers;
pub mod legacy_helpers;
pub mod serde_helpers;
Expand Down Expand Up @@ -95,6 +96,42 @@ where
config.format_to_writer(file)
}

pub fn save_unformatted_config_to_file<C, P>(
config: &C,
path: P,
) -> Result<(), error::NymConfigTomlError>
where
C: Serialize + ?Sized,
P: AsRef<Path>,
{
let path = path.as_ref();
log::info!("saving config file to {}", path.display());

if let Some(parent) = path.parent() {
create_dir_all(parent)?;
}

let mut file = File::create(path)?;

// TODO: check for whether any of our configs store anything sensitive
// and change that to 0o644 instead
#[cfg(target_family = "unix")]
{
use std::os::unix::fs::PermissionsExt;

let mut perms = fs::metadata(path)?.permissions();
perms.set_mode(0o600);
fs::set_permissions(path, perms)?;
}

// let serde format the TOML in whatever ugly way it chooses
// TODO: in https://docs.rs/toml/latest/toml/fn.to_string_pretty.html it recommends using
// https://docs.rs/toml_edit/latest/toml_edit/struct.DocumentMut.html to preserve formatting
let toml_string = toml::to_string_pretty(config)?;

Ok(file.write_all(toml_string.as_bytes())?)
}

pub fn deserialize_config_from_toml_str<C>(raw: &str) -> Result<C, TomlDeError>
where
C: DeserializeOwned,
Expand Down
111 changes: 99 additions & 12 deletions common/nyxd-scraper/src/block_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,43 @@ impl PendingSync {
}
}

/// Static configuration knobs for a `BlockProcessor`.
#[derive(Debug, Clone)]
pub struct BlockProcessorConfig {
    /// Controls how (and whether) old processed blocks are pruned from storage.
    pub pruning_options: PruningOptions,
    /// Whether to persist block precommits when a processed block is stored.
    pub store_precommits: bool,
    /// On a fresh start (no blocks processed yet), attempt to begin scraping
    /// from this block height instead of the default behaviour.
    pub explicit_starting_block_height: Option<u32>,
    /// NOTE(review): presumably meant to allow falling back to the earliest
    /// block the node still has when `explicit_starting_block_height` was
    /// already pruned; however the guard in `startup_resync` returns an error
    /// when this flag is *set*, which reads inverted — confirm intended
    /// semantics.
    pub use_best_effort_start_height: bool,
}

impl Default for BlockProcessorConfig {
fn default() -> Self {
Self {
pruning_options: PruningOptions::nothing(),
store_precommits: true,
explicit_starting_block_height: None,
use_best_effort_start_height: false,
}
}
}

impl BlockProcessorConfig {
pub fn new(
pruning_options: PruningOptions,
store_precommits: bool,
explicit_starting_block_height: Option<u32>,
use_best_effort_start_height: bool,
) -> Self {
Self {
pruning_options,
store_precommits,
explicit_starting_block_height,
use_best_effort_start_height,
}
}
}

pub struct BlockProcessor {
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
last_processed_height: u32,
Expand All @@ -65,9 +100,10 @@ pub struct BlockProcessor {
msg_modules: Vec<Box<dyn MsgModule + Send>>,
}

#[allow(clippy::too_many_arguments)]
impl BlockProcessor {
pub async fn new(
pruning_options: PruningOptions,
config: BlockProcessorConfig,
cancel: CancellationToken,
synced: Arc<Notify>,
incoming: UnboundedReceiver<BlockToProcess>,
Expand All @@ -81,8 +117,10 @@ impl BlockProcessor {
let last_pruned = storage.get_pruned_height().await?;
let last_pruned_height = last_pruned.try_into().unwrap_or_default();

debug!(last_processed_height = %last_processed_height, pruned_height = %last_pruned_height, "setting up block processor...");

Ok(BlockProcessor {
pruning_options,
config,
cancel,
synced,
last_processed_height,
Expand All @@ -101,7 +139,7 @@ impl BlockProcessor {
}

pub fn with_pruning(mut self, pruning_options: PruningOptions) -> Self {
self.pruning_options = pruning_options;
self.config.pruning_options = pruning_options;
self
}

Expand All @@ -128,7 +166,7 @@ impl BlockProcessor {
// we won't end up with a corrupted storage.
let mut tx = self.storage.begin_processing_tx().await?;

persist_block(&full_info, &mut tx).await?;
persist_block(&full_info, &mut tx, self.config.store_precommits).await?;

// let the modules do whatever they want
// the ones wanting the full block:
Expand Down Expand Up @@ -241,7 +279,7 @@ impl BlockProcessor {

#[instrument(skip(self))]
async fn prune_storage(&mut self) -> Result<(), ScraperError> {
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = self.last_processed_height - keep_recent;

info!(
Expand Down Expand Up @@ -282,12 +320,12 @@ impl BlockProcessor {
async fn maybe_prune_storage(&mut self) -> Result<(), ScraperError> {
debug!("checking for storage pruning");

if self.pruning_options.strategy.is_nothing() {
if self.config.pruning_options.strategy.is_nothing() {
trace!("the current pruning strategy is 'nothing'");
return Ok(());
}

let interval = self.pruning_options.strategy_interval();
let interval = self.config.pruning_options.strategy_interval();
if self.last_pruned_height + interval <= self.last_processed_height {
self.prune_storage().await?;
}
Expand Down Expand Up @@ -363,28 +401,77 @@ impl BlockProcessor {
// but we need it to help the compiler figure out the future is `Send`
async fn startup_resync(&mut self) -> Result<(), ScraperError> {
assert!(self.pending_sync.is_empty());
info!("attempting to run startup resync...");

self.maybe_prune_storage().await?;

let latest_block = self.rpc_client.current_block_height().await? as u32;
info!("obtained latest block height: {latest_block}");

if latest_block > self.last_processed_height && self.last_processed_height != 0 {
info!("we have already processed some blocks in the past - attempting to resume...");
// in case we were offline for a while,
// make sure we don't request blocks we'd have to prune anyway
let keep_recent = self.pruning_options.strategy_keep_recent();
let keep_recent = self.config.pruning_options.strategy_keep_recent();
let last_to_keep = latest_block - keep_recent;
self.last_processed_height = max(self.last_processed_height, last_to_keep);

if !self.config.pruning_options.strategy.is_nothing() {
self.last_processed_height = max(self.last_processed_height, last_to_keep);
}

let request_range = self.last_processed_height + 1..latest_block + 1;
info!("we need to request {request_range:?} to resync");
info!(
keep_recent = %keep_recent,
last_to_keep = %last_to_keep,
last_processed_height = %self.last_processed_height,
"we need to request {request_range:?} to resync"
);
self.request_missing_blocks(request_range).await?;
return Ok(());
}

// this is the first time starting up
if self.last_processed_height == 0 {
info!("this is the first time starting up");
let Some(starting_height) = self.config.explicit_starting_block_height else {
info!("no starting block height set - will use the default behaviour");
// nothing to do
return Ok(());
};

info!("attempting to start the scraper from block {starting_height}");
let earliest_available =
self.rpc_client.earliest_available_block_height().await? as u32;
info!("earliest available block height: {earliest_available}");

if earliest_available > starting_height && self.config.use_best_effort_start_height {
error!("the earliest available block is higher than the desired starting height");
return Err(ScraperError::BlocksUnavailable {
height: starting_height,
});
}

let starting_height = if earliest_available > starting_height {
// add few additional blocks to account for all the startup waiting
// because the node might have pruned few blocks since
earliest_available + 10
} else {
starting_height
};

let request_range = starting_height..latest_block + 1;

info!("going to start the scraper from block {starting_height}");
info!("we need to request {request_range:?} before properly starting up");

self.request_missing_blocks(request_range).await?;
}

Ok(())
}

pub(crate) async fn run(&mut self) {
info!("starting processing loop");
info!("starting block processor processing loop");

// sure, we could be more efficient and reset it on every processed block,
// but the overhead is so minimal that it doesn't matter
Expand Down
8 changes: 1 addition & 7 deletions common/nyxd-scraper/src/block_processor/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,7 @@ impl TryFrom<Event> for BlockToProcess {

// TODO: we're losing `result_begin_block` and `result_end_block` here but maybe that's fine?
let maybe_block = match event.data {
// we don't care about `NewBlock` until CometBFT 0.38, i.e. until we upgrade to wasmd 0.50
EventData::NewBlock { .. } => {
return Err(ScraperError::InvalidSubscriptionEvent {
query,
kind: "NewBlock".to_string(),
})
}
EventData::NewBlock { block, .. } => block,
EventData::LegacyNewBlock { block, .. } => block,
EventData::Tx { .. } => {
return Err(ScraperError::InvalidSubscriptionEvent {
Expand Down
Loading

0 comments on commit 2fab3f1

Please sign in to comment.