From db05845f983d6697af10b98674e5f81752b5a73f Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 1 Dec 2023 07:27:46 +1000 Subject: [PATCH] change(scan): Use the on-disk database for keys and results (#8036) * Expose IntoDisk and FromDisk in zebra-state * Implement database serialization for SaplingScanningKey Strings * Implement serialization for Vec (Vec) * Implement seralization for SaplingScannedDatabaseIndex * Add an is_empty() method * Add a read method for a specific index, and document it * Implement writing scanner results to the database * Make read name more explicit * Implement writing scanner keys * Implement reading sapling keys * Spawn blocking tasks correctly in async code * Change storage results methods to use the database * Update tests that use storage * Use spawn_blocking() for database methods * Change the check interval to slightly less than the block interval * Expose raw database methods with shielded-scan * fix `scan_task_starts` test * minor doc change in test --------- Co-authored-by: Alfredo Garcia --- zebra-scan/src/init.rs | 22 ++- zebra-scan/src/lib.rs | 2 +- zebra-scan/src/scan.rs | 21 ++- zebra-scan/src/storage.rs | 130 +++++++++---- zebra-scan/src/storage/db.rs | 66 +++++-- zebra-scan/src/storage/db/sapling.rs | 174 ++++++++++++++++++ zebra-scan/src/tests.rs | 22 ++- zebra-state/src/lib.rs | 7 +- zebra-state/src/service/finalized_state.rs | 15 +- .../service/finalized_state/disk_format.rs | 9 + .../finalized_state/disk_format/scan.rs | 144 +++++++++++++++ .../src/service/finalized_state/zebra_db.rs | 3 +- .../finalized_state/zebra_db/arbitrary.rs | 6 +- .../service/finalized_state/zebra_db/block.rs | 2 +- zebrad/src/commands/start.rs | 2 +- zebrad/tests/acceptance.rs | 6 +- 16 files changed, 546 insertions(+), 85 deletions(-) create mode 100644 zebra-scan/src/storage/db/sapling.rs create mode 100644 zebra-state/src/service/finalized_state/disk_format/scan.rs diff --git a/zebra-scan/src/init.rs b/zebra-scan/src/init.rs index 
271200d0c2c..bb6193ec879 100644 --- a/zebra-scan/src/init.rs +++ b/zebra-scan/src/init.rs @@ -4,18 +4,30 @@ use color_eyre::Report; use tokio::task::JoinHandle; use tracing::Instrument; -use zebra_chain::parameters::Network; +use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network}; use crate::{scan, storage::Storage, Config}; -/// Initialize the scanner based on its config. -pub fn init( +/// Initialize the scanner based on its config, and spawn a task for it. +/// +/// TODO: add a test for this function. +pub fn spawn_init( config: &Config, network: Network, state: scan::State, ) -> JoinHandle> { - let storage = Storage::new(config, network); + let config = config.clone(); + tokio::spawn(init(config, network, state).in_current_span()) +} + +/// Initialize the scanner based on its config. +/// +/// TODO: add a test for this function. +pub async fn init(config: Config, network: Network, state: scan::State) -> Result<(), Report> { + let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network)) + .wait_for_panics() + .await; // TODO: add more tasks here? 
- tokio::spawn(scan::start(state, storage).in_current_span()) + scan::start(state, storage).await } diff --git a/zebra-scan/src/lib.rs b/zebra-scan/src/lib.rs index dab31d31197..b31fa99d8dc 100644 --- a/zebra-scan/src/lib.rs +++ b/zebra-scan/src/lib.rs @@ -13,4 +13,4 @@ pub mod storage; mod tests; pub use config::Config; -pub use init::init; +pub use init::{init, spawn_init}; diff --git a/zebra-scan/src/scan.rs b/zebra-scan/src/scan.rs index 33de7ff6b58..111a8fa2e88 100644 --- a/zebra-scan/src/scan.rs +++ b/zebra-scan/src/scan.rs @@ -16,7 +16,8 @@ use zcash_client_backend::{ use zcash_primitives::zip32::AccountId; use zebra_chain::{ - block::Block, parameters::Network, serialization::ZcashSerialize, transaction::Transaction, + block::Block, diagnostic::task::WaitForPanics, parameters::Network, + serialization::ZcashSerialize, transaction::Transaction, }; use crate::storage::Storage; @@ -31,7 +32,7 @@ pub type State = Buffer< const INITIAL_WAIT: Duration = Duration::from_secs(10); /// The amount of time between checking and starting new scans. -const CHECK_INTERVAL: Duration = Duration::from_secs(10); +const CHECK_INTERVAL: Duration = Duration::from_secs(30); /// Start the scan task given state and storage. /// @@ -57,13 +58,16 @@ pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> { _ => unreachable!("unmatched response to a state::Tip request"), }; - // Read keys from the storage - let available_keys = storage.get_sapling_keys(); + // Read keys from the storage on disk, which can block. 
+ let key_storage = storage.clone(); + let available_keys = tokio::task::spawn_blocking(move || key_storage.sapling_keys()) + .wait_for_panics() + .await; for key in available_keys { info!( - "Scanning the blockchain for key {} from block 1 to {:?}", - key.0, tip, + "Scanning the blockchain for key {} from block {:?} to {:?}", + key.0, key.1, tip, ); } @@ -73,6 +77,11 @@ pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> { /// Returns transactions belonging to the given `ScanningKey`. /// +/// # Performance / Hangs +/// +/// This method can block while reading database files, so it must be inside spawn_blocking() +/// in async code. +/// /// TODO: /// - Remove the `sapling_tree_size` parameter or turn it into an `Option` once we have access to /// Zebra's state, and we can retrieve the tree size ourselves. diff --git a/zebra-scan/src/storage.rs b/zebra-scan/src/storage.rs index 8bcd0f2363f..1b4eba8e9b7 100644 --- a/zebra-scan/src/storage.rs +++ b/zebra-scan/src/storage.rs @@ -1,18 +1,21 @@ //! Store viewing keys and results of the scan. -#![allow(dead_code)] +use std::collections::{BTreeMap, HashMap}; -use std::collections::HashMap; - -use zebra_chain::{block::Height, parameters::Network, transaction::Hash}; +use zebra_chain::{ + block::Height, + parameters::{Network, NetworkUpgrade}, +}; +use zebra_state::{SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex}; use crate::config::Config; pub mod db; -/// The type used in Zebra to store Sapling scanning keys. -/// It can represent a full viewing key or an individual viewing key. -pub type SaplingScanningKey = String; +// Public types and APIs +pub use db::{SaplingScannedResult, SaplingScanningKey}; + +use self::db::ScannerWriteBatch; /// Store key info and results of the scan. /// @@ -37,27 +40,19 @@ pub struct Storage { /// `rocksdb` allows reads and writes via a shared reference, /// so this database object can be freely cloned. 
/// The last instance that is dropped will close the underlying database. - // - // This database is created but not actually used for results. - // TODO: replace the fields below with a database instance. db: db::ScannerDb, - - /// The sapling key and an optional birthday for it. - sapling_keys: HashMap>, - - /// The sapling key and the related transaction id. - sapling_results: HashMap>, } impl Storage { /// Opens and returns the on-disk scanner results storage for `config` and `network`. /// If there is no existing storage, creates a new storage on disk. /// - /// TODO: - /// New keys in `config` are inserted into the database with their birthday heights. Shielded - /// activation is the minimum birthday height. - /// /// Birthdays and scanner progress are marked by inserting an empty result for that height. + /// + /// # Performance / Hangs + /// + /// This method can block while creating or reading database files, so it must be inside + /// spawn_blocking() in async code. pub fn new(config: &Config, network: Network) -> Self { let mut storage = Self::new_db(config, network); @@ -69,32 +64,89 @@ impl Storage { } /// Add a sapling key to the storage. + /// + /// # Performance / Hangs + /// + /// This method can block while writing database files, so it must be inside spawn_blocking() + /// in async code. pub fn add_sapling_key(&mut self, key: SaplingScanningKey, birthday: Option) { - self.sapling_keys.insert(key, birthday); + // It's ok to write some keys and not others during shutdown, so each key can get its own + // batch. (They will be re-written on startup anyway.) + let mut batch = ScannerWriteBatch::default(); + + batch.insert_sapling_key(self, key, birthday); + + self.write_batch(batch); + } + + /// Returns all the keys and their birthdays. + /// + /// Birthdays are adjusted to sapling activation if they are too low or missing. 
+ /// + /// # Performance / Hangs + /// + /// This method can block while reading database files, so it must be inside spawn_blocking() + /// in async code. + pub fn sapling_keys(&self) -> HashMap { + self.sapling_keys_and_birthday_heights() } /// Add a sapling result to the storage. - pub fn add_sapling_result(&mut self, key: SaplingScanningKey, txid: Hash) { - if let Some(results) = self.sapling_results.get_mut(&key) { - results.push(txid); - } else { - self.sapling_results.insert(key, vec![txid]); - } + /// + /// # Performance / Hangs + /// + /// This method can block while writing database files, so it must be inside spawn_blocking() + /// in async code. + pub fn add_sapling_result( + &mut self, + sapling_key: SaplingScanningKey, + height: Height, + result: Vec, + ) { + // It's ok to write some results and not others during shutdown, so each result can get its + // own batch. (They will be re-scanned on startup anyway.) + let mut batch = ScannerWriteBatch::default(); + + let index = SaplingScannedDatabaseIndex { + sapling_key, + height, + }; + + let entry = SaplingScannedDatabaseEntry { + index, + value: result, + }; + + batch.insert_sapling_result(self, entry); + + self.write_batch(batch); } - /// Get the results of a sapling key. - // - // TODO: Rust style - remove "get_" from these names - pub fn get_sapling_results(&self, key: &str) -> Vec { - self.sapling_results.get(key).cloned().unwrap_or_default() + /// Returns all the results for a sapling key, for every scanned block height. + /// + /// # Performance / Hangs + /// + /// This method can block while reading database files, so it must be inside spawn_blocking() + /// in async code. + pub fn sapling_results( + &self, + sapling_key: &SaplingScanningKey, + ) -> BTreeMap> { + self.sapling_results_for_key(sapling_key) } - /// Get all keys and their birthdays. - // - // TODO: any value below sapling activation as the birthday height, or `None`, should default - // to sapling activation. 
This requires the configured network. - // Return Height not Option. - pub fn get_sapling_keys(&self) -> HashMap> { - self.sapling_keys.clone() + // Parameters + + /// Returns the minimum sapling birthday height for the configured network. + pub fn min_sapling_birthday_height(&self) -> Height { + // Assume that the genesis block never contains shielded inputs or outputs. + // + // # Consensus + // + // For Zcash mainnet and the public testnet, Sapling activates above genesis, + // so this is always true. + NetworkUpgrade::Sapling + .activation_height(self.network()) + .unwrap_or(Height(0)) } } diff --git a/zebra-scan/src/storage/db.rs b/zebra-scan/src/storage/db.rs index 41f40f0c3b1..38feb0e0253 100644 --- a/zebra-scan/src/storage/db.rs +++ b/zebra-scan/src/storage/db.rs @@ -1,17 +1,23 @@ //! Persistent storage for scanner results. -use std::{collections::HashMap, path::Path}; +use std::path::Path; use semver::Version; use zebra_chain::parameters::Network; +use zebra_state::{DiskWriteBatch, ReadDisk}; use crate::Config; use super::Storage; // Public types and APIs -pub use zebra_state::ZebraDb as ScannerDb; +pub use zebra_state::{ + SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, SaplingScannedResult, + SaplingScanningKey, ZebraDb as ScannerDb, +}; + +pub mod sapling; /// The directory name used to distinguish the scanner database from Zebra's other databases or /// flat files. @@ -24,12 +30,14 @@ pub const SCANNER_DATABASE_KIND: &str = "private-scan"; /// Existing column families that aren't listed here are preserved when the database is opened. pub const SCANNER_COLUMN_FAMILIES_IN_CODE: &[&str] = &[ // Sapling - "sapling_tx_ids", + sapling::SAPLING_TX_IDS, // Orchard - // TODO + // TODO: add Orchard support ]; impl Storage { + // Creation + /// Opens and returns an on-disk scanner results database instance for `config` and `network`. /// If there is no existing database, creates a new database on disk. 
/// @@ -64,11 +72,7 @@ impl Storage { .map(ToString::to_string), ); - let new_storage = Self { - db, - sapling_keys: HashMap::new(), - sapling_results: HashMap::new(), - }; + let new_storage = Self { db }; // TODO: report the last scanned height here? tracing::info!("loaded Zebra scanner cache"); @@ -76,11 +80,7 @@ impl Storage { new_storage } - /// The database format version in the running scanner code. - pub fn database_format_version_in_code() -> Version { - // TODO: implement scanner database versioning - Version::new(0, 0, 0) - } + // Config /// Returns the configured network for this database. pub fn network(&self) -> Network { @@ -92,6 +92,14 @@ impl Storage { self.db.path() } + // Versioning & Upgrades + + /// The database format version in the running scanner code. + pub fn database_format_version_in_code() -> Version { + // TODO: implement scanner database versioning + Version::new(0, 0, 0) + } + /// Check for panics in code running in spawned threads. /// If a thread exited with a panic, resume that panic. /// @@ -101,4 +109,34 @@ impl Storage { pub fn check_for_panics(&mut self) { self.db.check_for_panics() } + + // General database status + + /// Returns true if the database is empty. + pub fn is_empty(&self) -> bool { + // Any column family that is populated at (or near) startup can be used here. + self.db.zs_is_empty(&self.sapling_tx_ids_cf()) + } +} + +// General writing + +/// Wrapper type for scanner database writes. +#[must_use = "batches must be written to the database"] +#[derive(Default)] +pub struct ScannerWriteBatch(pub DiskWriteBatch); + +// Redirect method calls to DiskWriteBatch for convenience. 
+impl std::ops::Deref for ScannerWriteBatch { + type Target = DiskWriteBatch; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::ops::DerefMut for ScannerWriteBatch { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } } diff --git a/zebra-scan/src/storage/db/sapling.rs b/zebra-scan/src/storage/db/sapling.rs new file mode 100644 index 00000000000..d67437a023c --- /dev/null +++ b/zebra-scan/src/storage/db/sapling.rs @@ -0,0 +1,174 @@ +//! Sapling-specific database reading and writing. +//! +//! The sapling scanner database has the following format: +//! +//! | name | key | value | +//! |------------------|-------------------------------|--------------------------| +//! | `sapling_tx_ids` | `SaplingScannedDatabaseIndex` | `Vec` | +//! +//! And types: +//! SaplingScannedDatabaseIndex = `SaplingScanningKey` | `Height` +//! +//! This format allows us to efficiently find all the results for each key, and the latest height +//! for each key. +//! +//! If there are no results for a height, we store an empty list of results. This allows is to scan +//! each key from the next height after we restart. We also use this mechanism to store key +//! birthday heights, by storing the height before the birthday as the "last scanned" block. + +use std::collections::{BTreeMap, HashMap}; + +use zebra_chain::block::Height; +use zebra_state::{ + AsColumnFamilyRef, ReadDisk, SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, + SaplingScannedResult, SaplingScanningKey, WriteDisk, +}; + +use crate::storage::Storage; + +use super::ScannerWriteBatch; + +/// The name of the sapling transaction IDs result column family. +/// +/// This constant should be used so the compiler can detect typos. +pub const SAPLING_TX_IDS: &str = "sapling_tx_ids"; + +impl Storage { + // Reading Sapling database entries + + /// Returns the results for a specific key and block height. 
+ // + // TODO: add tests for this method + pub fn sapling_result_for_key_and_block( + &self, + index: &SaplingScannedDatabaseIndex, + ) -> Vec { + self.db + .zs_get(&self.sapling_tx_ids_cf(), &index) + .unwrap_or_default() + } + + /// Returns all the results for a specific key, indexed by height. + pub fn sapling_results_for_key( + &self, + sapling_key: &SaplingScanningKey, + ) -> BTreeMap> { + let k_min = SaplingScannedDatabaseIndex::min_for_key(sapling_key); + let k_max = SaplingScannedDatabaseIndex::max_for_key(sapling_key); + + self.db + .zs_items_in_range_ordered(&self.sapling_tx_ids_cf(), k_min..=k_max) + .into_iter() + .map(|(index, result)| (index.height, result)) + .collect() + } + + /// Returns all the keys and their birthday heights. + pub fn sapling_keys_and_birthday_heights(&self) -> HashMap { + // This code is a bit complex because we don't have a separate column family for keys + // and their birthday heights. + // + // TODO: make a separate column family after the MVP. + + let sapling_tx_ids = self.sapling_tx_ids_cf(); + let mut keys = HashMap::new(); + + // The minimum key is invalid or a dummy key, so we will never have an entry for it. + let mut find_next_key_index = SaplingScannedDatabaseIndex::min(); + + loop { + // Find the next key, and the first height we have for it. + let Some(entry) = self + .db + .zs_next_key_value_from(&sapling_tx_ids, &find_next_key_index) + else { + break; + }; + + let (index, results): (_, Vec) = entry; + let SaplingScannedDatabaseIndex { + sapling_key, + mut height, + } = index; + + // If there are no results, then it's a "skip up to height" marker, and the birthday + // height is the next height. If there are some results, it's the actual birthday + // height. + if results.is_empty() { + height = height + .next() + .expect("results should only be stored for validated block heights"); + } + + keys.insert(sapling_key.clone(), height); + + // Skip all the results before the next key. 
+ find_next_key_index = SaplingScannedDatabaseIndex::max_for_key(&sapling_key); + } + + keys + } + + // Column family convenience methods + + /// Returns a handle to the `sapling_tx_ids` column family. + pub(crate) fn sapling_tx_ids_cf(&self) -> impl AsColumnFamilyRef + '_ { + self.db + .cf_handle(SAPLING_TX_IDS) + .expect("column family was created when database was created") + } + + // Writing batches + + /// Write `batch` to the database for this storage. + pub(crate) fn write_batch(&self, batch: ScannerWriteBatch) { + // Just panic on errors for now + self.db + .write_batch(batch.0) + .expect("unexpected database error") + } +} + +// Writing database entries +// +// TODO: split the write type into state and scanner, so we can't call state write methods on +// scanner databases +impl ScannerWriteBatch { + /// Inserts a scanned sapling result for a key and height. + /// If a result already exists for that key and height, it is replaced. + pub(crate) fn insert_sapling_result( + &mut self, + storage: &Storage, + entry: SaplingScannedDatabaseEntry, + ) { + self.zs_insert(&storage.sapling_tx_ids_cf(), entry.index, entry.value); + } + + /// Insert a sapling scanning `key`, and mark all heights before `birthday_height` so they + /// won't be scanned. + /// + /// If a result already exists for the height before the birthday, it is replaced with an empty + /// result. + pub(crate) fn insert_sapling_key( + &mut self, + storage: &Storage, + sapling_key: SaplingScanningKey, + birthday_height: Option, + ) { + let min_birthday_height = storage.min_sapling_birthday_height(); + + // The birthday height must be at least the minimum height for that pool. + let birthday_height = birthday_height + .unwrap_or(min_birthday_height) + .max(min_birthday_height); + // And we want to skip up to the height before it. 
+ let skip_up_to_height = birthday_height.previous().unwrap_or(Height(0)); + + let index = SaplingScannedDatabaseIndex { + sapling_key, + height: skip_up_to_height, + }; + + self.zs_insert(&storage.sapling_tx_ids_cf(), index, Vec::new()); + } +} diff --git a/zebra-scan/src/tests.rs b/zebra-scan/src/tests.rs index 77c84434be7..dd4a7fdf8ac 100644 --- a/zebra-scan/src/tests.rs +++ b/zebra-scan/src/tests.rs @@ -32,7 +32,10 @@ use zcash_primitives::{ }; use zebra_chain::{ - block::Block, chain_tip::ChainTip, parameters::Network, serialization::ZcashDeserializeInto, + block::{Block, Height}, + chain_tip::ChainTip, + parameters::Network, + serialization::ZcashDeserializeInto, transaction::Hash, }; @@ -187,8 +190,11 @@ fn scanning_fake_blocks_store_key_and_results() -> Result<()> { s.add_sapling_key(key_to_be_stored.clone(), None); // Check key was added - assert_eq!(s.get_sapling_keys().len(), 1); - assert_eq!(s.get_sapling_keys().get(&key_to_be_stored), Some(&None)); + assert_eq!(s.sapling_keys().len(), 1); + assert_eq!( + s.sapling_keys().get(&key_to_be_stored), + Some(&s.min_sapling_birthday_height()) + ); let vks: Vec<(&AccountId, &SaplingIvk)> = vec![]; let nf = Nullifier([7; 32]); @@ -219,12 +225,16 @@ fn scanning_fake_blocks_store_key_and_results() -> Result<()> { let found_transaction_hash = Hash::from_bytes_in_display_order(found_transaction); // Add result to database - s.add_sapling_result(key_to_be_stored.clone(), found_transaction_hash); + s.add_sapling_result( + key_to_be_stored.clone(), + Height(1), + vec![found_transaction_hash], + ); // Check the result was added assert_eq!( - s.get_sapling_results(key_to_be_stored.as_str())[0], - found_transaction_hash + s.sapling_results(&key_to_be_stored).get(&Height(1)), + Some(&vec![found_transaction_hash]) ); Ok(()) diff --git a/zebra-state/src/lib.rs b/zebra-state/src/lib.rs index 3ceb08035f7..8fe8ecb7676 100644 --- a/zebra-state/src/lib.rs +++ b/zebra-state/src/lib.rs @@ -60,7 +60,12 @@ pub use service::{ }; 
#[cfg(feature = "shielded-scan")] -pub use service::finalized_state::{ReadDisk, ZebraDb}; +pub use rocksdb::AsColumnFamilyRef; +#[cfg(feature = "shielded-scan")] +pub use service::finalized_state::{ + FromDisk, IntoDisk, ReadDisk, SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, + SaplingScannedResult, SaplingScanningKey, ZebraDb, +}; #[cfg(any(test, feature = "proptest-impl", feature = "shielded-scan"))] pub use service::finalized_state::{DiskWriteBatch, WriteDisk}; diff --git a/zebra-state/src/service/finalized_state.rs b/zebra-state/src/service/finalized_state.rs index 72e575e5e83..d6094a420d9 100644 --- a/zebra-state/src/service/finalized_state.rs +++ b/zebra-state/src/service/finalized_state.rs @@ -39,15 +39,18 @@ mod arbitrary; mod tests; #[allow(unused_imports)] -pub use disk_db::{DiskDb, DiskWriteBatch, WriteDisk}; -pub use disk_format::{OutputIndex, OutputLocation, TransactionLocation}; +pub use disk_db::{DiskDb, DiskWriteBatch, ReadDisk, WriteDisk}; +#[allow(unused_imports)] +pub use disk_format::{ + FromDisk, IntoDisk, OutputIndex, OutputLocation, TransactionLocation, MAX_ON_DISK_HEIGHT, +}; pub use zebra_db::ZebraDb; #[cfg(feature = "shielded-scan")] -pub use disk_db::ReadDisk; - -#[cfg(any(test, feature = "proptest-impl"))] -pub use disk_format::MAX_ON_DISK_HEIGHT; +pub use disk_format::{ + SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, SaplingScannedResult, + SaplingScanningKey, +}; /// The column families supported by the running `zebra-state` database code. 
/// diff --git a/zebra-state/src/service/finalized_state/disk_format.rs b/zebra-state/src/service/finalized_state/disk_format.rs index f35bcd0027d..baf209fc9d5 100644 --- a/zebra-state/src/service/finalized_state/disk_format.rs +++ b/zebra-state/src/service/finalized_state/disk_format.rs @@ -13,12 +13,21 @@ pub mod shielded; pub mod transparent; pub mod upgrade; +#[cfg(feature = "shielded-scan")] +pub mod scan; + #[cfg(test)] mod tests; pub use block::{TransactionLocation, MAX_ON_DISK_HEIGHT}; pub use transparent::{OutputIndex, OutputLocation}; +#[cfg(feature = "shielded-scan")] +pub use scan::{ + SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, SaplingScannedResult, + SaplingScanningKey, +}; + /// Helper type for writing types to disk as raw bytes. /// Also used to convert key types to raw bytes for disk lookups. pub trait IntoDisk { diff --git a/zebra-state/src/service/finalized_state/disk_format/scan.rs b/zebra-state/src/service/finalized_state/disk_format/scan.rs new file mode 100644 index 00000000000..2a866b69164 --- /dev/null +++ b/zebra-state/src/service/finalized_state/disk_format/scan.rs @@ -0,0 +1,144 @@ +//! Serialization formats for the shielded scanner results database. +//! +//! Due to Rust's orphan rule, these serializations must be implemented in this crate. +//! +//! # Correctness +//! +//! Once format versions are implemented for the scanner database, +//! `zebra_scan::Storage::database_format_version_in_code()` must be incremented +//! each time the database format (column, serialization, etc) changes. + +use zebra_chain::{block::Height, transaction}; + +use crate::{FromDisk, IntoDisk}; + +use super::block::HEIGHT_DISK_BYTES; + +/// The fixed length of the scanning result. +/// +/// TODO: If the scanning result doesn't have a fixed length, either: +/// - deserialize using internal length or end markers, +/// - prefix it with a length, or +/// - stop storing vectors of results on disk, instead store each result with a unique key. 
+pub const SAPLING_SCANNING_RESULT_LENGTH: usize = 32; + +/// The type used in Zebra to store Sapling scanning keys. +/// It can represent a full viewing key or an individual viewing key. +pub type SaplingScanningKey = String; + +/// The type used in Zebra to store Sapling scanning results. +pub type SaplingScannedResult = transaction::Hash; + +/// A database column family entry for a block scanned with a Sapling viewing key. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct SaplingScannedDatabaseEntry { + /// The database column family key. Must be unique for each scanning key and scanned block. + pub index: SaplingScannedDatabaseIndex, + + /// The database column family value. + pub value: Vec, +} + +/// A database column family key for a block scanned with a Sapling viewing key. +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] +pub struct SaplingScannedDatabaseIndex { + /// The Sapling viewing key used to scan the block. + pub sapling_key: SaplingScanningKey, + + /// The height of the block. + pub height: Height, +} + +impl SaplingScannedDatabaseIndex { + /// The minimum value of a sapling scanned database index. + /// This value is guaranteed to be the minimum, and not correspond to a valid key. + pub const fn min() -> Self { + Self { + // The empty string is the minimum value in RocksDB lexicographic order. + sapling_key: String::new(), + // Genesis is the minimum height, and never has valid shielded transfers. + height: Height(0), + } + } + + /// The minimum value of a sapling scanned database index for `sapling_key`. + /// This value is guaranteed to be the minimum, and not correspond to a valid entry. + pub fn min_for_key(sapling_key: &SaplingScanningKey) -> Self { + Self { + sapling_key: sapling_key.clone(), + // Genesis is the minimum height, and never has valid shielded transfers. + height: Height(0), + } + } + + /// The maximum value of a sapling scanned database index for `sapling_key`.
+ /// This value is guarateed to be the maximum, and not correspond to a valid entry. + pub fn max_for_key(sapling_key: &SaplingScanningKey) -> Self { + Self { + sapling_key: sapling_key.clone(), + // The maximum height will never be mined - we'll increase it before that happens. + height: Height::MAX, + } + } +} + +impl IntoDisk for SaplingScanningKey { + type Bytes = Vec; + + fn as_bytes(&self) -> Self::Bytes { + SaplingScanningKey::as_bytes(self).to_vec() + } +} + +impl FromDisk for SaplingScanningKey { + fn from_bytes(bytes: impl AsRef<[u8]>) -> Self { + SaplingScanningKey::from_utf8(bytes.as_ref().to_vec()) + .expect("only valid UTF-8 strings are written to the database") + } +} + +impl IntoDisk for SaplingScannedDatabaseIndex { + type Bytes = Vec; + + fn as_bytes(&self) -> Self::Bytes { + let mut bytes = Vec::new(); + + bytes.extend(self.sapling_key.as_bytes()); + bytes.extend(self.height.as_bytes()); + + bytes + } +} + +impl FromDisk for SaplingScannedDatabaseIndex { + fn from_bytes(bytes: impl AsRef<[u8]>) -> Self { + let bytes = bytes.as_ref(); + + let (sapling_key, height) = bytes.split_at(bytes.len() - HEIGHT_DISK_BYTES); + + Self { + sapling_key: SaplingScanningKey::from_bytes(sapling_key), + height: Height::from_bytes(height), + } + } +} + +impl IntoDisk for Vec { + type Bytes = Vec; + + fn as_bytes(&self) -> Self::Bytes { + self.iter() + .flat_map(SaplingScannedResult::as_bytes) + .collect() + } +} + +impl FromDisk for Vec { + fn from_bytes(bytes: impl AsRef<[u8]>) -> Self { + bytes + .as_ref() + .chunks(SAPLING_SCANNING_RESULT_LENGTH) + .map(SaplingScannedResult::from_bytes) + .collect() + } +} diff --git a/zebra-state/src/service/finalized_state/zebra_db.rs b/zebra-state/src/service/finalized_state/zebra_db.rs index f02de7d678d..6b7c6823dcd 100644 --- a/zebra-state/src/service/finalized_state/zebra_db.rs +++ b/zebra-state/src/service/finalized_state/zebra_db.rs @@ -35,7 +35,8 @@ pub mod metrics; pub mod shielded; pub mod transparent; 
-#[cfg(any(test, feature = "proptest-impl"))] +#[cfg(any(test, feature = "proptest-impl", feature = "shielded-scan"))] +// TODO: when the database is split out of zebra-state, always expose these methods. pub mod arbitrary; /// Wrapper struct to ensure high-level `zebra-state` database access goes through the correct API. diff --git a/zebra-state/src/service/finalized_state/zebra_db/arbitrary.rs b/zebra-state/src/service/finalized_state/zebra_db/arbitrary.rs index 2b9f03526a8..8a83a2894c5 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/arbitrary.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/arbitrary.rs @@ -1,5 +1,7 @@ //! Arbitrary value generation and test harnesses for high-level typed database access. +#![allow(unused_imports)] + use std::ops::Deref; use zebra_chain::{amount::NonNegative, block::Block, sprout, value_balance::ValueBalance}; @@ -21,13 +23,14 @@ impl Deref for ZebraDb { impl ZebraDb { /// Returns the inner database. /// - /// This is a test-only method, because it allows write access + /// This is a test-only and shielded-scan-only method, because it allows write access /// and raw read access to the RocksDB instance. pub fn db(&self) -> &DiskDb { &self.db } /// Allow to set up a fake value pool in the database for testing purposes. + #[cfg(any(test, feature = "proptest-impl"))] pub fn set_finalized_value_pool(&self, fake_value_pool: ValueBalance) { let mut batch = DiskWriteBatch::new(); let value_pool_cf = self.db().cf_handle("tip_chain_value_pool").unwrap(); @@ -38,6 +41,7 @@ impl ZebraDb { /// Artificially prime the note commitment tree anchor sets with anchors /// referenced in a block, for testing purposes _only_. 
+ #[cfg(any(test, feature = "proptest-impl"))] pub fn populate_with_anchors(&self, block: &Block) { let mut batch = DiskWriteBatch::new(); diff --git a/zebra-state/src/service/finalized_state/zebra_db/block.rs b/zebra-state/src/service/finalized_state/zebra_db/block.rs index 37365c77c00..bae89ed0fe4 100644 --- a/zebra-state/src/service/finalized_state/zebra_db/block.rs +++ b/zebra-state/src/service/finalized_state/zebra_db/block.rs @@ -402,7 +402,7 @@ impl ZebraDb { } /// Writes the given batch to the database. - pub(crate) fn write_batch(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> { + pub fn write_batch(&self, batch: DiskWriteBatch) -> Result<(), rocksdb::Error> { self.db.write(batch) } } diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index fd08a226755..4e0f71b7ff1 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -292,7 +292,7 @@ impl StartCmd { // Spawn never ending scan task. let scan_task_handle = { info!("spawning shielded scanner with configured viewing keys"); - zebra_scan::init(&config.shielded_scan, config.network.network, state) + zebra_scan::spawn_init(&config.shielded_scan, config.network.network, state) }; #[cfg(not(feature = "zebra-scan"))] diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index 2acad35b30f..fc117dd14e9 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -2804,7 +2804,7 @@ async fn fully_synced_rpc_z_getsubtreesbyindex_snapshot_test() -> Result<()> { Ok(()) } -/// Test that the scanner gets started when the node starts. +/// Test that the scanner task gets started when the node starts. #[cfg(feature = "zebra-scan")] #[test] fn scan_task_starts() -> Result<()> { @@ -2831,10 +2831,10 @@ fn scan_task_starts() -> Result<()> { // Check that scan task started and the first scanning is done. 
let output = child.wait_with_output()?; - output.stdout_line_contains("spawning zebra_scanner")?; + output.stdout_line_contains("spawning shielded scanner with configured viewing keys")?; output.stdout_line_contains( format!( - "Scanning the blockchain for key {} from block 1 to", + "Scanning the blockchain for key {} from block", ZECPAGES_VIEWING_KEY ) .as_str(),