Skip to content

Commit

Permalink
feat(zebra-scan): Connect with zebrad (#7989)
Browse files Browse the repository at this point in the history
* connect zebrad with zebra-scan

* remove unwrap

* use tokio::sleep

* fix the task handler

* Don't panic on an empty state

---------

Co-authored-by: teor <teor@riseup.net>
  • Loading branch information
oxarbitrage and teor2345 authored Nov 27, 2023
1 parent 681ae68 commit 0f24c31
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 5 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5806,6 +5806,8 @@ dependencies = [
"rand 0.8.5",
"serde",
"tokio",
"tower",
"tracing",
"zcash_client_backend",
"zcash_note_encryption",
"zcash_primitives",
Expand Down
8 changes: 5 additions & 3 deletions zebra-scan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,25 @@ categories = ["cryptography::cryptocurrencies"]

[dependencies]
zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.31" }
zebra-state = { path = "../zebra-state", version = "1.0.0-beta.31" }

color-eyre = "0.6.2"
indexmap = { version = "2.0.1", features = ["serde"] }
serde = { version = "1.0.192", features = ["serde_derive"] }
tokio = { version = "1.34.0", features = ["test-util"] }
tower = "0.4.13"
tracing = "0.1.39"

[dev-dependencies]

zcash_client_backend = "0.10.0-rc.1"
zcash_primitives = "0.13.0-rc.1"
zcash_note_encryption = "0.4.0"

color-eyre = { version = "0.6.2" }
rand = "0.8.5"
bls12_381 = "0.8.0"
jubjub = "0.10.0"
ff = "0.13.0"
group = "0.13.0"
tokio = { version = "1.34.0", features = ["test-util"] }

zebra-state = { path = "../zebra-state" }
zebra-test = { path = "../zebra-test" }
3 changes: 2 additions & 1 deletion zebra-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
#![doc(html_root_url = "https://docs.rs/zebra_scan")]

pub mod config;
mod storage;
pub mod scan;
pub mod storage;

#[cfg(test)]
mod tests;
58 changes: 58 additions & 0 deletions zebra-scan/src/scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
//! The scan task.
use std::time::Duration;

use color_eyre::{eyre::eyre, Report};
use tower::{buffer::Buffer, util::BoxService, Service, ServiceExt};
use tracing::info;

use crate::storage::Storage;

type State = Buffer<
BoxService<zebra_state::Request, zebra_state::Response, zebra_state::BoxError>,
zebra_state::Request,
>;

/// Wait a few seconds at startup so tip height is always `Some`.
const INITIAL_WAIT: Duration = Duration::from_secs(10);

/// The amount of time between checking and starting new scans.
const CHECK_INTERVAL: Duration = Duration::from_secs(10);

/// Start the scan task given state and storage.
///
/// - This function is dummy at the moment. It just makes sure we can read the storage and the state.
/// - Modificatiuons here might have an impact in the `scan_task_starts` test.
/// - Real scanning code functionality will be added in the future here.
pub async fn start(mut state: State, storage: Storage) -> Result<(), Report> {
// We want to make sure the state has a tip height available before we start scanning.
tokio::time::sleep(INITIAL_WAIT).await;

loop {
// Make sure we can query the state
let request = state
.ready()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::Tip)
.await
.map_err(|e| eyre!(e));

let tip = match request? {
zebra_state::Response::Tip(tip) => tip,
_ => unreachable!("unmatched response to a state::Tip request"),
};

// Read keys from the storage
let available_keys = storage.get_sapling_keys();

for key in available_keys {
info!(
"Scanning the blockchain for key {} from block 1 to {:?}",
key.0, tip,
);
}

tokio::time::sleep(CHECK_INTERVAL).await;
}
}
6 changes: 6 additions & 0 deletions zebra-scan/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,9 @@ impl Storage {
self.sapling_keys.clone()
}
}

impl Default for Storage {
fn default() -> Self {
Self::new()
}
}
25 changes: 24 additions & 1 deletion zebrad/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl StartCmd {
block_download_peer_set: peer_set.clone(),
block_verifier: block_verifier_router.clone(),
mempool: mempool.clone(),
state,
state: state.clone(),
latest_chain_tip: latest_chain_tip.clone(),
};
setup_tx
Expand Down Expand Up @@ -262,6 +262,7 @@ impl StartCmd {
.in_current_span(),
);

