From 354d45cd5d3d36c7f0493e6c668959f67f4873a2 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Thu, 23 Nov 2023 18:27:44 -0300 Subject: [PATCH] connect zebrad with zebra-scan --- Cargo.lock | 2 ++ zebra-scan/Cargo.toml | 6 ++-- zebra-scan/src/lib.rs | 3 +- zebra-scan/src/scan.rs | 58 ++++++++++++++++++++++++++++++++++++ zebra-scan/src/storage.rs | 6 ++++ zebrad/src/commands/start.rs | 22 +++++++++++++- zebrad/tests/acceptance.rs | 43 ++++++++++++++++++++++++++ 7 files changed, 136 insertions(+), 4 deletions(-) create mode 100644 zebra-scan/src/scan.rs diff --git a/Cargo.lock b/Cargo.lock index 413174c856f..f704aaa15c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5785,6 +5785,8 @@ dependencies = [ "rand 0.8.5", "serde", "tokio", + "tower", + "tracing", "zcash_client_backend", "zcash_note_encryption", "zcash_primitives", diff --git a/zebra-scan/Cargo.toml b/zebra-scan/Cargo.toml index 2c3f2ba527f..9969d4635cf 100644 --- a/zebra-scan/Cargo.toml +++ b/zebra-scan/Cargo.toml @@ -20,9 +20,13 @@ categories = ["cryptography::cryptocurrencies"] [dependencies] zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.31" } +zebra-state = { path = "../zebra-state", version = "1.0.0-beta.31" } +color-eyre = "0.6.2" indexmap = { version = "2.0.1", features = ["serde"] } serde = { version = "1.0.192", features = ["serde_derive"] } +tower = "0.4.13" +tracing = "0.1.39" [dev-dependencies] @@ -30,7 +34,6 @@ zcash_client_backend = "0.10.0-rc.1" zcash_primitives = "0.13.0-rc.1" zcash_note_encryption = "0.4.0" -color-eyre = { version = "0.6.2" } rand = "0.8.5" bls12_381 = "0.8.0" jubjub = "0.10.0" @@ -38,5 +41,4 @@ ff = "0.13.0" group = "0.13.0" tokio = { version = "1.34.0", features = ["test-util"] } -zebra-state = { path = "../zebra-state" } zebra-test = { path = "../zebra-test" } diff --git a/zebra-scan/src/lib.rs b/zebra-scan/src/lib.rs index a71810f9f97..3dfaad3662d 100644 --- a/zebra-scan/src/lib.rs +++ b/zebra-scan/src/lib.rs @@ -5,7 +5,8 @@ #![doc(html_root_url = "https://docs.rs/zebra_scan")] pub mod config; -mod storage; +pub mod scan; +pub mod storage; #[cfg(test)] mod tests; diff --git a/zebra-scan/src/scan.rs b/zebra-scan/src/scan.rs new file mode 100644 index 00000000000..14984c453ac --- /dev/null +++ b/zebra-scan/src/scan.rs @@ -0,0 +1,58 @@ +//! The scan task. + +use std::time::Duration; + +use color_eyre::{eyre::eyre, Report}; +use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; +use tracing::info; + +use crate::storage::Storage; + +type State = Buffer< + BoxService, + zebra_state::Request, +>; + +/// Wait a few seconds at startup so tip height is always `Some`. +const INITIAL_WAIT: Duration = Duration::from_secs(10); + +/// The amount of time between checking and starting new scans. +const CHECK_INTERVAL: Duration = Duration::from_secs(10); + +/// Start the scan task given state and storage. +/// +/// - This function is dummy at the moment. It just makes sure we can read the storage and the state. +/// - Modificatiuons here might have an impact in the `scan_task_starts` test. +/// - Real scanning code functionality will be added in the future here. +pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> { + // We want to make sure the state has a tip height available before we start scanning. + std::thread::sleep(INITIAL_WAIT); + + loop { + // Make sure we can query the state + let request = state + .ready() + .await + .map_err(|e| eyre!(e))? + .call(zebra_state::Request::Tip) + .await + .map_err(|e| eyre!(e)); + + let tip_height = match request? { + zebra_state::Response::Tip(tip) => tip.unwrap().0, + _ => unreachable!("unmatched response to a state::Tip request"), + }; + + // Read keys from the storage + let available_keys = storage.get_sapling_keys(); + + for key in available_keys { + info!( + "Scanning the blockchain for key {} from block 1 to {}", + key.0, tip_height.0 + ); + } + + std::thread::sleep(CHECK_INTERVAL); + } +} diff --git a/zebra-scan/src/storage.rs b/zebra-scan/src/storage.rs index eac776f2bfe..212dddf6ee5 100644 --- a/zebra-scan/src/storage.rs +++ b/zebra-scan/src/storage.rs @@ -52,3 +52,9 @@ impl Storage { self.sapling_keys.clone() } } + +impl Default for Storage { + fn default() -> Self { + Self::new() + } +} diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 2248f3ff014..7276c17a396 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -195,7 +195,7 @@ impl StartCmd { block_download_peer_set: peer_set.clone(), block_verifier: block_verifier_router.clone(), mempool: mempool.clone(), - state, + state: state.clone(), latest_chain_tip: latest_chain_tip.clone(), }; setup_tx @@ -262,6 +262,7 @@ impl StartCmd { .in_current_span(), ); + // Spawn never ending end of support task. info!("spawning end of support checking task"); let end_of_support_task_handle = tokio::spawn( sync::end_of_support::start(config.network.network, latest_chain_tip).in_current_span(), @@ -285,6 +286,18 @@ impl StartCmd { info!("spawning syncer task"); let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span()); + #[cfg(feature = "zebra-scan")] + // Spawn never ending scan task. + let scan_task_handle = { + info!("spawning zebra_scanner"); + let mut storage = zebra_scan::storage::Storage::new(); + for (key, birthday) in config.shielded_scan.sapling_keys_to_scan.iter() { + storage.add_sapling_key(key.clone(), Some(zebra_chain::block::Height(*birthday))); + } + + tokio::spawn(zebra_scan::scan::start(state, storage).in_current_span()) + }; + info!("spawned initial Zebra tasks"); // TODO: put tasks into an ongoing FuturesUnordered and a startup FuturesUnordered? @@ -299,6 +312,8 @@ impl StartCmd { pin!(tx_gossip_task_handle); pin!(progress_task_handle); pin!(end_of_support_task_handle); + #[cfg(feature = "zebra-scan")] + pin!(scan_task_handle); // startup tasks let BackgroundTaskHandles { @@ -385,6 +400,8 @@ impl StartCmd { exit_when_task_finishes = false; Ok(()) } + + // TODO: add scan task which is tricky because it needs to be behind a feature. }; // Stop Zebra if a task finished and returned an error, @@ -411,6 +428,9 @@ impl StartCmd { progress_task_handle.abort(); end_of_support_task_handle.abort(); + #[cfg(feature = "zebra-scan")] + scan_task_handle.abort(); + // startup tasks state_checkpoint_verify_handle.abort(); old_databases_task_handle.abort(); diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index c6d3097a1df..ecc60961408 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -2799,3 +2799,46 @@ async fn fully_synced_rpc_z_getsubtreesbyindex_snapshot_test() -> Result<()> { Ok(()) } + +#[cfg(feature = "zebra-scan")] +/// Test that the scanner gets started when the node starts. +#[tokio::test] +async fn scan_task_starts() -> Result<()> { + use indexmap::IndexMap; + + const ZECPAGES_VIEWING_KEY: &str = "zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz"; + + let _init_guard = zebra_test::init(); + + let mut config = default_test_config(Mainnet)?; + let mut keys = IndexMap::new(); + keys.insert(ZECPAGES_VIEWING_KEY.to_string(), 1); + config.shielded_scan.sapling_keys_to_scan = keys; + + let testdir = testdir()?.with_config(&mut config)?; + let testdir = &testdir; + + let mut child = testdir.spawn_child(args!["start"])?; + + // Run the program and kill it after the scanner starts and the first scanning is done. + std::thread::sleep(LAUNCH_DELAY * 2); + child.kill(false)?; + + // Check that scan task started and the first scanning is done. + let output = child.wait_with_output()?; + + output.stdout_line_contains("spawning zebra_scanner")?; + output.stdout_line_contains( + format!( + "Scanning the blockchain for key {} from block 1 to", + ZECPAGES_VIEWING_KEY + ) + .as_str(), + )?; + + // Make sure the command was killed + output.assert_was_killed()?; + output.assert_failure()?; + + Ok(()) +}