diff --git a/Cargo.lock b/Cargo.lock index 287749c4d18..0c4b70c838a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5806,6 +5806,8 @@ dependencies = [ "rand 0.8.5", "serde", "tokio", + "tower", + "tracing", "zcash_client_backend", "zcash_note_encryption", "zcash_primitives", diff --git a/zebra-scan/Cargo.toml b/zebra-scan/Cargo.toml index 2c3f2ba527f..3f1471a34bb 100644 --- a/zebra-scan/Cargo.toml +++ b/zebra-scan/Cargo.toml @@ -20,9 +20,14 @@ categories = ["cryptography::cryptocurrencies"] [dependencies] zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.31" } +zebra-state = { path = "../zebra-state", version = "1.0.0-beta.31" } +color-eyre = "0.6.2" indexmap = { version = "2.0.1", features = ["serde"] } serde = { version = "1.0.192", features = ["serde_derive"] } +tokio = { version = "1.34.0", features = ["test-util"] } +tower = "0.4.13" +tracing = "0.1.39" [dev-dependencies] @@ -30,13 +35,10 @@ zcash_client_backend = "0.10.0-rc.1" zcash_primitives = "0.13.0-rc.1" zcash_note_encryption = "0.4.0" -color-eyre = { version = "0.6.2" } rand = "0.8.5" bls12_381 = "0.8.0" jubjub = "0.10.0" ff = "0.13.0" group = "0.13.0" -tokio = { version = "1.34.0", features = ["test-util"] } -zebra-state = { path = "../zebra-state" } zebra-test = { path = "../zebra-test" } diff --git a/zebra-scan/src/lib.rs b/zebra-scan/src/lib.rs index a71810f9f97..3dfaad3662d 100644 --- a/zebra-scan/src/lib.rs +++ b/zebra-scan/src/lib.rs @@ -5,7 +5,8 @@ #![doc(html_root_url = "https://docs.rs/zebra_scan")] pub mod config; -mod storage; +pub mod scan; +pub mod storage; #[cfg(test)] mod tests; diff --git a/zebra-scan/src/scan.rs b/zebra-scan/src/scan.rs new file mode 100644 index 00000000000..9563fbd734a --- /dev/null +++ b/zebra-scan/src/scan.rs @@ -0,0 +1,58 @@ +//! The scan task. + +use std::time::Duration; + +use color_eyre::{eyre::eyre, Report}; +use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt}; +use tracing::info; + +use crate::storage::Storage; + +type State = Buffer< + BoxService, + zebra_state::Request, +>; + +/// Wait a few seconds at startup so tip height is always `Some`. +const INITIAL_WAIT: Duration = Duration::from_secs(10); + +/// The amount of time between checking and starting new scans. +const CHECK_INTERVAL: Duration = Duration::from_secs(10); + +/// Start the scan task given state and storage. +/// +/// - This function is dummy at the moment. It just makes sure we can read the storage and the state. +/// - Modificatiuons here might have an impact in the `scan_task_starts` test. +/// - Real scanning code functionality will be added in the future here. +pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> { + // We want to make sure the state has a tip height available before we start scanning. + tokio::time::sleep(INITIAL_WAIT).await; + + loop { + // Make sure we can query the state + let request = state + .ready() + .await + .map_err(|e| eyre!(e))? + .call(zebra_state::Request::Tip) + .await + .map_err(|e| eyre!(e)); + + let tip = match request? { + zebra_state::Response::Tip(tip) => tip, + _ => unreachable!("unmatched response to a state::Tip request"), + }; + + // Read keys from the storage + let available_keys = storage.get_sapling_keys(); + + for key in available_keys { + info!( + "Scanning the blockchain for key {} from block 1 to {:?}", + key.0, tip, + ); + } + + tokio::time::sleep(CHECK_INTERVAL).await; + } +} diff --git a/zebra-scan/src/storage.rs b/zebra-scan/src/storage.rs index eac776f2bfe..212dddf6ee5 100644 --- a/zebra-scan/src/storage.rs +++ b/zebra-scan/src/storage.rs @@ -52,3 +52,9 @@ impl Storage { self.sapling_keys.clone() } } + +impl Default for Storage { + fn default() -> Self { + Self::new() + } +} diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index 2248f3ff014..371d8f2fb0c 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -195,7 +195,7 @@ impl StartCmd { block_download_peer_set: peer_set.clone(), block_verifier: block_verifier_router.clone(), mempool: mempool.clone(), - state, + state: state.clone(), latest_chain_tip: latest_chain_tip.clone(), }; setup_tx @@ -262,6 +262,7 @@ impl StartCmd { .in_current_span(), ); + // Spawn never ending end of support task. info!("spawning end of support checking task"); let end_of_support_task_handle = tokio::spawn( sync::end_of_support::start(config.network.network, latest_chain_tip).in_current_span(), @@ -285,6 +286,22 @@ impl StartCmd { info!("spawning syncer task"); let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span()); + #[cfg(feature = "zebra-scan")] + // Spawn never ending scan task. + let scan_task_handle = { + info!("spawning zebra_scanner"); + let mut storage = zebra_scan::storage::Storage::new(); + for (key, birthday) in config.shielded_scan.sapling_keys_to_scan.iter() { + storage.add_sapling_key(key.clone(), Some(zebra_chain::block::Height(*birthday))); + } + + tokio::spawn(zebra_scan::scan::start(state, storage).in_current_span()) + }; + #[cfg(not(feature = "zebra-scan"))] + // Spawn a dummy scan task which doesn't do anything and never finishes. + let scan_task_handle: tokio::task::JoinHandle> = + tokio::spawn(std::future::pending().in_current_span()); + info!("spawned initial Zebra tasks"); // TODO: put tasks into an ongoing FuturesUnordered and a startup FuturesUnordered? @@ -299,6 +316,7 @@ impl StartCmd { pin!(tx_gossip_task_handle); pin!(progress_task_handle); pin!(end_of_support_task_handle); + pin!(scan_task_handle); // startup tasks let BackgroundTaskHandles { @@ -385,6 +403,10 @@ impl StartCmd { exit_when_task_finishes = false; Ok(()) } + + scan_result = &mut scan_task_handle => scan_result + .expect("unexpected panic in the scan task") + .map(|_| info!("scan task exited")), }; // Stop Zebra if a task finished and returned an error, @@ -410,6 +432,7 @@ impl StartCmd { tx_gossip_task_handle.abort(); progress_task_handle.abort(); end_of_support_task_handle.abort(); + scan_task_handle.abort(); // startup tasks state_checkpoint_verify_handle.abort(); diff --git a/zebrad/tests/acceptance.rs b/zebrad/tests/acceptance.rs index c6d3097a1df..ecc60961408 100644 --- a/zebrad/tests/acceptance.rs +++ b/zebrad/tests/acceptance.rs @@ -2799,3 +2799,46 @@ async fn fully_synced_rpc_z_getsubtreesbyindex_snapshot_test() -> Result<()> { Ok(()) } + +#[cfg(feature = "zebra-scan")] +/// Test that the scanner gets started when the node starts. +#[tokio::test] +async fn scan_task_starts() -> Result<()> { + use indexmap::IndexMap; + + const ZECPAGES_VIEWING_KEY: &str = "zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz"; + + let _init_guard = zebra_test::init(); + + let mut config = default_test_config(Mainnet)?; + let mut keys = IndexMap::new(); + keys.insert(ZECPAGES_VIEWING_KEY.to_string(), 1); + config.shielded_scan.sapling_keys_to_scan = keys; + + let testdir = testdir()?.with_config(&mut config)?; + let testdir = &testdir; + + let mut child = testdir.spawn_child(args!["start"])?; + + // Run the program and kill it after the scanner starts and the first scanning is done. + std::thread::sleep(LAUNCH_DELAY * 2); + child.kill(false)?; + + // Check that scan task started and the first scanning is done. + let output = child.wait_with_output()?; + + output.stdout_line_contains("spawning zebra_scanner")?; + output.stdout_line_contains( + format!( + "Scanning the blockchain for key {} from block 1 to", + ZECPAGES_VIEWING_KEY + ) + .as_str(), + )?; + + // Make sure the command was killed + output.assert_was_killed()?; + output.assert_failure()?; + + Ok(()) +}