// Spawn never ending end of support task.
info!("spawning end of support checking task");
let end_of_support_task_handle = tokio::spawn(
sync::end_of_support::start(config.network.network, latest_chain_tip).in_current_span(),
Expand All @@ -285,6 +286,22 @@ impl StartCmd {
info!("spawning syncer task");
let syncer_task_handle = tokio::spawn(syncer.sync().in_current_span());

#[cfg(feature = "zebra-scan")]
// Spawn never ending scan task.
let scan_task_handle = {
info!("spawning zebra_scanner");
let mut storage = zebra_scan::storage::Storage::new();
for (key, birthday) in config.shielded_scan.sapling_keys_to_scan.iter() {
storage.add_sapling_key(key.clone(), Some(zebra_chain::block::Height(*birthday)));
}

tokio::spawn(zebra_scan::scan::start(state, storage).in_current_span())
};
#[cfg(not(feature = "zebra-scan"))]
// Spawn a dummy scan task which doesn't do anything and never finishes.
let scan_task_handle: tokio::task::JoinHandle<Result<(), Report>> =
tokio::spawn(std::future::pending().in_current_span());

info!("spawned initial Zebra tasks");

// TODO: put tasks into an ongoing FuturesUnordered and a startup FuturesUnordered?
Expand All @@ -299,6 +316,7 @@ impl StartCmd {
pin!(tx_gossip_task_handle);
pin!(progress_task_handle);
pin!(end_of_support_task_handle);
pin!(scan_task_handle);

// startup tasks
let BackgroundTaskHandles {
Expand Down Expand Up @@ -385,6 +403,10 @@ impl StartCmd {
exit_when_task_finishes = false;
Ok(())
}

scan_result = &mut scan_task_handle => scan_result
.expect("unexpected panic in the scan task")
.map(|_| info!("scan task exited")),
};

// Stop Zebra if a task finished and returned an error,
Expand All @@ -410,6 +432,7 @@ impl StartCmd {
tx_gossip_task_handle.abort();
progress_task_handle.abort();
end_of_support_task_handle.abort();
scan_task_handle.abort();

// startup tasks
state_checkpoint_verify_handle.abort();
Expand Down
43 changes: 43 additions & 0 deletions zebrad/tests/acceptance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2799,3 +2799,46 @@ async fn fully_synced_rpc_z_getsubtreesbyindex_snapshot_test() -> Result<()> {

Ok(())
}

#[cfg(feature = "zebra-scan")]
/// Test that the scanner gets started when the node starts.
#[tokio::test]
async fn scan_task_starts() -> Result<()> {
use indexmap::IndexMap;

const ZECPAGES_VIEWING_KEY: &str = "zxviews1q0duytgcqqqqpqre26wkl45gvwwwd706xw608hucmvfalr759ejwf7qshjf5r9aa7323zulvz6plhttp5mltqcgs9t039cx2d09mgq05ts63n8u35hyv6h9nc9ctqqtue2u7cer2mqegunuulq2luhq3ywjcz35yyljewa4mgkgjzyfwh6fr6jd0dzd44ghk0nxdv2hnv4j5nxfwv24rwdmgllhe0p8568sgqt9ckt02v2kxf5ahtql6s0ltjpkckw8gtymxtxuu9gcr0swvz";

let _init_guard = zebra_test::init();

let mut config = default_test_config(Mainnet)?;
let mut keys = IndexMap::new();
keys.insert(ZECPAGES_VIEWING_KEY.to_string(), 1);
config.shielded_scan.sapling_keys_to_scan = keys;

let testdir = testdir()?.with_config(&mut config)?;
let testdir = &testdir;

let mut child = testdir.spawn_child(args!["start"])?;

// Run the program and kill it after the scanner starts and the first scanning is done.
std::thread::sleep(LAUNCH_DELAY * 2);
child.kill(false)?;

// Check that scan task started and the first scanning is done.
let output = child.wait_with_output()?;

output.stdout_line_contains("spawning zebra_scanner")?;
output.stdout_line_contains(
format!(
"Scanning the blockchain for key {} from block 1 to",
ZECPAGES_VIEWING_KEY
)
.as_str(),
)?;

// Make sure the command was killed
output.assert_was_killed()?;
output.assert_failure()?;

Ok(())
}

0 comments on commit 0f24c31

Please sign in to comment.