From 9a1414a55888a9bf517d8392c09f560bf6759b67 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 4 Dec 2023 14:59:55 +1000 Subject: [PATCH] Split out a scan_height...() function --- zebra-scan/src/scan.rs | 214 ++++++++++++++++++++++++----------------- 1 file changed, 127 insertions(+), 87 deletions(-) diff --git a/zebra-scan/src/scan.rs b/zebra-scan/src/scan.rs index f9975d595c7..25946199cec 100644 --- a/zebra-scan/src/scan.rs +++ b/zebra-scan/src/scan.rs @@ -22,7 +22,7 @@ use zcash_primitives::{ }; use zebra_chain::{ - block::Block, + block::{Block, Height}, chain_tip::ChainTip, diagnostic::task::WaitForPanics, parameters::Network, @@ -55,7 +55,7 @@ const INFO_LOG_INTERVAL: u32 = 100_000; /// Start a scan task that reads blocks from `state`, scans them with the configured keys in /// `storage`, and then writes the results to `storage`. pub async fn start( - mut state: State, + state: State, chain_tip_change: ChainTipChange, storage: Storage, ) -> Result<(), Report> { @@ -67,6 +67,7 @@ pub async fn start( let key_birthdays = tokio::task::spawn_blocking(move || key_storage.sapling_keys()) .wait_for_panics() .await; + let key_birthdays = Arc::new(key_birthdays); // Parse and convert keys once, then use them to scan all blocks. // There is some cryptography here, but it should be fast even with thousands of keys. @@ -80,103 +81,142 @@ pub async fn start( Ok::<_, Report>((key.clone(), parsed_keys)) }) .try_collect()?; + let parsed_keys = Arc::new(parsed_keys); // Give empty states time to verify some blocks before we start scanning. tokio::time::sleep(INITIAL_WAIT).await; loop { - // Get a block from the state. - // We can't use ServiceExt::oneshot() here, because it causes lifetime errors in init(). - let block = state - .ready() - .await - .map_err(|e| eyre!(e))? - .call(zebra_state::Request::Block(height.into())) - .await - .map_err(|e| eyre!(e))?; - - let block = match block { - zebra_state::Response::Block(Some(block)) => block, - zebra_state::Response::Block(None) => { - // If we've reached the tip, sleep for a while then try and get the same block. - tokio::time::sleep(CHECK_INTERVAL).await; - continue; - } - _ => unreachable!("unmatched response to a state::Tip request"), - }; - - // Only log at info level every 100,000 blocks - let is_info_log = - height == storage.min_sapling_birthday_height() || height.0 % INFO_LOG_INTERVAL == 0; - - // TODO: add debug logs? + let scanned_height = scan_height_and_store_results( + height, + state.clone(), + chain_tip_change.clone(), + storage.clone(), + key_birthdays.clone(), + parsed_keys.clone(), + ) + .await?; + + // If we've reached the tip, sleep for a while then try and get the same block. + if scanned_height.is_none() { + tokio::time::sleep(CHECK_INTERVAL).await; + continue; + } + + height = height + .next() + .expect("a valid blockchain never reaches the max height"); + } +} + +/// Get the block at `height` from `state`, scan it with the keys in `parsed_keys`, and store the +/// results in `storage`. If `height` is lower than the `key_birthdays` for that key, skip it. +/// +/// Returns: +/// - `Ok(Some(height))` if the height was scanned, +/// - `Ok(None)` if the height was not in the database, and +/// - `Err(error)` on fatal errors. +pub async fn scan_height_and_store_results( + height: Height, + mut state: State, + chain_tip_change: ChainTipChange, + storage: Storage, + key_birthdays: Arc>, + parsed_keys: Arc< + HashMap, Vec)>, + >, +) -> Result, Report> { + let network = storage.network(); + + // Only log at info level every 100,000 blocks. + // + // TODO: also log progress every 5 minutes once we reach the tip? + let is_info_log = + height == storage.min_sapling_birthday_height() || height.0 % INFO_LOG_INTERVAL == 0; + + // TODO: add debug logs? + if is_info_log { + info!( + "Scanning the blockchain: now at block {:?}, current tip {:?}", + height, + chain_tip_change + .latest_chain_tip() + .best_tip_height_and_hash(), + ); + } + + // Get a block from the state. + // We can't use ServiceExt::oneshot() here, because it causes lifetime errors in init(). + let block = state + .ready() + .await + .map_err(|e| eyre!(e))? + .call(zebra_state::Request::Block(height.into())) + .await + .map_err(|e| eyre!(e))?; + + let block = match block { + zebra_state::Response::Block(Some(block)) => block, + zebra_state::Response::Block(None) => return Ok(None), + _ => unreachable!("unmatched response to a state::Tip request"), + }; + + // Scan it with all the keys. + // + // TODO: scan each key in parallel (after MVP?) + for (key_num, (sapling_key, birthday_height)) in key_birthdays.iter().enumerate() { + // # Security + // + // We can't log `sapling_key` here because it is a private viewing key. Anyone who reads + // the logs could use the key to view those transactions. if is_info_log { info!( - "Scanning the blockchain: now at block {:?}, current tip {:?}", - height, - chain_tip_change - .latest_chain_tip() - .best_tip_height_and_hash(), + "Scanning the blockchain for key {}, started at block {:?}", + key_num, birthday_height, ); } - for (key_num, (sapling_key, birthday_height)) in key_birthdays.iter().enumerate() { - // # Security - // - // We can't log `sapling_key` here because it is a private viewing key. Anyone who reads - // the logs could use the key to view those transactions. - if is_info_log { - info!( - "Scanning the blockchain for key {}, started at block {:?}", - key_num, birthday_height, - ); - } - - // Get the pre-parsed keys for this configured key. - let (dfvks, ivks) = parsed_keys.get(sapling_key).cloned().unwrap_or_default(); - - // Scan the block, which blocks async execution until the scan is complete. - // - // TODO: skip scanning before birthday height (#8022) - // TODO: scan each key in parallel (after MVP?) - let sapling_key = sapling_key.clone(); - let block = block.clone(); - let mut storage = storage.clone(); - - // We use a dummy size of the Sapling note commitment tree. - // - // We can't set the size to zero, because the underlying scanning function would return - // `zcash_client_backeng::scanning::ScanError::TreeSizeUnknown`. - // - // And we can't set them close to 0, because the scanner subtracts the number of notes - // in the block, and panics with "attempt to subtract with overflow". The number of - // notes in a block must be less than this value, this is a consensus rule. - // - // TODO: use the real sapling tree size: `zs::Response::SaplingTree().position() + 1` - let sapling_tree_size = 1 << 16; - - tokio::task::spawn_blocking(move || { - let dfvk_res = - scan_block(network, &block, sapling_tree_size, &dfvks).map_err(|e| eyre!(e))?; - let ivk_res = - scan_block(network, &block, sapling_tree_size, &ivks).map_err(|e| eyre!(e))?; - - let dfvk_res = scanned_block_to_db_result(dfvk_res); - let ivk_res = scanned_block_to_db_result(ivk_res); - - storage.add_sapling_result(sapling_key.clone(), height, dfvk_res); - storage.add_sapling_result(sapling_key, height, ivk_res); - - Ok::<_, Report>(()) - }) - .wait_for_panics() - .await?; - } + // Get the pre-parsed keys for this configured key. + let (dfvks, ivks) = parsed_keys.get(sapling_key).cloned().unwrap_or_default(); - height = height - .next() - .expect("a valid blockchain never reaches the max height"); + // Scan the block, which blocks async execution until the scan is complete. + // + // TODO: skip scanning before birthday height (#8022) + let sapling_key = sapling_key.clone(); + let block = block.clone(); + let mut storage = storage.clone(); + + // We use a dummy size of the Sapling note commitment tree. + // + // We can't set the size to zero, because the underlying scanning function would return + // `zcash_client_backeng::scanning::ScanError::TreeSizeUnknown`. + // + // And we can't set them close to 0, because the scanner subtracts the number of notes + // in the block, and panics with "attempt to subtract with overflow". The number of + // notes in a block must be less than this value, this is a consensus rule. + // + // TODO: use the real sapling tree size: `zs::Response::SaplingTree().position() + 1` + let sapling_tree_size = 1 << 16; + + tokio::task::spawn_blocking(move || { + let dfvk_res = + scan_block(network, &block, sapling_tree_size, &dfvks).map_err(|e| eyre!(e))?; + let ivk_res = + scan_block(network, &block, sapling_tree_size, &ivks).map_err(|e| eyre!(e))?; + + let dfvk_res = scanned_block_to_db_result(dfvk_res); + let ivk_res = scanned_block_to_db_result(ivk_res); + + storage.add_sapling_result(sapling_key.clone(), height, dfvk_res); + storage.add_sapling_result(sapling_key, height, ivk_res); + + Ok::<_, Report>(()) + }) + .wait_for_panics() + .await?; } + + Ok(Some(height)) } /// Returns the transactions from `block` belonging to the given `scanning_keys`.