diff --git a/Cargo.lock b/Cargo.lock index 53b5a13ba46..17954d4d1b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6009,6 +6009,7 @@ dependencies = [ "bincode", "chrono", "color-eyre", + "crossbeam-channel", "dirs", "elasticsearch", "futures", diff --git a/book/src/dev/state-db-upgrades.md b/book/src/dev/state-db-upgrades.md index 15f962e88b4..65eb4744b76 100644 --- a/book/src/dev/state-db-upgrades.md +++ b/book/src/dev/state-db-upgrades.md @@ -326,6 +326,19 @@ We use the following rocksdb column families: | `history_tree` | `()` | `NonEmptyHistoryTree` | Update | | `tip_chain_value_pool` | `()` | `ValueBalance` | Update | +With the following additional modifications when compiled with the `indexer` feature: + +| Column Family | Keys | Values | Changes | +| ---------------------------------- | ---------------------- | ----------------------------- | ------- | +| *Transparent* | | | | +| `tx_loc_by_spent_out_loc` | `OutputLocation` | `TransactionLocation` | Create | +| *Sprout* | | | | +| `sprout_nullifiers` | `sprout::Nullifier` | `TransactionLocation` | Create | +| *Sapling* | | | | +| `sapling_nullifiers` | `sapling::Nullifier` | `TransactionLocation` | Create | +| *Orchard* | | | | +| `orchard_nullifiers` | `orchard::Nullifier` | `TransactionLocation` | Create | + ### Data Formats [rocksdb-data-format]: #rocksdb-data-format diff --git a/zebra-rpc/Cargo.toml b/zebra-rpc/Cargo.toml index 270d7a63ecf..bb888e4dbb4 100644 --- a/zebra-rpc/Cargo.toml +++ b/zebra-rpc/Cargo.toml @@ -27,6 +27,7 @@ indexer-rpcs = [ "tonic-reflection", "prost", "tokio-stream", + "zebra-state/indexer" ] # Production features that activate extra dependencies, or extra features in dependencies diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index 7b3611c72a8..450b8c9cfa9 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -1629,7 +1629,7 @@ impl AddressStrings { AddressStrings { addresses } } - /// Creates a new [`AddessStrings`] from a given vector, returns 
an error if any addresses are incorrect. + /// Creates a new [`AddressStrings`] from a given vector, returns an error if any addresses are incorrect. pub fn new_valid(addresses: Vec) -> Result { let address_strings = Self { addresses }; address_strings.clone().valid_addresses()?; diff --git a/zebra-state/Cargo.toml b/zebra-state/Cargo.toml index fc2a6505791..e7d8dc54215 100644 --- a/zebra-state/Cargo.toml +++ b/zebra-state/Cargo.toml @@ -27,6 +27,9 @@ getblocktemplate-rpcs = [ "zebra-chain/getblocktemplate-rpcs", ] +# Indexes spending transaction ids by spent outpoints and revealed nullifiers +indexer = [] + # Test-only features proptest-impl = [ "proptest", @@ -63,6 +66,7 @@ regex = "1.11.0" rlimit = "0.10.2" rocksdb = { version = "0.22.0", default-features = false, features = ["lz4"] } semver = "1.0.23" +crossbeam-channel = "0.5.13" serde = { version = "1.0.215", features = ["serde_derive"] } tempfile = "3.14.0" thiserror = "2.0.6" diff --git a/zebra-state/src/config.rs b/zebra-state/src/config.rs index 4cd800f3975..7d175d4d614 100644 --- a/zebra-state/src/config.rs +++ b/zebra-state/src/config.rs @@ -431,15 +431,7 @@ pub(crate) fn database_format_version_at_path( // The database has a version file on disk if let Some(version) = disk_version_file { - let (minor, patch) = version - .split_once('.') - .ok_or("invalid database format version file")?; - - return Ok(Some(Version::new( - major_version, - minor.parse()?, - patch.parse()?, - ))); + return Ok(Some(format!("{major_version}.{version}").parse()?)); } // There's no version file on disk, so we need to guess the version @@ -508,7 +500,11 @@ pub(crate) mod hidden { ) -> Result<(), BoxError> { let version_path = config.version_file_path(db_kind, changed_version.major, network); - let version = format!("{}.{}", changed_version.minor, changed_version.patch); + let mut version = format!("{}.{}", changed_version.minor, changed_version.patch); + + if !changed_version.build.is_empty() { + 
version.push_str(&format!("+{}", changed_version.build)); + } // Write the version file atomically so the cache is not corrupted if Zebra shuts down or // crashes. diff --git a/zebra-state/src/constants.rs b/zebra-state/src/constants.rs index 167ce011955..2c3671d838f 100644 --- a/zebra-state/src/constants.rs +++ b/zebra-state/src/constants.rs @@ -66,11 +66,16 @@ const DATABASE_FORMAT_PATCH_VERSION: u64 = 0; /// This is the version implemented by the Zebra code that's currently running, /// the minor and patch versions on disk can be different. pub fn state_database_format_version_in_code() -> Version { - Version::new( - DATABASE_FORMAT_VERSION, - DATABASE_FORMAT_MINOR_VERSION, - DATABASE_FORMAT_PATCH_VERSION, - ) + Version { + major: DATABASE_FORMAT_VERSION, + minor: DATABASE_FORMAT_MINOR_VERSION, + patch: DATABASE_FORMAT_PATCH_VERSION, + pre: semver::Prerelease::EMPTY, + #[cfg(feature = "indexer")] + build: semver::BuildMetadata::new("indexer").expect("hard-coded value should be valid"), + #[cfg(not(feature = "indexer"))] + build: semver::BuildMetadata::EMPTY, + } } /// Returns the highest database version that modifies the subtree index format. diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index e93a3b8f905..848d30950ec 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -44,6 +44,10 @@ pub use error::{ pub use request::{ CheckpointVerifiedBlock, HashOrHeight, ReadRequest, Request, SemanticallyVerifiedBlock, }; + +#[cfg(feature = "indexer")] +pub use request::Spend; + pub use response::{KnownBlock, MinedTx, ReadResponse, Response}; pub use service::{ chain_tip::{ChainTipBlock, ChainTipChange, ChainTipSender, LatestChainTip, TipAction}, diff --git a/zebra-state/src/request.rs b/zebra-state/src/request.rs index 5894b7da55a..518243abe2f 100644 --- a/zebra-state/src/request.rs +++ b/zebra-state/src/request.rs @@ -29,6 +29,51 @@ use crate::{ ReadResponse, Response, }; +/// Identify a spend by a transparent outpoint or revealed nullifier. 
+/// +/// This enum implements `From` for [`transparent::OutPoint`], [`sprout::Nullifier`], +/// [`sapling::Nullifier`], and [`orchard::Nullifier`]. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[cfg(feature = "indexer")] +pub enum Spend { + /// A spend identified by a [`transparent::OutPoint`]. + OutPoint(transparent::OutPoint), + /// A spend identified by a [`sprout::Nullifier`]. + Sprout(sprout::Nullifier), + /// A spend identified by a [`sapling::Nullifier`]. + Sapling(sapling::Nullifier), + /// A spend identified by an [`orchard::Nullifier`]. + Orchard(orchard::Nullifier), +} + +#[cfg(feature = "indexer")] +impl From for Spend { + fn from(outpoint: transparent::OutPoint) -> Self { + Self::OutPoint(outpoint) + } +} + +#[cfg(feature = "indexer")] +impl From for Spend { + fn from(sprout_nullifier: sprout::Nullifier) -> Self { + Self::Sprout(sprout_nullifier) + } +} + +#[cfg(feature = "indexer")] +impl From for Spend { + fn from(sapling_nullifier: sapling::Nullifier) -> Self { + Self::Sapling(sapling_nullifier) + } +} + +#[cfg(feature = "indexer")] +impl From for Spend { + fn from(orchard_nullifier: orchard::Nullifier) -> Self { + Self::Orchard(orchard_nullifier) + } +} + /// Identify a block by hash or height. /// /// This enum implements `From` for [`block::Hash`] and [`block::Height`], @@ -1020,6 +1065,13 @@ pub enum ReadRequest { height_range: RangeInclusive, }, + /// Looks up a spending transaction id by a spent transparent outpoint or revealed nullifier. + /// + /// Returns [`ReadResponse::TransactionId`] with the hash of the transaction + /// that spent the output or revealed the nullifier in the provided [`Spend`]. + #[cfg(feature = "indexer")] + SpendingTransactionId(Spend), + /// Looks up utxos for the provided addresses. /// /// Returns a type with found utxos and transaction information. 
@@ -1106,6 +1158,8 @@ impl ReadRequest { } ReadRequest::BestChainNextMedianTimePast => "best_chain_next_median_time_past", ReadRequest::BestChainBlockHash(_) => "best_chain_block_hash", + #[cfg(feature = "indexer")] + ReadRequest::SpendingTransactionId(_) => "spending_transaction_id", #[cfg(feature = "getblocktemplate-rpcs")] ReadRequest::ChainInfo => "chain_info", #[cfg(feature = "getblocktemplate-rpcs")] diff --git a/zebra-state/src/response.rs b/zebra-state/src/response.rs index daa2fbe2829..73b2ab09a96 100644 --- a/zebra-state/src/response.rs +++ b/zebra-state/src/response.rs @@ -175,6 +175,12 @@ pub enum ReadResponse { /// or `None` if the block was not found. TransactionIdsForBlock(Option>), + /// Response to [`ReadRequest::SpendingTransactionId`], + /// with the hash of the transaction that spent the output or revealed the nullifier, + /// or `None` if no spending transaction was found. + #[cfg(feature = "indexer")] + TransactionId(Option), + /// Response to [`ReadRequest::BlockLocator`] with a block locator object. BlockLocator(Vec), @@ -343,6 +349,9 @@ impl TryFrom for Response { Err("there is no corresponding Response for this ReadResponse") } + #[cfg(feature = "indexer")] + ReadResponse::TransactionId(_) => Err("there is no corresponding Response for this ReadResponse"), + #[cfg(feature = "getblocktemplate-rpcs")] ReadResponse::ValidBlockProposal => Ok(Response::ValidBlockProposal), diff --git a/zebra-state/src/service.rs b/zebra-state/src/service.rs index be3a78f0772..487d152c62c 100644 --- a/zebra-state/src/service.rs +++ b/zebra-state/src/service.rs @@ -1383,6 +1383,35 @@ impl Service for ReadStateService { .wait_for_panics() } + #[cfg(feature = "indexer")] + ReadRequest::SpendingTransactionId(spend) => { + let state = self.clone(); + + tokio::task::spawn_blocking(move || { + span.in_scope(move || { + let spending_transaction_id = state + .non_finalized_state_receiver + .with_watch_data(|non_finalized_state| { + read::spending_transaction_hash( + non_finalized_state.best_chain(), + 
&state.db, + spend, + ) + }); + + // The work is done in the future. + timer.finish( + module_path!(), + line!(), + "ReadRequest::TransactionIdForSpentOutPoint", + ); + + Ok(ReadResponse::TransactionId(spending_transaction_id)) + }) + }) + .wait_for_panics() + } + ReadRequest::UnspentBestChainUtxo(outpoint) => { let state = self.clone(); diff --git a/zebra-state/src/service/check/nullifier.rs b/zebra-state/src/service/check/nullifier.rs index 809e78383ba..bd7e2b834be 100644 --- a/zebra-state/src/service/check/nullifier.rs +++ b/zebra-state/src/service/check/nullifier.rs @@ -1,13 +1,16 @@ //! Checks for nullifier uniqueness. -use std::{collections::HashSet, sync::Arc}; +use std::{collections::HashMap, sync::Arc}; use tracing::trace; use zebra_chain::transaction::Transaction; use crate::{ error::DuplicateNullifierError, - service::{finalized_state::ZebraDb, non_finalized_state::Chain}, + service::{ + finalized_state::ZebraDb, + non_finalized_state::{Chain, SpendingTransactionId}, + }, SemanticallyVerifiedBlock, ValidateContextError, }; @@ -105,19 +108,22 @@ pub(crate) fn tx_no_duplicates_in_chain( find_duplicate_nullifier( transaction.sprout_nullifiers(), |nullifier| finalized_chain.contains_sprout_nullifier(nullifier), - non_finalized_chain.map(|chain| |nullifier| chain.sprout_nullifiers.contains(nullifier)), + non_finalized_chain + .map(|chain| |nullifier| chain.sprout_nullifiers.contains_key(nullifier)), )?; find_duplicate_nullifier( transaction.sapling_nullifiers(), |nullifier| finalized_chain.contains_sapling_nullifier(nullifier), - non_finalized_chain.map(|chain| |nullifier| chain.sapling_nullifiers.contains(nullifier)), + non_finalized_chain + .map(|chain| |nullifier| chain.sapling_nullifiers.contains_key(nullifier)), )?; find_duplicate_nullifier( transaction.orchard_nullifiers(), |nullifier| finalized_chain.contains_orchard_nullifier(nullifier), - non_finalized_chain.map(|chain| |nullifier| chain.orchard_nullifiers.contains(nullifier)), + non_finalized_chain 
+ .map(|chain| |nullifier| chain.orchard_nullifiers.contains_key(nullifier)), )?; Ok(()) @@ -156,8 +162,9 @@ pub(crate) fn tx_no_duplicates_in_chain( /// [5]: service::non_finalized_state::Chain #[tracing::instrument(skip(chain_nullifiers, shielded_data_nullifiers))] pub(crate) fn add_to_non_finalized_chain_unique<'block, NullifierT>( - chain_nullifiers: &mut HashSet, + chain_nullifiers: &mut HashMap, shielded_data_nullifiers: impl IntoIterator, + revealing_tx_id: SpendingTransactionId, ) -> Result<(), ValidateContextError> where NullifierT: DuplicateNullifierError + Copy + std::fmt::Debug + Eq + std::hash::Hash + 'block, @@ -166,7 +173,10 @@ where trace!(?nullifier, "adding nullifier"); // reject the nullifier if it is already present in this non-finalized chain - if !chain_nullifiers.insert(*nullifier) { + if chain_nullifiers + .insert(*nullifier, revealing_tx_id) + .is_some() + { Err(nullifier.duplicate_nullifier_error(false))?; } } @@ -200,7 +210,7 @@ where /// [1]: service::non_finalized_state::Chain #[tracing::instrument(skip(chain_nullifiers, shielded_data_nullifiers))] pub(crate) fn remove_from_non_finalized_chain<'block, NullifierT>( - chain_nullifiers: &mut HashSet, + chain_nullifiers: &mut HashMap, shielded_data_nullifiers: impl IntoIterator, ) where NullifierT: std::fmt::Debug + Eq + std::hash::Hash + 'block, @@ -209,7 +219,7 @@ pub(crate) fn remove_from_non_finalized_chain<'block, NullifierT>( trace!(?nullifier, "removing nullifier"); assert!( - chain_nullifiers.remove(nullifier), + chain_nullifiers.remove(nullifier).is_some(), "nullifier must be present if block was added to chain" ); } diff --git a/zebra-state/src/service/check/tests/utxo.rs b/zebra-state/src/service/check/tests/utxo.rs index 57d087c552d..3e37fdc8173 100644 --- a/zebra-state/src/service/check/tests/utxo.rs +++ b/zebra-state/src/service/check/tests/utxo.rs @@ -221,7 +221,7 @@ proptest! 
{ .unwrap(); prop_assert!(!chain.unspent_utxos().contains_key(&expected_outpoint)); prop_assert!(chain.created_utxos.contains_key(&expected_outpoint)); - prop_assert!(chain.spent_utxos.contains(&expected_outpoint)); + prop_assert!(chain.spent_utxos.contains_key(&expected_outpoint)); // the finalized state does not have the UTXO prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); @@ -310,14 +310,14 @@ proptest! { if use_finalized_state_output { // the chain has spent the UTXO from the finalized state prop_assert!(!chain.created_utxos.contains_key(&expected_outpoint)); - prop_assert!(chain.spent_utxos.contains(&expected_outpoint)); + prop_assert!(chain.spent_utxos.contains_key(&expected_outpoint)); // the finalized state has the UTXO, but it will get deleted on commit prop_assert!(finalized_state.utxo(&expected_outpoint).is_some()); } else { // the chain has spent its own UTXO prop_assert!(!chain.unspent_utxos().contains_key(&expected_outpoint)); prop_assert!(chain.created_utxos.contains_key(&expected_outpoint)); - prop_assert!(chain.spent_utxos.contains(&expected_outpoint)); + prop_assert!(chain.spent_utxos.contains_key(&expected_outpoint)); // the finalized state does not have the UTXO prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); } @@ -650,12 +650,12 @@ proptest! 
{ // the finalized state has the unspent UTXO prop_assert!(finalized_state.utxo(&expected_outpoint).is_some()); // the non-finalized state has spent the UTXO - prop_assert!(chain.spent_utxos.contains(&expected_outpoint)); + prop_assert!(chain.spent_utxos.contains_key(&expected_outpoint)); } else { // the non-finalized state has created and spent the UTXO prop_assert!(!chain.unspent_utxos().contains_key(&expected_outpoint)); prop_assert!(chain.created_utxos.contains_key(&expected_outpoint)); - prop_assert!(chain.spent_utxos.contains(&expected_outpoint)); + prop_assert!(chain.spent_utxos.contains_key(&expected_outpoint)); // the finalized state does not have the UTXO prop_assert!(finalized_state.utxo(&expected_outpoint).is_none()); } diff --git a/zebra-state/src/service/check/utxo.rs b/zebra-state/src/service/check/utxo.rs index df3981ec0b8..4f1a307c402 100644 --- a/zebra-state/src/service/check/utxo.rs +++ b/zebra-state/src/service/check/utxo.rs @@ -1,6 +1,6 @@ //! Consensus rule checks for the finalized state. 
-use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use zebra_chain::{ amount, @@ -9,7 +9,7 @@ use zebra_chain::{ use crate::{ constants::MIN_TRANSPARENT_COINBASE_MATURITY, - service::finalized_state::ZebraDb, + service::{finalized_state::ZebraDb, non_finalized_state::SpendingTransactionId}, SemanticallyVerifiedBlock, ValidateContextError::{ self, DuplicateTransparentSpend, EarlyTransparentSpend, ImmatureTransparentCoinbaseSpend, @@ -38,7 +38,7 @@ use crate::{ pub fn transparent_spend( semantically_verified: &SemanticallyVerifiedBlock, non_finalized_chain_unspent_utxos: &HashMap, - non_finalized_chain_spent_utxos: &HashSet, + non_finalized_chain_spent_utxos: &HashMap, finalized_state: &ZebraDb, ) -> Result, ValidateContextError> { let mut block_spends = HashMap::new(); @@ -128,7 +128,7 @@ fn transparent_spend_chain_order( spend_tx_index_in_block: usize, block_new_outputs: &HashMap, non_finalized_chain_unspent_utxos: &HashMap, - non_finalized_chain_spent_utxos: &HashSet, + non_finalized_chain_spent_utxos: &HashMap, finalized_state: &ZebraDb, ) -> Result { if let Some(output) = block_new_outputs.get(&spend) { @@ -148,7 +148,7 @@ fn transparent_spend_chain_order( } } - if non_finalized_chain_spent_utxos.contains(&spend) { + if non_finalized_chain_spent_utxos.contains_key(&spend) { // reject the spend if its UTXO is already spent in the // non-finalized parent chain return Err(DuplicateTransparentSpend { diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index f8c9bade5c1..57d22493cef 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -20,6 +20,7 @@ use std::{ }; use zebra_chain::{block, parallel::tree::NoteCommitmentTrees, parameters::Network}; +use zebra_db::transparent::TX_LOC_BY_SPENT_OUT_LOC; use crate::{ constants::{state_database_format_version_in_code, STATE_DATABASE_KIND}, @@ -77,6 +78,7 @@ pub const STATE_COLUMN_FAMILIES_IN_CODE: 
&[&str] = &[ "tx_loc_by_transparent_addr_loc", "utxo_by_out_loc", "utxo_loc_by_transparent_addr_loc", + TX_LOC_BY_SPENT_OUT_LOC, // Sprout "sprout_nullifiers", "sprout_anchors", diff --git a/zebra-state/src/service/finalized_state/disk_format/block.rs b/zebra-state/src/service/finalized_state/disk_format/block.rs index 22495ebf332..ed57dae03fc 100644 --- a/zebra-state/src/service/finalized_state/disk_format/block.rs +++ b/zebra-state/src/service/finalized_state/disk_format/block.rs @@ -331,6 +331,16 @@ impl IntoDisk for TransactionLocation { } } +impl FromDisk for Option { + fn from_bytes(disk_bytes: impl AsRef<[u8]>) -> Self { + if disk_bytes.as_ref().len() == TRANSACTION_LOCATION_DISK_BYTES { + Some(TransactionLocation::from_bytes(disk_bytes)) + } else { + None + } + } +} + impl FromDisk for TransactionLocation { fn from_bytes(disk_bytes: impl AsRef<[u8]>) -> Self { let (height_bytes, index_bytes) = disk_bytes.as_ref().split_at(HEIGHT_DISK_BYTES); diff --git a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/column_family_names.snap b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/column_family_names.snap index d37e037cac7..3a1191beda9 100644 --- a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/column_family_names.snap +++ b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/column_family_names.snap @@ -1,6 +1,5 @@ --- source: zebra-state/src/service/finalized_state/disk_format/tests/snapshot.rs -assertion_line: 81 expression: cf_names --- [ @@ -25,6 +24,7 @@ expression: cf_names "tip_chain_value_pool", "tx_by_loc", "tx_loc_by_hash", + "tx_loc_by_spent_out_loc", "tx_loc_by_transparent_addr_loc", "utxo_by_out_loc", "utxo_loc_by_transparent_addr_loc", diff --git a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@mainnet_0.snap b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@mainnet_0.snap index 
3c333a9fc43..5511807d28c 100644 --- a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@mainnet_0.snap +++ b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@mainnet_0.snap @@ -11,6 +11,7 @@ expression: empty_column_families "sapling_nullifiers: no entries", "sprout_nullifiers: no entries", "tip_chain_value_pool: no entries", + "tx_loc_by_spent_out_loc: no entries", "tx_loc_by_transparent_addr_loc: no entries", "utxo_by_out_loc: no entries", "utxo_loc_by_transparent_addr_loc: no entries", diff --git a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@mainnet_1.snap b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@mainnet_1.snap index cb8ac5f6aed..8fcb84c844f 100644 --- a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@mainnet_1.snap +++ b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@mainnet_1.snap @@ -9,4 +9,5 @@ expression: empty_column_families "sapling_note_commitment_subtree: no entries", "sapling_nullifiers: no entries", "sprout_nullifiers: no entries", + "tx_loc_by_spent_out_loc: no entries", ] diff --git a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@mainnet_2.snap b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@mainnet_2.snap index cb8ac5f6aed..8fcb84c844f 100644 --- a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@mainnet_2.snap +++ b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@mainnet_2.snap @@ -9,4 +9,5 @@ expression: empty_column_families "sapling_note_commitment_subtree: no entries", "sapling_nullifiers: no entries", "sprout_nullifiers: no entries", + "tx_loc_by_spent_out_loc: no entries", ] diff --git 
a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@no_blocks.snap b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@no_blocks.snap index a2abce2083b..e461b0d0f1e 100644 --- a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@no_blocks.snap +++ b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@no_blocks.snap @@ -1,6 +1,5 @@ --- source: zebra-state/src/service/finalized_state/disk_format/tests/snapshot.rs -assertion_line: 166 expression: empty_column_families --- [ @@ -24,6 +23,7 @@ expression: empty_column_families "tip_chain_value_pool: no entries", "tx_by_loc: no entries", "tx_loc_by_hash: no entries", + "tx_loc_by_spent_out_loc: no entries", "tx_loc_by_transparent_addr_loc: no entries", "utxo_by_out_loc: no entries", "utxo_loc_by_transparent_addr_loc: no entries", diff --git a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@testnet_0.snap b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@testnet_0.snap index 3c333a9fc43..5511807d28c 100644 --- a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@testnet_0.snap +++ b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@testnet_0.snap @@ -11,6 +11,7 @@ expression: empty_column_families "sapling_nullifiers: no entries", "sprout_nullifiers: no entries", "tip_chain_value_pool: no entries", + "tx_loc_by_spent_out_loc: no entries", "tx_loc_by_transparent_addr_loc: no entries", "utxo_by_out_loc: no entries", "utxo_loc_by_transparent_addr_loc: no entries", diff --git a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@testnet_1.snap b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@testnet_1.snap index 
cb8ac5f6aed..8fcb84c844f 100644 --- a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@testnet_1.snap +++ b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@testnet_1.snap @@ -9,4 +9,5 @@ expression: empty_column_families "sapling_note_commitment_subtree: no entries", "sapling_nullifiers: no entries", "sprout_nullifiers: no entries", + "tx_loc_by_spent_out_loc: no entries", ] diff --git a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@testnet_2.snap b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@testnet_2.snap index cb8ac5f6aed..8fcb84c844f 100644 --- a/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@testnet_2.snap +++ b/zebra-state/src/service/finalized_state/disk_format/tests/snapshots/empty_column_families@testnet_2.snap @@ -9,4 +9,5 @@ expression: empty_column_families "sapling_note_commitment_subtree: no entries", "sapling_nullifiers: no entries", "sprout_nullifiers: no entries", + "tx_loc_by_spent_out_loc: no entries", ] diff --git a/zebra-state/src/service/finalized_state/disk_format/upgrade.rs b/zebra-state/src/service/finalized_state/disk_format/upgrade.rs index f8ce127843f..93625a848dc 100644 --- a/zebra-state/src/service/finalized_state/disk_format/upgrade.rs +++ b/zebra-state/src/service/finalized_state/disk_format/upgrade.rs @@ -2,10 +2,11 @@ use std::{ cmp::Ordering, - sync::{mpsc, Arc}, + sync::Arc, thread::{self, JoinHandle}, }; +use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender, TryRecvError}; use semver::Version; use tracing::Span; @@ -28,6 +29,12 @@ pub(crate) mod add_subtrees; pub(crate) mod cache_genesis_roots; pub(crate) mod fix_tree_key_type; +#[cfg(not(feature = "indexer"))] +pub(crate) mod drop_tx_locs_by_spends; + +#[cfg(feature = "indexer")] +pub(crate) mod track_tx_locs_by_spends; + /// The kind of database 
format change or validity check we're performing. #[derive(Clone, Debug, Eq, PartialEq)] pub enum DbFormatChange { @@ -96,7 +103,7 @@ pub struct DbFormatChangeThreadHandle { update_task: Option>>>, /// A channel that tells the running format thread to finish early. - cancel_handle: mpsc::SyncSender, + cancel_handle: Sender, } /// Marker type that is sent to cancel a format upgrade, and returned as an error on cancellation. @@ -121,7 +128,7 @@ impl DbFormatChange { return NewlyCreated { running_version }; }; - match disk_version.cmp(&running_version) { + match disk_version.cmp_precedence(&running_version) { Ordering::Less => { info!( %running_version, @@ -228,7 +235,7 @@ impl DbFormatChange { // // Cancel handles must use try_send() to avoid blocking waiting for the format change // thread to shut down. - let (cancel_handle, cancel_receiver) = mpsc::sync_channel(1); + let (cancel_handle, cancel_receiver) = bounded(1); let span = Span::current(); let update_task = thread::spawn(move || { @@ -256,7 +263,7 @@ impl DbFormatChange { self, db: ZebraDb, initial_tip_height: Option, - cancel_receiver: mpsc::Receiver, + cancel_receiver: Receiver, ) -> Result<(), CancelFormatChange> { self.run_format_change_or_check(&db, initial_tip_height, &cancel_receiver)?; @@ -269,7 +276,7 @@ impl DbFormatChange { // But return early if there is a cancel signal. if !matches!( cancel_receiver.recv_timeout(debug_validity_check_interval), - Err(mpsc::RecvTimeoutError::Timeout) + Err(RecvTimeoutError::Timeout) ) { return Err(CancelFormatChange); } @@ -288,7 +295,7 @@ impl DbFormatChange { &self, db: &ZebraDb, initial_tip_height: Option, - cancel_receiver: &mpsc::Receiver, + cancel_receiver: &Receiver, ) -> Result<(), CancelFormatChange> { match self { // Perform any required upgrades, then mark the state as upgraded. @@ -337,6 +344,60 @@ impl DbFormatChange { } } + #[cfg(feature = "indexer")] + if let ( + Upgrade { .. } | CheckOpenCurrent { .. } | Downgrade { .. 
}, + Some(initial_tip_height), + ) = (self, initial_tip_height) + { + // Indexing transaction locations by their spent outpoints and revealed nullifiers. + let timer = CodeTimer::start(); + + // Add build metadata to on-disk version file just before starting to add indexes + let mut version = db + .format_version_on_disk() + .expect("unable to read database format version file") + .expect("should write database format version file above"); + version.build = db.format_version_in_code().build; + + db.update_format_version_on_disk(&version) + .expect("unable to write database format version file to disk"); + + info!("started checking/adding indexes for spending tx ids"); + track_tx_locs_by_spends::run(initial_tip_height, db, cancel_receiver)?; + info!("finished checking/adding indexes for spending tx ids"); + + timer.finish(module_path!(), line!(), "indexing spending transaction ids"); + }; + + #[cfg(not(feature = "indexer"))] + if let ( + Upgrade { .. } | CheckOpenCurrent { .. } | Downgrade { .. }, + Some(initial_tip_height), + ) = (self, initial_tip_height) + { + let mut version = db + .format_version_on_disk() + .expect("unable to read database format version file") + .expect("should write database format version file above"); + + if version.build.contains("indexer") { + // Removing indexes of transaction locations by their spent outpoints and revealed nullifiers. + let timer = CodeTimer::start(); + + info!("started removing indexes for spending tx ids"); + drop_tx_locs_by_spends::run(initial_tip_height, db, cancel_receiver)?; + info!("finished removing indexes for spending tx ids"); + + // Remove build metadata from the on-disk version file after indexes have been dropped. 
+ version.build = db.format_version_in_code().build; + db.update_format_version_on_disk(&version) + .expect("unable to write database format version file to disk"); + + timer.finish(module_path!(), line!(), "removing spending transaction ids"); + } + }; + // These checks should pass for all format changes: // - upgrades should produce a valid format (and they already do that check) // - an empty state should pass all the format checks @@ -381,7 +442,7 @@ impl DbFormatChange { &self, db: &ZebraDb, initial_tip_height: Option, - cancel_receiver: &mpsc::Receiver, + cancel_receiver: &Receiver, ) -> Result<(), CancelFormatChange> { let Upgrade { newer_running_version, @@ -433,7 +494,7 @@ impl DbFormatChange { // The block after genesis is the first possible duplicate. for (height, tree) in db.sapling_tree_by_height_range(Height(1)..=initial_tip_height) { // Return early if there is a cancel signal. - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -460,7 +521,7 @@ impl DbFormatChange { // The block after genesis is the first possible duplicate. for (height, tree) in db.orchard_tree_by_height_range(Height(1)..=initial_tip_height) { // Return early if there is a cancel signal. 
- if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -598,7 +659,7 @@ impl DbFormatChange { #[allow(clippy::vec_init_then_push)] pub fn format_validity_checks_detailed( db: &ZebraDb, - cancel_receiver: &mpsc::Receiver, + cancel_receiver: &Receiver, ) -> Result, CancelFormatChange> { let timer = CodeTimer::start(); let mut results = Vec::new(); @@ -635,7 +696,7 @@ impl DbFormatChange { #[allow(clippy::unwrap_in_result)] fn check_for_duplicate_trees( db: &ZebraDb, - cancel_receiver: &mpsc::Receiver, + cancel_receiver: &Receiver, ) -> Result, CancelFormatChange> { // Runtime test: make sure we removed all duplicates. // We always run this test, even if the state has supposedly been upgraded. @@ -645,7 +706,7 @@ impl DbFormatChange { let mut prev_tree = None; for (height, tree) in db.sapling_tree_by_height_range(..) { // Return early if the format check is cancelled. - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -667,7 +728,7 @@ impl DbFormatChange { let mut prev_tree = None; for (height, tree) in db.orchard_tree_by_height_range(..) { // Return early if the format check is cancelled. - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } diff --git a/zebra-state/src/service/finalized_state/disk_format/upgrade/add_subtrees.rs b/zebra-state/src/service/finalized_state/disk_format/upgrade/add_subtrees.rs index d84392ebf84..8f47e4f28d7 100644 --- a/zebra-state/src/service/finalized_state/disk_format/upgrade/add_subtrees.rs +++ b/zebra-state/src/service/finalized_state/disk_format/upgrade/add_subtrees.rs @@ -1,7 +1,8 @@ //! 
Fully populate the Sapling and Orchard note commitment subtrees for existing blocks in the database. -use std::sync::{mpsc, Arc}; +use std::sync::Arc; +use crossbeam_channel::{Receiver, TryRecvError}; use hex_literal::hex; use itertools::Itertools; use tracing::instrument; @@ -30,7 +31,7 @@ use crate::service::finalized_state::{ pub fn run( initial_tip_height: Height, upgrade_db: &ZebraDb, - cancel_receiver: &mpsc::Receiver, + cancel_receiver: &Receiver, ) -> Result<(), CancelFormatChange> { // # Consensus // @@ -65,7 +66,7 @@ pub fn run( for (prev_end_height, prev_tree, end_height, tree) in subtrees { // Return early if the upgrade is cancelled. - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -90,7 +91,7 @@ pub fn run( for (prev_end_height, prev_tree, end_height, tree) in subtrees { // Return early if the upgrade is cancelled. - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -110,10 +111,10 @@ pub fn run( pub fn reset( _initial_tip_height: Height, upgrade_db: &ZebraDb, - cancel_receiver: &mpsc::Receiver, + cancel_receiver: &Receiver, ) -> Result<(), CancelFormatChange> { // Return early if the upgrade is cancelled. 
- if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -127,7 +128,7 @@ pub fn reset( .write_batch(batch) .expect("deleting old sapling note commitment subtrees is a valid database operation"); - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -306,7 +307,7 @@ fn quick_check_orchard_subtrees(db: &ZebraDb) -> Result<(), &'static str> { /// Check that note commitment subtrees were correctly added. pub fn subtree_format_validity_checks_detailed( db: &ZebraDb, - cancel_receiver: &mpsc::Receiver, + cancel_receiver: &Receiver, ) -> Result, CancelFormatChange> { // This is redundant in some code paths, but not in others. But it's quick anyway. let quick_result = subtree_format_calculation_pre_checks(db); @@ -332,7 +333,7 @@ pub fn subtree_format_validity_checks_detailed( /// Returns an error if a note commitment subtree is missing or incorrect. fn check_sapling_subtrees( db: &ZebraDb, - cancel_receiver: &mpsc::Receiver, + cancel_receiver: &Receiver, ) -> Result, CancelFormatChange> { let Some(NoteCommitmentSubtreeIndex(mut first_incomplete_subtree_index)) = db.sapling_tree_for_tip().subtree_index() @@ -348,7 +349,7 @@ fn check_sapling_subtrees( let mut result = Ok(()); for index in 0..first_incomplete_subtree_index { // Return early if the format check is cancelled. - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -418,7 +419,7 @@ fn check_sapling_subtrees( }) { // Return early if the format check is cancelled. 
- if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -462,7 +463,7 @@ fn check_sapling_subtrees( /// Returns an error if a note commitment subtree is missing or incorrect. fn check_orchard_subtrees( db: &ZebraDb, - cancel_receiver: &mpsc::Receiver, + cancel_receiver: &Receiver, ) -> Result, CancelFormatChange> { let Some(NoteCommitmentSubtreeIndex(mut first_incomplete_subtree_index)) = db.orchard_tree_for_tip().subtree_index() @@ -478,7 +479,7 @@ fn check_orchard_subtrees( let mut result = Ok(()); for index in 0..first_incomplete_subtree_index { // Return early if the format check is cancelled. - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -548,7 +549,7 @@ fn check_orchard_subtrees( }) { // Return early if the format check is cancelled. - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } diff --git a/zebra-state/src/service/finalized_state/disk_format/upgrade/cache_genesis_roots.rs b/zebra-state/src/service/finalized_state/disk_format/upgrade/cache_genesis_roots.rs index 57fcacb9d5b..186cfe5f51c 100644 --- a/zebra-state/src/service/finalized_state/disk_format/upgrade/cache_genesis_roots.rs +++ b/zebra-state/src/service/finalized_state/disk_format/upgrade/cache_genesis_roots.rs @@ -3,8 +3,7 @@ //! This reduces CPU usage when the genesis tree roots are used for transaction validation. //! Since mempool transactions are cheap to create, this is a potential remote denial of service. 
-use std::sync::mpsc; - +use crossbeam_channel::{Receiver, TryRecvError}; use zebra_chain::{block::Height, sprout}; use crate::service::finalized_state::{disk_db::DiskWriteBatch, ZebraDb}; @@ -23,7 +22,7 @@ use super::CancelFormatChange; pub fn run( _initial_tip_height: Height, upgrade_db: &ZebraDb, - cancel_receiver: &mpsc::Receiver, + cancel_receiver: &Receiver, ) -> Result<(), CancelFormatChange> { let sprout_genesis_tree = sprout::tree::NoteCommitmentTree::default(); let sprout_tip_tree = upgrade_db.sprout_tree_for_tip(); @@ -50,7 +49,7 @@ pub fn run( batch.create_orchard_tree(upgrade_db, &Height(0), &orchard_genesis_tree); // Return before we write if the upgrade is cancelled. - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -126,7 +125,7 @@ pub fn quick_check(db: &ZebraDb) -> Result<(), String> { /// If the state is empty. pub fn detailed_check( db: &ZebraDb, - cancel_receiver: &mpsc::Receiver, + cancel_receiver: &Receiver, ) -> Result, CancelFormatChange> { // This is redundant in some code paths, but not in others. But it's quick anyway. // Check the entire format before returning any errors. @@ -134,7 +133,7 @@ pub fn detailed_check( for (root, tree) in db.sprout_trees_full_map() { // Return early if the format check is cancelled. - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -149,7 +148,7 @@ pub fn detailed_check( for (height, tree) in db.sapling_tree_by_height_range(..) { // Return early if the format check is cancelled. 
- if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -164,7 +163,7 @@ pub fn detailed_check( for (height, tree) in db.orchard_tree_by_height_range(..) { // Return early if the format check is cancelled. - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } diff --git a/zebra-state/src/service/finalized_state/disk_format/upgrade/drop_tx_locs_by_spends.rs b/zebra-state/src/service/finalized_state/disk_format/upgrade/drop_tx_locs_by_spends.rs new file mode 100644 index 00000000000..cfc82b1aec1 --- /dev/null +++ b/zebra-state/src/service/finalized_state/disk_format/upgrade/drop_tx_locs_by_spends.rs @@ -0,0 +1,74 @@ +//! Drops the indexes of transaction locations by their inputs and revealed nullifiers. + +use crossbeam_channel::{Receiver, TryRecvError}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; + +use zebra_chain::block::Height; + +use crate::service::finalized_state::ZebraDb; + +use super::{super::super::DiskWriteBatch, CancelFormatChange}; + +/// Runs disk format upgrade for dropping the indexes of transaction locations by their inputs and revealed nullifiers. +/// +/// Returns `Ok` if the upgrade completed, and `Err` if it was cancelled. 
+#[allow(clippy::unwrap_in_result)] +#[instrument(skip(zebra_db, cancel_receiver))] +pub fn run( + initial_tip_height: Height, + zebra_db: &ZebraDb, + cancel_receiver: &Receiver, +) -> Result<(), CancelFormatChange> { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { + return Err(CancelFormatChange); + } + + let _ = zebra_db + .tx_loc_by_spent_output_loc_cf() + .new_batch_for_writing() + .zs_delete_range( + &crate::OutputLocation::from_output_index(crate::TransactionLocation::MIN, 0), + &crate::OutputLocation::from_output_index(crate::TransactionLocation::MAX, u32::MAX), + ) + .write_batch(); + + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { + return Err(CancelFormatChange); + } + + (0..=initial_tip_height.0) + .into_par_iter() + .try_for_each(|height| { + let height = Height(height); + let mut batch = DiskWriteBatch::new(); + + let transactions = zebra_db.transactions_by_location_range( + crate::TransactionLocation::from_index(height, 1) + ..=crate::TransactionLocation::max_for_height(height), + ); + + for (_tx_loc, tx) in transactions { + if tx.is_coinbase() { + continue; + } + + batch + .prepare_nullifier_batch(zebra_db, &tx) + .expect("method should never return an error"); + } + + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { + return Err(CancelFormatChange); + } + + zebra_db + .write_batch(batch) + .expect("unexpected database write failure"); + + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { + return Err(CancelFormatChange); + } + + Ok(()) + }) +} diff --git a/zebra-state/src/service/finalized_state/disk_format/upgrade/fix_tree_key_type.rs b/zebra-state/src/service/finalized_state/disk_format/upgrade/fix_tree_key_type.rs index 4bcd5d8cd4c..25665f419c1 100644 --- a/zebra-state/src/service/finalized_state/disk_format/upgrade/fix_tree_key_type.rs +++ b/zebra-state/src/service/finalized_state/disk_format/upgrade/fix_tree_key_type.rs @@ -2,8 +2,9 @@ //! //! 
This avoids a potential concurrency bug, and a known database performance issue. -use std::sync::{mpsc, Arc}; +use std::sync::Arc; +use crossbeam_channel::{Receiver, TryRecvError}; use zebra_chain::{block::Height, history_tree::HistoryTree, sprout}; use crate::service::finalized_state::{ @@ -20,7 +21,7 @@ use super::CancelFormatChange; pub fn run( _initial_tip_height: Height, upgrade_db: &ZebraDb, - cancel_receiver: &mpsc::Receiver, + cancel_receiver: &Receiver, ) -> Result<(), CancelFormatChange> { let sprout_tip_tree = upgrade_db.sprout_tree_for_tip(); let history_tip_tree = upgrade_db.history_tree(); @@ -33,7 +34,7 @@ pub fn run( batch.update_history_tree(upgrade_db, &history_tip_tree); // Return before we write if the upgrade is cancelled. - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -51,7 +52,7 @@ pub fn run( batch.delete_range_history_tree(upgrade_db, &Height(0), &MAX_ON_DISK_HEIGHT); // Return before we write if the upgrade is cancelled. - if !matches!(cancel_receiver.try_recv(), Err(mpsc::TryRecvError::Empty)) { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { return Err(CancelFormatChange); } @@ -144,7 +145,7 @@ pub fn quick_check(db: &ZebraDb) -> Result<(), String> { /// If the state is empty. pub fn detailed_check( db: &ZebraDb, - _cancel_receiver: &mpsc::Receiver, + _cancel_receiver: &Receiver, ) -> Result, CancelFormatChange> { // This upgrade only changes two key-value pairs, so checking it is always quick. Ok(quick_check(db)) diff --git a/zebra-state/src/service/finalized_state/disk_format/upgrade/track_tx_locs_by_spends.rs b/zebra-state/src/service/finalized_state/disk_format/upgrade/track_tx_locs_by_spends.rs new file mode 100644 index 00000000000..be754fac452 --- /dev/null +++ b/zebra-state/src/service/finalized_state/disk_format/upgrade/track_tx_locs_by_spends.rs @@ -0,0 +1,109 @@ +//! 
Tracks transaction locations by their inputs and revealed nullifiers. + +use std::sync::Arc; + +use crossbeam_channel::{Receiver, TryRecvError}; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; + +use zebra_chain::block::Height; + +use crate::{ + service::{finalized_state::ZebraDb, non_finalized_state::Chain, read}, + Spend, +}; + +use super::{super::super::DiskWriteBatch, CancelFormatChange}; + +/// Runs disk format upgrade for tracking transaction locations by their inputs and revealed nullifiers. +/// +/// Returns `Ok` if the upgrade completed, and `Err` if it was cancelled. +#[allow(clippy::unwrap_in_result)] +#[instrument(skip(zebra_db, cancel_receiver))] +pub fn run( + initial_tip_height: Height, + zebra_db: &ZebraDb, + cancel_receiver: &Receiver, +) -> Result<(), CancelFormatChange> { + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { + return Err(CancelFormatChange); + } + + (0..=initial_tip_height.0) + .into_par_iter() + .try_for_each(|height| { + let height = Height(height); + let mut batch = DiskWriteBatch::new(); + let mut should_index_at_height = false; + + let transactions = zebra_db.transactions_by_location_range( + crate::TransactionLocation::from_index(height, 1) + ..=crate::TransactionLocation::max_for_height(height), + ); + + for (tx_loc, tx) in transactions { + if tx.is_coinbase() { + continue; + } + + if !should_index_at_height { + if let Some(spend) = tx + .inputs() + .iter() + .filter_map(|input| Some(input.outpoint()?.into())) + .chain(tx.sprout_nullifiers().cloned().map(Spend::from)) + .chain(tx.sapling_nullifiers().cloned().map(Spend::from)) + .chain(tx.orchard_nullifiers().cloned().map(Spend::from)) + .next() + { + if read::spending_transaction_hash::>(None, zebra_db, spend) + .is_some() + { + // Skip transactions in blocks with existing indexes + return Ok(()); + } else { + should_index_at_height = true + } + } else { + continue; + }; + } + + for input in tx.inputs() { + if 
!matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { + return Err(CancelFormatChange); + } + + let spent_outpoint = input + .outpoint() + .expect("should filter out coinbase transactions"); + + let spent_output_location = zebra_db + .output_location(&spent_outpoint) + .expect("should have location for spent outpoint"); + + let _ = zebra_db + .tx_loc_by_spent_output_loc_cf() + .with_batch_for_writing(&mut batch) + .zs_insert(&spent_output_location, &tx_loc); + } + + batch + .prepare_nullifier_batch(zebra_db, &tx, tx_loc) + .expect("method should never return an error"); + } + + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { + return Err(CancelFormatChange); + } + + zebra_db + .write_batch(batch) + .expect("unexpected database write failure"); + + if !matches!(cancel_receiver.try_recv(), Err(TryRecvError::Empty)) { + return Err(CancelFormatChange); + } + + Ok(()) + }) +} diff --git a/zebra-state/src/service/finalized_state/zebra_db.rs b/zebra-state/src/service/finalized_state/zebra_db.rs index b7ae76ea3a1..951c78ec93c 100644 --- a/zebra-state/src/service/finalized_state/zebra_db.rs +++ b/zebra-state/src/service/finalized_state/zebra_db.rs @@ -9,12 +9,11 @@ //! [`crate::constants::state_database_format_version_in_code()`] must be incremented //! each time the database format (column, serialization, etc) changes. -use std::{ - path::Path, - sync::{mpsc, Arc}, -}; +use std::{path::Path, sync::Arc}; +use crossbeam_channel::bounded; use semver::Version; + use zebra_chain::parameters::Network; use crate::{ @@ -292,7 +291,7 @@ impl ZebraDb { if let Some(disk_version) = disk_version { // We need to keep the cancel handle until the format check has finished, // because dropping it cancels the format check. - let (_never_cancel_handle, never_cancel_receiver) = mpsc::sync_channel(1); + let (_never_cancel_handle, never_cancel_receiver) = bounded(1); // We block here because the checks are quick and database validity is // consensus-critical. 
diff --git a/zebra-state/src/service/finalized_state/zebra_db/block.rs b/zebra-state/src/service/finalized_state/zebra_db/block.rs index 4dc3a801ef3..6ad4cd93a60 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/block.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/block.rs @@ -11,6 +11,7 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, + ops::RangeBounds, sync::Arc, }; @@ -42,6 +43,9 @@ use crate::{ BoxError, HashOrHeight, }; +#[cfg(feature = "indexer")] +use crate::request::Spend; + #[cfg(test)] mod tests; @@ -139,25 +143,17 @@ impl ZebraDb { let header = self.block_header(height.into())?; // Transactions - let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap(); - - // Manually fetch the entire block's transactions - let mut transactions = Vec::new(); // TODO: // - split disk reads from deserialization, and run deserialization in parallel, // this improves performance for blocks with multiple large shielded transactions // - is this loop more efficient if we store the number of transactions? // - is the difference large enough to matter? - for tx_index in 0..=Transaction::max_allocation() { - let tx_loc = TransactionLocation::from_u64(height, tx_index); - - if let Some(tx) = self.db.zs_get(&tx_by_loc, &tx_loc) { - transactions.push(tx); - } else { - break; - } - } + let transactions = self + .transactions_by_height(height) + .map(|(_, tx)| tx) + .map(Arc::new) + .collect(); Some(Arc::new(Block { header, @@ -212,6 +208,45 @@ impl ZebraDb { // Read transaction methods + /// Returns the [`Transaction`] with [`transaction::Hash`], and its [`Height`], + /// if a transaction with that hash exists in the finalized chain. 
+ #[allow(clippy::unwrap_in_result)] + pub fn transaction(&self, hash: transaction::Hash) -> Option<(Arc, Height)> { + let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap(); + + let transaction_location = self.transaction_location(hash)?; + + self.db + .zs_get(&tx_by_loc, &transaction_location) + .map(|tx| (tx, transaction_location.height)) + } + + /// Returns an iterator of all [`Transaction`]s for a provided block height in finalized state. + #[allow(clippy::unwrap_in_result)] + pub fn transactions_by_height( + &self, + height: Height, + ) -> impl Iterator + '_ { + self.transactions_by_location_range( + TransactionLocation::min_for_height(height) + ..=TransactionLocation::max_for_height(height), + ) + } + + /// Returns an iterator of all [`Transaction`]s in the provided range + /// of [`TransactionLocation`]s in finalized state. + #[allow(clippy::unwrap_in_result)] + pub fn transactions_by_location_range( + &self, + range: R, + ) -> impl Iterator + '_ + where + R: RangeBounds, + { + let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap(); + self.db.zs_forward_range_iter(tx_by_loc, range) + } + /// Returns the [`TransactionLocation`] for [`transaction::Hash`], /// if it exists in the finalized chain. #[allow(clippy::unwrap_in_result)] @@ -229,19 +264,18 @@ impl ZebraDb { self.db.zs_get(&hash_by_tx_loc, &location) } - /// Returns the [`Transaction`] with [`transaction::Hash`], and its [`Height`], - /// if a transaction with that hash exists in the finalized chain. 
- // - // TODO: move this method to the start of the section - #[allow(clippy::unwrap_in_result)] - pub fn transaction(&self, hash: transaction::Hash) -> Option<(Arc, Height)> { - let tx_by_loc = self.db.cf_handle("tx_by_loc").unwrap(); - - let transaction_location = self.transaction_location(hash)?; - - self.db - .zs_get(&tx_by_loc, &transaction_location) - .map(|tx| (tx, transaction_location.height)) + /// Returns the [`transaction::Hash`] of the transaction that spent or revealed the given + /// [`transparent::OutPoint`] or nullifier, if it is spent or revealed in the finalized state. + #[cfg(feature = "indexer")] + pub fn spending_transaction_hash(&self, spend: &Spend) -> Option { + let tx_loc = match spend { + Spend::OutPoint(outpoint) => self.spending_tx_loc(outpoint)?, + Spend::Sprout(nullifier) => self.sprout_revealing_tx_loc(nullifier)?, + Spend::Sapling(nullifier) => self.sapling_revealing_tx_loc(nullifier)?, + Spend::Orchard(nullifier) => self.orchard_revealing_tx_loc(nullifier)?, + }; + + self.transaction_hash(tx_loc) } /// Returns the [`transaction::Hash`]es in the block with `hash_or_height`, @@ -355,6 +389,13 @@ impl ZebraDb { .iter() .map(|(outpoint, _output_loc, utxo)| (*outpoint, utxo.clone())) .collect(); + + // TODO: Add `OutputLocation`s to the values in `spent_utxos_by_outpoint` to avoid creating a second hashmap with the same keys + #[cfg(feature = "indexer")] + let out_loc_by_outpoint: HashMap = spent_utxos + .iter() + .map(|(outpoint, out_loc, _utxo)| (*outpoint, *out_loc)) + .collect(); let spent_utxos_by_out_loc: BTreeMap = spent_utxos .into_iter() .map(|(_outpoint, out_loc, utxo)| (out_loc, utxo)) @@ -392,6 +433,8 @@ impl ZebraDb { new_outputs_by_out_loc, spent_utxos_by_outpoint, spent_utxos_by_out_loc, + #[cfg(feature = "indexer")] + out_loc_by_outpoint, address_balances, self.finalized_value_pool(), prev_note_commitment_trees, @@ -448,6 +491,10 @@ impl DiskWriteBatch { new_outputs_by_out_loc: BTreeMap, spent_utxos_by_outpoint: HashMap, 
spent_utxos_by_out_loc: BTreeMap, + #[cfg(feature = "indexer")] out_loc_by_outpoint: HashMap< + transparent::OutPoint, + OutputLocation, + >, address_balances: HashMap, value_pool: ValueBalance, prev_note_commitment_trees: Option, @@ -463,7 +510,7 @@ impl DiskWriteBatch { // which is already present from height 1 to the first shielded transaction. // // In Zebra we include the nullifiers and note commitments in the genesis block because it simplifies our code. - self.prepare_shielded_transaction_batch(db, finalized)?; + self.prepare_shielded_transaction_batch(zebra_db, finalized)?; self.prepare_trees_batch(zebra_db, finalized, prev_note_commitment_trees)?; // # Consensus @@ -479,12 +526,14 @@ impl DiskWriteBatch { if !finalized.height.is_min() { // Commit transaction indexes self.prepare_transparent_transaction_batch( - db, + zebra_db, network, finalized, &new_outputs_by_out_loc, &spent_utxos_by_outpoint, &spent_utxos_by_out_loc, + #[cfg(feature = "indexer")] + &out_loc_by_outpoint, address_balances, )?; diff --git a/zebra-state/src/service/finalized_state/zebra_db/shielded.rs b/zebra-state/src/service/finalized_state/zebra_db/shielded.rs index 4bba75b1891..b4036a31e5f 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/shielded.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/shielded.rs @@ -29,11 +29,11 @@ use zebra_chain::{ use crate::{ request::{FinalizedBlock, Treestate}, service::finalized_state::{ - disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk}, + disk_db::{DiskWriteBatch, ReadDisk, WriteDisk}, disk_format::RawBytes, zebra_db::ZebraDb, }, - BoxError, + BoxError, TransactionLocation, }; // Doc-only items @@ -61,6 +61,42 @@ impl ZebraDb { self.db.zs_contains(&orchard_nullifiers, &orchard_nullifier) } + /// Returns the [`TransactionLocation`] of the transaction that revealed + /// the given [`sprout::Nullifier`], if it is revealed in the finalized state and its + /// spending transaction hash has been indexed. 
+ #[allow(clippy::unwrap_in_result)] + pub fn sprout_revealing_tx_loc( + &self, + sprout_nullifier: &sprout::Nullifier, + ) -> Option { + let sprout_nullifiers = self.db.cf_handle("sprout_nullifiers").unwrap(); + self.db.zs_get(&sprout_nullifiers, &sprout_nullifier)? + } + + /// Returns the [`TransactionLocation`] of the transaction that revealed + /// the given [`sapling::Nullifier`], if it is revealed in the finalized state and its + /// spending transaction hash has been indexed. + #[allow(clippy::unwrap_in_result)] + pub fn sapling_revealing_tx_loc( + &self, + sapling_nullifier: &sapling::Nullifier, + ) -> Option { + let sapling_nullifiers = self.db.cf_handle("sapling_nullifiers").unwrap(); + self.db.zs_get(&sapling_nullifiers, &sapling_nullifier)? + } + + /// Returns the [`TransactionLocation`] of the transaction that revealed + /// the given [`orchard::Nullifier`], if it is revealed in the finalized state and its + /// spending transaction hash has been indexed. + #[allow(clippy::unwrap_in_result)] + pub fn orchard_revealing_tx_loc( + &self, + orchard_nullifier: &orchard::Nullifier, + ) -> Option { + let orchard_nullifiers = self.db.cf_handle("orchard_nullifiers").unwrap(); + self.db.zs_get(&orchard_nullifiers, &orchard_nullifier)? + } + /// Returns `true` if the finalized state contains `sprout_anchor`. #[allow(dead_code)] pub fn contains_sprout_anchor(&self, sprout_anchor: &sprout::tree::Root) -> bool { @@ -437,14 +473,22 @@ impl DiskWriteBatch { /// - Propagates any errors from updating note commitment trees pub fn prepare_shielded_transaction_batch( &mut self, - db: &DiskDb, + zebra_db: &ZebraDb, finalized: &FinalizedBlock, ) -> Result<(), BoxError> { - let FinalizedBlock { block, .. } = finalized; + #[cfg(feature = "indexer")] + let FinalizedBlock { block, height, .. 
} = finalized; // Index each transaction's shielded data - for transaction in &block.transactions { - self.prepare_nullifier_batch(db, transaction)?; + #[cfg(feature = "indexer")] + for (tx_index, transaction) in block.transactions.iter().enumerate() { + let tx_loc = TransactionLocation::from_usize(*height, tx_index); + self.prepare_nullifier_batch(zebra_db, transaction, tx_loc)?; + } + + #[cfg(not(feature = "indexer"))] + for transaction in &finalized.block.transactions { + self.prepare_nullifier_batch(zebra_db, transaction)?; } Ok(()) @@ -459,22 +503,29 @@ impl DiskWriteBatch { #[allow(clippy::unwrap_in_result)] pub fn prepare_nullifier_batch( &mut self, - db: &DiskDb, + zebra_db: &ZebraDb, transaction: &Transaction, + #[cfg(feature = "indexer")] transaction_location: TransactionLocation, ) -> Result<(), BoxError> { + let db = &zebra_db.db; let sprout_nullifiers = db.cf_handle("sprout_nullifiers").unwrap(); let sapling_nullifiers = db.cf_handle("sapling_nullifiers").unwrap(); let orchard_nullifiers = db.cf_handle("orchard_nullifiers").unwrap(); + #[cfg(feature = "indexer")] + let insert_value = transaction_location; + #[cfg(not(feature = "indexer"))] + let insert_value = (); + // Mark sprout, sapling and orchard nullifiers as spent for sprout_nullifier in transaction.sprout_nullifiers() { - self.zs_insert(&sprout_nullifiers, sprout_nullifier, ()); + self.zs_insert(&sprout_nullifiers, sprout_nullifier, insert_value); } for sapling_nullifier in transaction.sapling_nullifiers() { - self.zs_insert(&sapling_nullifiers, sapling_nullifier, ()); + self.zs_insert(&sapling_nullifiers, sapling_nullifier, insert_value); } for orchard_nullifier in transaction.orchard_nullifiers() { - self.zs_insert(&orchard_nullifiers, orchard_nullifier, ()); + self.zs_insert(&orchard_nullifiers, orchard_nullifier, insert_value); } Ok(()) diff --git a/zebra-state/src/service/finalized_state/zebra_db/transparent.rs b/zebra-state/src/service/finalized_state/zebra_db/transparent.rs index 
edfcf509b76..06a09d803e6 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/transparent.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/transparent.rs @@ -1,5 +1,6 @@ //! Provides high-level access to database: -//! - unspent [`transparent::Output`]s (UTXOs), and +//! - unspent [`transparent::Output`]s (UTXOs), +//! - spent [`transparent::Output`]s, and //! - transparent address indexes. //! //! This module makes sure that: @@ -40,9 +41,42 @@ use crate::{ BoxError, }; +use super::super::TypedColumnFamily; + +/// The name of the transaction location by spent output location column family. +/// +/// This constant should be used so the compiler can detect typos. +pub const TX_LOC_BY_SPENT_OUT_LOC: &str = "tx_loc_by_spent_out_loc"; + +/// The type for reading transaction locations by spent output locations from the database. +/// +/// This type should be used so the compiler can detect incorrectly typed accesses to the +/// column family. +pub type TransactionLocationBySpentOutputLocationCf<'cf> = + TypedColumnFamily<'cf, OutputLocation, TransactionLocation>; + impl ZebraDb { + // Column family convenience methods + + /// Returns a typed handle to the transaction location by spent output location column family. + pub(crate) fn tx_loc_by_spent_output_loc_cf( + &self, + ) -> TransactionLocationBySpentOutputLocationCf { + TransactionLocationBySpentOutputLocationCf::new(&self.db, TX_LOC_BY_SPENT_OUT_LOC) + .expect("column family was created when database was created") + } + // Read transparent methods + /// Returns the [`TransactionLocation`] for a transaction that spent the output + /// at the provided [`OutputLocation`], if it is in the finalized state. + pub fn tx_location_by_spent_output_location( + &self, + output_location: &OutputLocation, + ) -> Option { + self.tx_loc_by_spent_output_loc_cf().zs_get(output_location) + } + /// Returns the [`AddressBalanceLocation`] for a [`transparent::Address`], /// if it is in the finalized state. 
#[allow(clippy::unwrap_in_result)] @@ -90,6 +124,14 @@ impl ZebraDb { self.utxo_by_location(output_location) } + /// Returns the [`TransactionLocation`] of the transaction that spent the given + /// [`transparent::OutPoint`], if it is spent in the finalized state and its + /// spending transaction hash has been indexed. + pub fn spending_tx_loc(&self, outpoint: &transparent::OutPoint) -> Option { + let output_location = self.output_location(outpoint)?; + self.tx_location_by_spent_output_location(&output_location) + } + /// Returns the transparent output for an [`OutputLocation`], /// if it is unspent in the finalized state. #[allow(clippy::unwrap_in_result)] @@ -342,14 +384,19 @@ impl DiskWriteBatch { #[allow(clippy::too_many_arguments)] pub fn prepare_transparent_transaction_batch( &mut self, - db: &DiskDb, + zebra_db: &ZebraDb, network: &Network, finalized: &FinalizedBlock, new_outputs_by_out_loc: &BTreeMap, spent_utxos_by_outpoint: &HashMap, spent_utxos_by_out_loc: &BTreeMap, + #[cfg(feature = "indexer")] out_loc_by_outpoint: &HashMap< + transparent::OutPoint, + OutputLocation, + >, mut address_balances: HashMap, ) -> Result<(), BoxError> { + let db = &zebra_db.db; let FinalizedBlock { block, height, .. 
} = finalized; // Update created and spent transparent outputs @@ -371,11 +418,13 @@ impl DiskWriteBatch { let spending_tx_location = TransactionLocation::from_usize(*height, tx_index); self.prepare_spending_transparent_tx_ids_batch( - db, + zebra_db, network, spending_tx_location, transaction, spent_utxos_by_outpoint, + #[cfg(feature = "indexer")] + out_loc_by_outpoint, &address_balances, )?; } @@ -531,16 +580,21 @@ impl DiskWriteBatch { /// # Errors /// /// - This method doesn't currently return any errors, but it might in future - #[allow(clippy::unwrap_in_result)] + #[allow(clippy::unwrap_in_result, clippy::too_many_arguments)] pub fn prepare_spending_transparent_tx_ids_batch( &mut self, - db: &DiskDb, + zebra_db: &ZebraDb, network: &Network, spending_tx_location: TransactionLocation, transaction: &Transaction, spent_utxos_by_outpoint: &HashMap, + #[cfg(feature = "indexer")] out_loc_by_outpoint: &HashMap< + transparent::OutPoint, + OutputLocation, + >, address_balances: &HashMap, ) -> Result<(), BoxError> { + let db = &zebra_db.db; let tx_loc_by_transparent_addr_loc = db.cf_handle("tx_loc_by_transparent_addr_loc").unwrap(); @@ -569,6 +623,18 @@ impl DiskWriteBatch { AddressTransaction::new(sending_address_location, spending_tx_location); self.zs_insert(&tx_loc_by_transparent_addr_loc, address_transaction, ()); } + + #[cfg(feature = "indexer")] + { + let spent_output_location = out_loc_by_outpoint + .get(&spent_outpoint) + .expect("spent outpoints must already have output locations"); + + let _ = zebra_db + .tx_loc_by_spent_output_loc_cf() + .with_batch_for_writing(self) + .zs_insert(spent_output_location, &spending_tx_location); + } } Ok(()) diff --git a/zebra-state/src/service/non_finalized_state.rs b/zebra-state/src/service/non_finalized_state.rs index 08d64455024..ebcbb2cfd35 100644 --- a/zebra-state/src/service/non_finalized_state.rs +++ b/zebra-state/src/service/non_finalized_state.rs @@ -26,7 +26,7 @@ mod chain; #[cfg(test)] mod tests; -pub(crate) use 
chain::Chain; +pub(crate) use chain::{Chain, SpendingTransactionId}; /// The state of the chains in memory, including queued blocks. /// @@ -540,7 +540,7 @@ impl NonFinalizedState { #[allow(dead_code)] pub fn best_contains_sprout_nullifier(&self, sprout_nullifier: &sprout::Nullifier) -> bool { self.best_chain() - .map(|best_chain| best_chain.sprout_nullifiers.contains(sprout_nullifier)) + .map(|best_chain| best_chain.sprout_nullifiers.contains_key(sprout_nullifier)) .unwrap_or(false) } @@ -552,7 +552,11 @@ impl NonFinalizedState { sapling_nullifier: &zebra_chain::sapling::Nullifier, ) -> bool { self.best_chain() - .map(|best_chain| best_chain.sapling_nullifiers.contains(sapling_nullifier)) + .map(|best_chain| { + best_chain + .sapling_nullifiers + .contains_key(sapling_nullifier) + }) .unwrap_or(false) } @@ -564,7 +568,11 @@ impl NonFinalizedState { orchard_nullifier: &zebra_chain::orchard::Nullifier, ) -> bool { self.best_chain() - .map(|best_chain| best_chain.orchard_nullifiers.contains(orchard_nullifier)) + .map(|best_chain| { + best_chain + .orchard_nullifiers + .contains_key(orchard_nullifier) + }) .unwrap_or(false) } diff --git a/zebra-state/src/service/non_finalized_state/chain.rs b/zebra-state/src/service/non_finalized_state/chain.rs index 0dfcd585c12..6ad284a23f5 100644 --- a/zebra-state/src/service/non_finalized_state/chain.rs +++ b/zebra-state/src/service/non_finalized_state/chain.rs @@ -33,6 +33,9 @@ use crate::{ TransactionLocation, ValidateContextError, }; +#[cfg(feature = "indexer")] +use crate::request::Spend; + use self::index::TransparentTransfers; pub mod index; @@ -67,6 +70,14 @@ pub struct Chain { pub(super) last_fork_height: Option, } +/// Spending transaction id type when the `indexer` feature is selected. +#[cfg(feature = "indexer")] +pub(crate) type SpendingTransactionId = transaction::Hash; + +/// Spending transaction id type when the `indexer` feature is not selected. 
+#[cfg(not(feature = "indexer"))] +pub(crate) type SpendingTransactionId = (); + /// The internal state of [`Chain`]. #[derive(Clone, Debug, PartialEq, Eq, Default)] pub struct ChainInner { @@ -90,9 +101,11 @@ pub struct ChainInner { // // TODO: replace OutPoint with OutputLocation? pub(crate) created_utxos: HashMap, - /// The [`transparent::OutPoint`]s spent by `blocks`, - /// including those created by earlier transactions or blocks in the chain. - pub(crate) spent_utxos: HashSet, + /// The spending transaction ids by [`transparent::OutPoint`]s spent by `blocks`, + /// including spent outputs created by earlier transactions or blocks in the chain. + /// + /// Note: Spending transaction ids are only tracked when the `indexer` feature is selected. + pub(crate) spent_utxos: HashMap, // Note commitment trees // @@ -176,12 +189,15 @@ pub struct ChainInner { // Nullifiers // - /// The Sprout nullifiers revealed by `blocks`. - pub(crate) sprout_nullifiers: HashSet, - /// The Sapling nullifiers revealed by `blocks`. - pub(crate) sapling_nullifiers: HashSet, - /// The Orchard nullifiers revealed by `blocks`. - pub(crate) orchard_nullifiers: HashSet, + /// The Sprout nullifiers revealed by `blocks` and, if the `indexer` feature is selected, + /// the id of the transaction that revealed them. + pub(crate) sprout_nullifiers: HashMap, + /// The Sapling nullifiers revealed by `blocks` and, if the `indexer` feature is selected, + /// the id of the transaction that revealed them. + pub(crate) sapling_nullifiers: HashMap, + /// The Orchard nullifiers revealed by `blocks` and, if the `indexer` feature is selected, + /// the id of the transaction that revealed them. + pub(crate) orchard_nullifiers: HashMap, // Transparent Transfers // TODO: move to the transparent section @@ -1234,7 +1250,7 @@ impl Chain { /// and removed from the relevant chain(s). 
pub fn unspent_utxos(&self) -> HashMap { let mut unspent_utxos = self.created_utxos.clone(); - unspent_utxos.retain(|outpoint, _utxo| !self.spent_utxos.contains(outpoint)); + unspent_utxos.retain(|outpoint, _utxo| !self.spent_utxos.contains_key(outpoint)); unspent_utxos } @@ -1244,11 +1260,23 @@ impl Chain { /// /// UTXOs are returned regardless of whether they have been spent. pub fn created_utxo(&self, outpoint: &transparent::OutPoint) -> Option { - if let Some(utxo) = self.created_utxos.get(outpoint) { - return Some(utxo.utxo.clone()); + self.created_utxos + .get(outpoint) + .map(|utxo| utxo.utxo.clone()) + } + + /// Returns the [`Hash`](transaction::Hash) of the transaction that spent an output at + /// the provided [`transparent::OutPoint`] or revealed the provided nullifier, if it exists + /// and is spent or revealed by this chain. + #[cfg(feature = "indexer")] + pub fn spending_transaction_hash(&self, spend: &Spend) -> Option { + match spend { + Spend::OutPoint(outpoint) => self.spent_utxos.get(outpoint), + Spend::Sprout(nullifier) => self.sprout_nullifiers.get(nullifier), + Spend::Sapling(nullifier) => self.sapling_nullifiers.get(nullifier), + Spend::Orchard(nullifier) => self.orchard_nullifiers.get(nullifier), } - - None + .cloned() } // Address index queries @@ -1536,10 +1564,17 @@ impl Chain { self.update_chain_tip_with(&(inputs, &transaction_hash, spent_outputs))?; // add the shielded data - self.update_chain_tip_with(joinsplit_data)?; - self.update_chain_tip_with(sapling_shielded_data_per_spend_anchor)?; - self.update_chain_tip_with(sapling_shielded_data_shared_anchor)?; - self.update_chain_tip_with(orchard_shielded_data)?; + + #[cfg(not(feature = "indexer"))] + let transaction_hash = (); + + self.update_chain_tip_with(&(joinsplit_data, &transaction_hash))?; + self.update_chain_tip_with(&( + sapling_shielded_data_per_spend_anchor, + &transaction_hash, + ))?; + self.update_chain_tip_with(&(sapling_shielded_data_shared_anchor, &transaction_hash))?; + 
self.update_chain_tip_with(&(orchard_shielded_data, &transaction_hash))?; } // update the chain value pool balances @@ -1694,10 +1729,20 @@ impl UpdateWith for Chain { ); // remove the shielded data - self.revert_chain_with(joinsplit_data, position); - self.revert_chain_with(sapling_shielded_data_per_spend_anchor, position); - self.revert_chain_with(sapling_shielded_data_shared_anchor, position); - self.revert_chain_with(orchard_shielded_data, position); + + #[cfg(not(feature = "indexer"))] + let transaction_hash = &(); + + self.revert_chain_with(&(joinsplit_data, transaction_hash), position); + self.revert_chain_with( + &(sapling_shielded_data_per_spend_anchor, transaction_hash), + position, + ); + self.revert_chain_with( + &(sapling_shielded_data_shared_anchor, transaction_hash), + position, + ); + self.revert_chain_with(&(orchard_shielded_data, transaction_hash), position); } // TODO: move these to the shielded UpdateWith.revert...()? @@ -1838,10 +1883,18 @@ impl continue; }; + #[cfg(feature = "indexer")] + let insert_value = *spending_tx_hash; + #[cfg(not(feature = "indexer"))] + let insert_value = (); + // Index the spent outpoint in the chain - let first_spend = self.spent_utxos.insert(spent_outpoint); + let was_spend_newly_inserted = self + .spent_utxos + .insert(spent_outpoint, insert_value) + .is_none(); assert!( - first_spend, + was_spend_newly_inserted, "unexpected duplicate spent output: should be checked earlier" ); @@ -1889,9 +1942,9 @@ impl }; // Revert the spent outpoint in the chain - let spent_outpoint_was_removed = self.spent_utxos.remove(&spent_outpoint); + let was_spent_outpoint_removed = self.spent_utxos.remove(&spent_outpoint).is_some(); assert!( - spent_outpoint_was_removed, + was_spent_outpoint_removed, "spent_utxos must be present if block was added to chain" ); @@ -1926,11 +1979,19 @@ impl } } -impl UpdateWith>> for Chain { +impl + UpdateWith<( + &Option>, + &SpendingTransactionId, + )> for Chain +{ #[instrument(skip(self, 
joinsplit_data))] fn update_chain_tip_with( &mut self, - joinsplit_data: &Option>, + &(joinsplit_data, revealing_tx_id): &( + &Option>, + &SpendingTransactionId, + ), ) -> Result<(), ValidateContextError> { if let Some(joinsplit_data) = joinsplit_data { // We do note commitment tree updates in parallel rayon threads. @@ -1938,6 +1999,7 @@ impl UpdateWith>> for Chain { check::nullifier::add_to_non_finalized_chain_unique( &mut self.sprout_nullifiers, joinsplit_data.nullifiers(), + *revealing_tx_id, )?; } Ok(()) @@ -1951,7 +2013,10 @@ impl UpdateWith>> for Chain { #[instrument(skip(self, joinsplit_data))] fn revert_chain_with( &mut self, - joinsplit_data: &Option>, + &(joinsplit_data, _revealing_tx_id): &( + &Option>, + &SpendingTransactionId, + ), _position: RevertPosition, ) { if let Some(joinsplit_data) = joinsplit_data { @@ -1967,14 +2032,21 @@ impl UpdateWith>> for Chain { } } -impl UpdateWith>> for Chain +impl + UpdateWith<( + &Option>, + &SpendingTransactionId, + )> for Chain where AnchorV: sapling::AnchorVariant + Clone, { #[instrument(skip(self, sapling_shielded_data))] fn update_chain_tip_with( &mut self, - sapling_shielded_data: &Option>, + &(sapling_shielded_data, revealing_tx_id): &( + &Option>, + &SpendingTransactionId, + ), ) -> Result<(), ValidateContextError> { if let Some(sapling_shielded_data) = sapling_shielded_data { // We do note commitment tree updates in parallel rayon threads. 
@@ -1982,6 +2054,7 @@ where check::nullifier::add_to_non_finalized_chain_unique( &mut self.sapling_nullifiers, sapling_shielded_data.nullifiers(), + *revealing_tx_id, )?; } Ok(()) @@ -1995,7 +2068,10 @@ where #[instrument(skip(self, sapling_shielded_data))] fn revert_chain_with( &mut self, - sapling_shielded_data: &Option>, + &(sapling_shielded_data, _revealing_tx_id): &( + &Option>, + &SpendingTransactionId, + ), _position: RevertPosition, ) { if let Some(sapling_shielded_data) = sapling_shielded_data { @@ -2011,11 +2087,14 @@ where } } -impl UpdateWith> for Chain { +impl UpdateWith<(&Option, &SpendingTransactionId)> for Chain { #[instrument(skip(self, orchard_shielded_data))] fn update_chain_tip_with( &mut self, - orchard_shielded_data: &Option, + &(orchard_shielded_data, revealing_tx_id): &( + &Option, + &SpendingTransactionId, + ), ) -> Result<(), ValidateContextError> { if let Some(orchard_shielded_data) = orchard_shielded_data { // We do note commitment tree updates in parallel rayon threads. 
@@ -2023,6 +2102,7 @@ impl UpdateWith> for Chain { check::nullifier::add_to_non_finalized_chain_unique( &mut self.orchard_nullifiers, orchard_shielded_data.nullifiers(), + *revealing_tx_id, )?; } Ok(()) @@ -2036,7 +2116,10 @@ impl UpdateWith> for Chain { #[instrument(skip(self, orchard_shielded_data))] fn revert_chain_with( &mut self, - orchard_shielded_data: &Option, + (orchard_shielded_data, _revealing_tx_id): &( + &Option, + &SpendingTransactionId, + ), _position: RevertPosition, ) { if let Some(orchard_shielded_data) = orchard_shielded_data { diff --git a/zebra-state/src/service/read.rs b/zebra-state/src/service/read.rs index 0188ca1bf5e..0b0ece3a358 100644 --- a/zebra-state/src/service/read.rs +++ b/zebra-state/src/service/read.rs @@ -33,6 +33,10 @@ pub use address::{ pub use block::{ any_utxo, block, block_header, mined_transaction, transaction_hashes_for_block, unspent_utxo, }; + +#[cfg(feature = "indexer")] +pub use block::spending_transaction_hash; + pub use find::{ best_tip, block_locator, depth, finalized_state_contains_block_hash, find_chain_hashes, find_chain_headers, hash_by_height, height_by_hash, next_median_time_past, diff --git a/zebra-state/src/service/read/block.rs b/zebra-state/src/service/read/block.rs index 283fe9ddc4f..99d4189a3e4 100644 --- a/zebra-state/src/service/read/block.rs +++ b/zebra-state/src/service/read/block.rs @@ -30,6 +30,9 @@ use crate::{ HashOrHeight, }; +#[cfg(feature = "indexer")] +use crate::request::Spend; + /// Returns the [`Block`] with [`block::Hash`] or /// [`Height`], if it exists in the non-finalized `chain` or finalized `db`. 
pub fn block(chain: Option, db: &ZebraDb, hash_or_height: HashOrHeight) -> Option> @@ -176,11 +179,29 @@ where C: AsRef, { match chain { - Some(chain) if chain.as_ref().spent_utxos.contains(&outpoint) => None, + Some(chain) if chain.as_ref().spent_utxos.contains_key(&outpoint) => None, chain => utxo(chain, db, outpoint), } } +/// Returns the [`Hash`](transaction::Hash) of the transaction that spent an output at +/// the provided [`transparent::OutPoint`] or revealed the provided nullifier, if it exists +/// and is spent or revealed in the non-finalized `chain` or finalized `db` and its +/// spending transaction hash has been indexed. +#[cfg(feature = "indexer")] +pub fn spending_transaction_hash( + chain: Option, + db: &ZebraDb, + spend: Spend, +) -> Option +where + C: AsRef, +{ + chain + .and_then(|chain| chain.as_ref().spending_transaction_hash(&spend)) + .or_else(|| db.spending_transaction_hash(&spend)) +} + /// Returns the [`Utxo`] for [`transparent::OutPoint`], if it exists in any chain /// in the `non_finalized_state`, or in the finalized `db`. 
/// diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index ce5fbde2a21..c0bb6f0873d 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -48,6 +48,8 @@ features = [ "journald", "prometheus", "sentry", + "indexer", + "getblocktemplate-rpcs" ] [features] @@ -59,8 +61,8 @@ default-release-binaries = ["default", "sentry"] # Production features that activate extra dependencies, or extra features in dependencies -# Indexer RPC support -indexer-rpcs = ["zebra-rpc/indexer-rpcs"] +# Indexer support +indexer = ["zebra-rpc/indexer-rpcs", "zebra-state/indexer"] # Mining RPC support getblocktemplate-rpcs = [ diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index ab06e546fc8..035b9a20100 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -271,7 +271,7 @@ impl StartCmd { // TODO: Add a shutdown signal and start the server with `serve_with_incoming_shutdown()` if // any related unit tests sometimes crash with memory errors - #[cfg(feature = "indexer-rpcs")] + #[cfg(feature = "indexer")] let indexer_rpc_task_handle = if let Some(indexer_listen_addr) = config.rpc.indexer_listen_addr { info!("spawning indexer RPC server"); @@ -289,7 +289,7 @@ impl StartCmd { tokio::spawn(std::future::pending().in_current_span()) }; - #[cfg(not(feature = "indexer-rpcs"))] + #[cfg(not(feature = "indexer"))] // Spawn a dummy indexer rpc task which doesn't do anything and never finishes. let indexer_rpc_task_handle: tokio::task::JoinHandle> = tokio::spawn(std::future::pending().in_current_span()); diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index ef2de55dc83..34cd0ba682d 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -122,6 +122,12 @@ //! ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test submit_block --features getblocktemplate-rpcs --release -- --ignored --nocapture //! ``` //! +//! Example of how to run the has_spending_transaction_ids test: +//! +//! ```console +//! 
RUST_LOG=info ZEBRA_CACHED_STATE_DIR=/path/to/zebra/state cargo test has_spending_transaction_ids --features "indexer" --release -- --ignored --nocapture +//! ``` +//! //! Please refer to the documentation of each test for more information. //! //! ## Checkpoint Generation Tests @@ -3539,6 +3545,151 @@ async fn nu6_funding_streams_and_coinbase_balance() -> Result<()> { Ok(()) } +/// Checks that the cached state has the spending transaction ids for every +/// spent outpoint and revealed nullifier in the last 500 blocks of the chain. +// +// Note: This test is meant to be run locally with a prepared finalized state that +// has spending transaction ids. This can be done by starting Zebra with the +// `indexer` feature and waiting until the db format upgrade is complete. It +// can be undone (removing the indexes) by starting Zebra without the feature +// and waiting until the db format downgrade is complete. +#[tokio::test(flavor = "multi_thread")] +#[ignore] +#[cfg(feature = "indexer")] +async fn has_spending_transaction_ids() -> Result<()> { + use std::sync::Arc; + use tower::Service; + use zebra_chain::{chain_tip::ChainTip, transparent::Input}; + use zebra_state::{ + ReadRequest, ReadResponse, Request, Response, SemanticallyVerifiedBlock, Spend, + }; + + use common::cached_state::future_blocks; + + let _init_guard = zebra_test::init(); + let test_type = UpdateZebraCachedStateWithRpc; + let test_name = "has_spending_transaction_ids_test"; + let network = Mainnet; + + let Some(zebrad_state_path) = test_type.zebrad_state_path(test_name) else { + // Skip test if there's no cached state.
+ return Ok(()); + }; + + tracing::info!("loading blocks for non-finalized state"); + + let non_finalized_blocks = future_blocks(&network, test_type, test_name, 100).await?; + + let (mut state, mut read_state, latest_chain_tip, _chain_tip_change) = + common::cached_state::start_state_service_with_cache_dir(&Mainnet, zebrad_state_path) + .await?; + + tracing::info!("committing blocks to non-finalized state"); + + for block in non_finalized_blocks { + let expected_hash = block.hash(); + let block = SemanticallyVerifiedBlock::with_hash(Arc::new(block), expected_hash); + let Response::Committed(block_hash) = state + .ready() + .await + .map_err(|err| eyre!(err))? + .call(Request::CommitSemanticallyVerifiedBlock(block)) + .await + .map_err(|err| eyre!(err))? + else { + panic!("unexpected response to Block request"); + }; + + assert_eq!( + expected_hash, block_hash, + "state should respond with expected block hash" + ); + } + + let mut tip_hash = latest_chain_tip + .best_tip_hash() + .expect("cached state must not be empty"); + + tracing::info!("checking indexes of spending transaction ids"); + + // Read the last 500 blocks - should be greater than the MAX_BLOCK_REORG_HEIGHT so that + // both the finalized and non-finalized state are checked. + let num_blocks_to_check = 500; + let mut is_failure = false; + for i in 0..num_blocks_to_check { + let ReadResponse::Block(block) = read_state + .ready() + .await + .map_err(|err| eyre!(err))? + .call(ReadRequest::Block(tip_hash.into())) + .await + .map_err(|err| eyre!(err))? 
+ else { + panic!("unexpected response to Block request"); + }; + + let block = block.expect("should have block with latest_chain_tip hash"); + + let spends_with_spending_tx_hashes = block.transactions.iter().cloned().flat_map(|tx| { + let tx_hash = tx.hash(); + tx.inputs() + .iter() + .filter_map(Input::outpoint) + .map(Spend::from) + .chain(tx.sprout_nullifiers().cloned().map(Spend::from)) + .chain(tx.sapling_nullifiers().cloned().map(Spend::from)) + .chain(tx.orchard_nullifiers().cloned().map(Spend::from)) + .map(|spend| (spend, tx_hash)) + .collect::>() + }); + + for (spend, expected_transaction_hash) in spends_with_spending_tx_hashes { + let ReadResponse::TransactionId(transaction_hash) = read_state + .ready() + .await + .map_err(|err| eyre!(err))? + .call(ReadRequest::SpendingTransactionId(spend)) + .await + .map_err(|err| eyre!(err))? + else { + panic!("unexpected response to Block request"); + }; + + let Some(transaction_hash) = transaction_hash else { + tracing::warn!( + ?spend, + depth = i, + height = ?block.coinbase_height(), + "querying spending tx id for spend failed" + ); + is_failure = true; + continue; + }; + + assert_eq!( + transaction_hash, expected_transaction_hash, + "spending transaction hash should match expected transaction hash" + ); + } + + if i % 25 == 0 { + tracing::info!( + height = ?block.coinbase_height(), + "has all spending tx ids at and above block" + ); + } + + tip_hash = block.header.previous_block_hash; + } + + assert!( + !is_failure, + "at least one spend was missing a spending transaction id" + ); + + Ok(()) +} + /// Check that Zebra does not depend on any crates from git sources. 
#[test] #[ignore] diff --git a/zebrad/tests/common/cached_state.rs b/zebrad/tests/common/cached_state.rs index 58f6064cdf5..c290cde2cd9 100644 --- a/zebrad/tests/common/cached_state.rs +++ b/zebrad/tests/common/cached_state.rs @@ -167,13 +167,13 @@ pub async fn load_tip_height_from_state_directory( /// ## Panics /// /// If the provided `test_type` doesn't need an rpc server and cached state, or if `max_num_blocks` is 0 -pub async fn get_future_blocks( +pub async fn future_blocks( network: &Network, test_type: TestType, test_name: &str, max_num_blocks: u32, ) -> Result> { - let blocks: Vec = get_raw_future_blocks(network, test_type, test_name, max_num_blocks) + let blocks: Vec = raw_future_blocks(network, test_type, test_name, max_num_blocks) .await? .into_iter() .map(hex::decode) @@ -198,7 +198,7 @@ pub async fn get_future_blocks( /// ## Panics /// /// If the provided `test_type` doesn't need an rpc server and cached state, or if `max_num_blocks` is 0 -pub async fn get_raw_future_blocks( +pub async fn raw_future_blocks( network: &Network, test_type: TestType, test_name: &str, @@ -211,13 +211,13 @@ pub async fn get_raw_future_blocks( assert!( test_type.needs_zebra_cached_state() && test_type.needs_zebra_rpc_server(), - "get_raw_future_blocks needs zebra cached state and rpc server" + "raw_future_blocks needs zebra cached state and rpc server" ); let should_sync = true; let (zebrad, zebra_rpc_address) = spawn_zebrad_for_rpc(network.clone(), test_name, test_type, should_sync)? 
- .ok_or_else(|| eyre!("get_raw_future_blocks requires a cached state"))?; + .ok_or_else(|| eyre!("raw_future_blocks requires a cached state"))?; let rpc_address = zebra_rpc_address.expect("test type must have RPC port"); let mut zebrad = check_sync_logs_until( diff --git a/zebrad/tests/common/get_block_template_rpcs/submit_block.rs b/zebrad/tests/common/get_block_template_rpcs/submit_block.rs index 399efc8d99e..d135a4d533e 100644 --- a/zebrad/tests/common/get_block_template_rpcs/submit_block.rs +++ b/zebrad/tests/common/get_block_template_rpcs/submit_block.rs @@ -14,7 +14,7 @@ use zebra_chain::parameters::Network; use zebra_node_services::rpc_client::RpcRequestClient; use crate::common::{ - cached_state::get_raw_future_blocks, + cached_state::raw_future_blocks, launch::{can_spawn_zebrad_for_test_type, spawn_zebrad_for_rpc}, test_type::TestType, }; @@ -42,7 +42,7 @@ pub(crate) async fn run() -> Result<()> { ); let raw_blocks: Vec = - get_raw_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS).await?; + raw_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS).await?; tracing::info!("got raw future blocks, spawning isolated zebrad...",); diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index 6ac031e491b..3a2fdeeb0f0 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -32,7 +32,7 @@ use zebra_rpc::queue::CHANNEL_AND_QUEUE_CAPACITY; use zebrad::components::mempool::downloads::MAX_INBOUND_CONCURRENCY; use crate::common::{ - cached_state::get_future_blocks, + cached_state::future_blocks, launch::{can_spawn_zebrad_for_test_type, spawn_zebrad_for_rpc}, lightwalletd::{ can_spawn_lightwalletd_for_rpc, spawn_lightwalletd_for_rpc, @@ -92,15 +92,14 @@ pub async fn run() -> Result<()> { ); let mut count = 0; - let blocks: Vec = - get_future_blocks(&network, test_type, test_name, 
MAX_NUM_FUTURE_BLOCKS) - .await? - .into_iter() - .take_while(|block| { - count += block.transactions.len() - 1; - count <= max_sent_transactions() - }) - .collect(); + let blocks: Vec = future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS) + .await? + .into_iter() + .take_while(|block| { + count += block.transactions.len() - 1; + count <= max_sent_transactions() + }) + .collect(); tracing::info!( blocks_count = ?blocks.len(),