diff --git a/Cargo.lock b/Cargo.lock index 5fce020..cf33ea0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4543,6 +4543,15 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7cee0529a6d40f580e7a5e6c495c8fbfe21b7b52795ed4bb5e62cdf92bc6380" +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "1.6.4" @@ -4939,6 +4948,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", + "signal-hook-registry", "socket2 0.5.4", "tokio-macros", "windows-sys", diff --git a/Cargo.toml b/Cargo.toml index e56fe71..19a6af1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ serde = { version = "1.0.188", features = ["derive"] } serde_json = { version = "1.0.107", features = ["std"] } serde_json_any_key = "2.0.0" thiserror = "1.0" -tokio = { version = "1.32.0", features = ["macros"] } +tokio = { version = "1.32.0", features = ["macros", "signal"] } tracing = "0.1.37" tracing-subscriber = "0.3.17" zksync_merkle_tree = { git = "https://github.com/matter-labs/zksync-era.git" } diff --git a/src/l1_fetcher.rs b/src/l1_fetcher.rs index ade1e3d..69edc2f 100644 --- a/src/l1_fetcher.rs +++ b/src/l1_fetcher.rs @@ -9,7 +9,7 @@ use eyre::Result; use rand::random; use thiserror::Error; use tokio::{ - sync::{mpsc, Mutex}, + sync::{mpsc, oneshot, Mutex}, time::{sleep, Duration}, }; @@ -126,77 +126,93 @@ impl L1Fetcher { } }); - { - // NOTE: The channel should close once it goes out of scope we move it here. - let hash_tx = hash_tx; - let mut latest_l2_block_number = U256::zero(); - - // If an `end_block` was supplied we shouldn't poll for newer blocks. - if end_block.is_some() { - disable_polling = true; - } + let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); + let main_handle = tokio::spawn({ + let provider_clone = self.provider.clone(); + let snapshot_clone = self.snapshot.clone(); + async move { + let mut latest_l2_block_number = U256::zero(); - let end_block_number = end_block.unwrap_or( - self.provider - .get_block(BlockNumber::Latest) - .await? - .unwrap() - .number - .unwrap(), - ); - - loop { - if disable_polling && current_l1_block_number > end_block_number { - tracing::info!("Successfully reached end block. Shutting down..."); - break; + // If an `end_block` was supplied we shouldn't poll for newer blocks. + if end_block.is_some() { + disable_polling = true; } - // Create a filter showing only `BlockCommit`s from the [`ZK_SYNC_ADDR`]. - // TODO: Filter by executed blocks too. - let filter = Filter::new() - .address(ZK_SYNC_ADDR.parse::
()?) - .topic0(event.signature()) - .from_block(current_l1_block_number) - .to_block(current_l1_block_number + BLOCK_STEP); - - // Grab all relevant logs. - if let Ok(logs) = - L1Fetcher::retry_call(|| self.provider.get_logs(&filter), L1FetchError::GetLogs) + let end_block_number = end_block.unwrap_or( + provider_clone + .get_block(BlockNumber::Latest) .await - { - for log in logs { - // log.topics: - // topics[1]: L2 block number. - // topics[2]: L2 block hash. - // topics[3]: L2 commitment. - - let new_l2_block_number = - U256::from_big_endian(log.topics[1].as_fixed_bytes()); - if new_l2_block_number <= latest_l2_block_number { - continue; - } + .unwrap() + .unwrap() + .number + .unwrap(), + ); + + loop { + // Break when reaching the `end_block` or on the receivement of a `ctrl_c` signal. + if (disable_polling && current_l1_block_number > end_block_number) + || shutdown_rx.try_recv().is_ok() + { + break; + } - if let Some(tx_hash) = log.transaction_hash { - hash_tx.send(tx_hash).await?; + // Create a filter showing only `BlockCommit`s from the [`ZK_SYNC_ADDR`]. + // TODO: Filter by executed blocks too. + let filter = Filter::new() + .address(ZK_SYNC_ADDR.parse::().unwrap()) + .topic0(event.signature()) + .from_block(current_l1_block_number) + .to_block(current_l1_block_number + BLOCK_STEP); + + // Grab all relevant logs. + if let Ok(logs) = L1Fetcher::retry_call( + || provider_clone.get_logs(&filter), + L1FetchError::GetLogs, + ) + .await + { + for log in logs { + // log.topics: + // topics[1]: L2 block number. + // topics[2]: L2 block hash. + // topics[3]: L2 commitment. + + let new_l2_block_number = + U256::from_big_endian(log.topics[1].as_fixed_bytes()); + if new_l2_block_number <= latest_l2_block_number { + continue; + } + + if let Some(tx_hash) = log.transaction_hash { + hash_tx.send(tx_hash).await.unwrap(); + } + + latest_l2_block_number = new_l2_block_number; } + } else { + tokio::time::sleep(Duration::from_secs(LONG_POLLING_INTERVAL_S)).await; + continue; + }; - latest_l2_block_number = new_l2_block_number; + // Store our current L1 block number so we can resume if the process exits. + if let Some(snapshot) = &snapshot_clone { + snapshot.lock().await.latest_l1_block_number = current_l1_block_number; } - } else { - tokio::time::sleep(Duration::from_secs(LONG_POLLING_INTERVAL_S)).await; - continue; - }; - // Store our current L1 block number so we can resume if the process exits. - if let Some(snapshot) = &self.snapshot { - snapshot.lock().await.latest_l1_block_number = current_l1_block_number; + // Increment current block index. + current_l1_block_number += BLOCK_STEP.into(); } - - // Increment current block index. - current_l1_block_number += BLOCK_STEP.into(); } - } + }); + + // Wait for shutdown signal in background. + tokio::spawn(async move { + let _ = tokio::signal::ctrl_c().await; + tracing::info!("Shutdown signal received, finishing up and shutting down..."); + let _ = shutdown_tx.send(""); + }); + main_handle.await?; tx_handle.await?; parse_handle.await?; diff --git a/src/processor/tree/mod.rs b/src/processor/tree/mod.rs index 4de3156..9fa34e0 100644 --- a/src/processor/tree/mod.rs +++ b/src/processor/tree/mod.rs @@ -27,7 +27,6 @@ pub struct TreeProcessor<'a> { impl TreeProcessor<'static> { pub async fn new(db_path: PathBuf, snapshot: Arc