Skip to content

Commit

Permalink
change(scan): Use the on-disk database for keys and results (#8036)
Browse files Browse the repository at this point in the history
* Expose IntoDisk and FromDisk in zebra-state

* Implement database serialization for SaplingScanningKey Strings

* Implement serialization for Vec<SaplingScannedResult> (Vec<transaction::Hash>)

* Implement seralization for SaplingScannedDatabaseIndex

* Add an is_empty() method

* Add a read method for a specific index, and document it

* Implement writing scanner results to the database

* Make read name more explicit

* Implement writing scanner keys

* Implement reading sapling keys

* Spawn blocking tasks correctly in async code

* Change storage results methods to use the database

* Update tests that use storage

* Use spawn_blocking() for database methods

* Change the check interval to slightly less than the block interval

* Expose raw database methods with shielded-scan

* fix `scan_task_starts` test

* minor doc change in test

---------

Co-authored-by: Alfredo Garcia <oxarbitrage@gmail.com>
  • Loading branch information
teor2345 and oxarbitrage authored Nov 30, 2023
1 parent 8c717c9 commit db05845
Show file tree
Hide file tree
Showing 16 changed files with 546 additions and 85 deletions.
22 changes: 17 additions & 5 deletions zebra-scan/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,30 @@ use color_eyre::Report;
use tokio::task::JoinHandle;
use tracing::Instrument;

use zebra_chain::parameters::Network;
use zebra_chain::{diagnostic::task::WaitForPanics, parameters::Network};

use crate::{scan, storage::Storage, Config};

/// Initialize the scanner based on its config.
pub fn init(
/// Initialize the scanner based on its config, and spawn a task for it.
///
/// TODO: add a test for this function.
pub fn spawn_init(
config: &Config,
network: Network,
state: scan::State,
) -> JoinHandle<Result<(), Report>> {
let storage = Storage::new(config, network);
let config = config.clone();
tokio::spawn(init(config, network, state).in_current_span())
}

/// Initialize the scanner based on its config.
///
/// TODO: add a test for this function.
pub async fn init(config: Config, network: Network, state: scan::State) -> Result<(), Report> {
let storage = tokio::task::spawn_blocking(move || Storage::new(&config, network))
.wait_for_panics()
.await;

// TODO: add more tasks here?
tokio::spawn(scan::start(state, storage).in_current_span())
scan::start(state, storage).await
}
2 changes: 1 addition & 1 deletion zebra-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ pub mod storage;
mod tests;

pub use config::Config;
pub use init::init;
pub use init::{init, spawn_init};
21 changes: 15 additions & 6 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use zcash_client_backend::{
use zcash_primitives::zip32::AccountId;

use zebra_chain::{
block::Block, parameters::Network, serialization::ZcashSerialize, transaction::Transaction,
block::Block, diagnostic::task::WaitForPanics, parameters::Network,
serialization::ZcashSerialize, transaction::Transaction,
};

use crate::storage::Storage;
Expand All @@ -31,7 +32,7 @@ pub type State = Buffer<
const INITIAL_WAIT: Duration = Duration::from_secs(10);

/// The amount of time between checking and starting new scans.
const CHECK_INTERVAL: Duration = Duration::from_secs(10);
const CHECK_INTERVAL: Duration = Duration::from_secs(30);

/// Start the scan task given state and storage.
///
Expand All @@ -57,13 +58,16 @@ pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> {
_ => unreachable!("unmatched response to a state::Tip request"),
};

// Read keys from the storage
let available_keys = storage.get_sapling_keys();
// Read keys from the storage on disk, which can block.
let key_storage = storage.clone();
let available_keys = tokio::task::spawn_blocking(move || key_storage.sapling_keys())
.wait_for_panics()
.await;

for key in available_keys {
info!(
"Scanning the blockchain for key {} from block 1 to {:?}",
key.0, tip,
"Scanning the blockchain for key {} from block {:?} to {:?}",
key.0, key.1, tip,
);
}

Expand All @@ -73,6 +77,11 @@ pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> {

/// Returns transactions belonging to the given `ScanningKey`.
///
/// # Performance / Hangs
///
/// This method can block while reading database files, so it must be inside spawn_blocking()
/// in async code.
///
/// TODO:
/// - Remove the `sapling_tree_size` parameter or turn it into an `Option` once we have access to
/// Zebra's state, and we can retrieve the tree size ourselves.
Expand Down
130 changes: 91 additions & 39 deletions zebra-scan/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
//! Store viewing keys and results of the scan.
#![allow(dead_code)]
use std::collections::{BTreeMap, HashMap};

use std::collections::HashMap;

use zebra_chain::{block::Height, parameters::Network, transaction::Hash};
use zebra_chain::{
block::Height,
parameters::{Network, NetworkUpgrade},
};
use zebra_state::{SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex};

use crate::config::Config;

pub mod db;

/// The type used in Zebra to store Sapling scanning keys.
/// It can represent a full viewing key or an individual viewing key.
pub type SaplingScanningKey = String;
// Public types and APIs
pub use db::{SaplingScannedResult, SaplingScanningKey};

use self::db::ScannerWriteBatch;

/// Store key info and results of the scan.
///
Expand All @@ -37,27 +40,19 @@ pub struct Storage {
/// `rocksdb` allows reads and writes via a shared reference,
/// so this database object can be freely cloned.
/// The last instance that is dropped will close the underlying database.
//
// This database is created but not actually used for results.
// TODO: replace the fields below with a database instance.
db: db::ScannerDb,

/// The sapling key and an optional birthday for it.
sapling_keys: HashMap<SaplingScanningKey, Option<Height>>,

/// The sapling key and the related transaction id.
sapling_results: HashMap<SaplingScanningKey, Vec<Hash>>,
}

impl Storage {
/// Opens and returns the on-disk scanner results storage for `config` and `network`.
/// If there is no existing storage, creates a new storage on disk.
///
/// TODO:
/// New keys in `config` are inserted into the database with their birthday heights. Shielded
/// activation is the minimum birthday height.
///
/// Birthdays and scanner progress are marked by inserting an empty result for that height.
///
/// # Performance / Hangs
///
/// This method can block while creating or reading database files, so it must be inside
/// spawn_blocking() in async code.
pub fn new(config: &Config, network: Network) -> Self {
let mut storage = Self::new_db(config, network);

Expand All @@ -69,32 +64,89 @@ impl Storage {
}

/// Add a sapling key to the storage.
///
/// # Performance / Hangs
///
/// This method can block while writing database files, so it must be inside spawn_blocking()
/// in async code.
pub fn add_sapling_key(&mut self, key: SaplingScanningKey, birthday: Option<Height>) {
self.sapling_keys.insert(key, birthday);
// It's ok to write some keys and not others during shutdown, so each key can get its own
// batch. (They will be re-written on startup anyway.)
let mut batch = ScannerWriteBatch::default();

batch.insert_sapling_key(self, key, birthday);

self.write_batch(batch);
}

/// Returns all the keys and their birthdays.
///
/// Birthdays are adjusted to sapling activation if they are too low or missing.
///
/// # Performance / Hangs
///
/// This method can block while reading database files, so it must be inside spawn_blocking()
/// in async code.
pub fn sapling_keys(&self) -> HashMap<SaplingScanningKey, Height> {
self.sapling_keys_and_birthday_heights()
}

/// Add a sapling result to the storage.
pub fn add_sapling_result(&mut self, key: SaplingScanningKey, txid: Hash) {
if let Some(results) = self.sapling_results.get_mut(&key) {
results.push(txid);
} else {
self.sapling_results.insert(key, vec![txid]);
}
///
/// # Performance / Hangs
///
/// This method can block while writing database files, so it must be inside spawn_blocking()
/// in async code.
pub fn add_sapling_result(
&mut self,
sapling_key: SaplingScanningKey,
height: Height,
result: Vec<SaplingScannedResult>,
) {
// It's ok to write some results and not others during shutdown, so each result can get its
// own batch. (They will be re-scanned on startup anyway.)
let mut batch = ScannerWriteBatch::default();

let index = SaplingScannedDatabaseIndex {
sapling_key,
height,
};

let entry = SaplingScannedDatabaseEntry {
index,
value: result,
};

batch.insert_sapling_result(self, entry);

self.write_batch(batch);
}

/// Get the results of a sapling key.
//
// TODO: Rust style - remove "get_" from these names
pub fn get_sapling_results(&self, key: &str) -> Vec<Hash> {
self.sapling_results.get(key).cloned().unwrap_or_default()
/// Returns all the results for a sapling key, for every scanned block height.
///
/// # Performance / Hangs
///
/// This method can block while reading database files, so it must be inside spawn_blocking()
/// in async code.
pub fn sapling_results(
&self,
sapling_key: &SaplingScanningKey,
) -> BTreeMap<Height, Vec<SaplingScannedResult>> {
self.sapling_results_for_key(sapling_key)
}

/// Get all keys and their birthdays.
//
// TODO: any value below sapling activation as the birthday height, or `None`, should default
// to sapling activation. This requires the configured network.
// Return Height not Option<Height>.
pub fn get_sapling_keys(&self) -> HashMap<String, Option<Height>> {
self.sapling_keys.clone()
// Parameters

/// Returns the minimum sapling birthday height for the configured network.
pub fn min_sapling_birthday_height(&self) -> Height {
// Assume that the genesis block never contains shielded inputs or outputs.
//
// # Consensus
//
// For Zcash mainnet and the public testnet, Sapling activates above genesis,
// so this is always true.
NetworkUpgrade::Sapling
.activation_height(self.network())
.unwrap_or(Height(0))
}
}
66 changes: 52 additions & 14 deletions zebra-scan/src/storage/db.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
//! Persistent storage for scanner results.
use std::{collections::HashMap, path::Path};
use std::path::Path;

use semver::Version;

use zebra_chain::parameters::Network;
use zebra_state::{DiskWriteBatch, ReadDisk};

use crate::Config;

use super::Storage;

// Public types and APIs
pub use zebra_state::ZebraDb as ScannerDb;
pub use zebra_state::{
SaplingScannedDatabaseEntry, SaplingScannedDatabaseIndex, SaplingScannedResult,
SaplingScanningKey, ZebraDb as ScannerDb,
};

pub mod sapling;

/// The directory name used to distinguish the scanner database from Zebra's other databases or
/// flat files.
Expand All @@ -24,12 +30,14 @@ pub const SCANNER_DATABASE_KIND: &str = "private-scan";
/// Existing column families that aren't listed here are preserved when the database is opened.
pub const SCANNER_COLUMN_FAMILIES_IN_CODE: &[&str] = &[
// Sapling
"sapling_tx_ids",
sapling::SAPLING_TX_IDS,
// Orchard
// TODO
// TODO: add Orchard support
];

impl Storage {
// Creation

/// Opens and returns an on-disk scanner results database instance for `config` and `network`.
/// If there is no existing database, creates a new database on disk.
///
Expand Down Expand Up @@ -64,23 +72,15 @@ impl Storage {
.map(ToString::to_string),
);

let new_storage = Self {
db,
sapling_keys: HashMap::new(),
sapling_results: HashMap::new(),
};
let new_storage = Self { db };

// TODO: report the last scanned height here?
tracing::info!("loaded Zebra scanner cache");

new_storage
}

/// The database format version in the running scanner code.
pub fn database_format_version_in_code() -> Version {
// TODO: implement scanner database versioning
Version::new(0, 0, 0)
}
// Config

/// Returns the configured network for this database.
pub fn network(&self) -> Network {
Expand All @@ -92,6 +92,14 @@ impl Storage {
self.db.path()
}

// Versioning & Upgrades

/// The database format version in the running scanner code.
pub fn database_format_version_in_code() -> Version {
// TODO: implement scanner database versioning
Version::new(0, 0, 0)
}

/// Check for panics in code running in spawned threads.
/// If a thread exited with a panic, resume that panic.
///
Expand All @@ -101,4 +109,34 @@ impl Storage {
pub fn check_for_panics(&mut self) {
self.db.check_for_panics()
}

// General database status

/// Returns true if the database is empty.
pub fn is_empty(&self) -> bool {
// Any column family that is populated at (or near) startup can be used here.
self.db.zs_is_empty(&self.sapling_tx_ids_cf())
}
}

// General writing

/// Wrapper type for scanner database writes.
#[must_use = "batches must be written to the database"]
#[derive(Default)]
pub struct ScannerWriteBatch(pub DiskWriteBatch);

// Redirect method calls to DiskWriteBatch for convenience.
impl std::ops::Deref for ScannerWriteBatch {
type Target = DiskWriteBatch;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl std::ops::DerefMut for ScannerWriteBatch {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
Loading

0 comments on commit db05845

Please sign in to comment.