diff --git a/src/cli.rs b/src/cli.rs index f5c5380..3b756e4 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -109,9 +109,6 @@ pub enum Command { /// The path to the storage solution. #[arg(short, long, default_value = snapshot::DEFAULT_DB_PATH)] db_path: Option, - /// Number of chunks to split storage chunks into. - #[arg(short, long, default_value_t = snapshot::DEFAULT_NUM_CHUNKS)] - num_chunks: usize, /// The directory to export the snapshot files to. directory: String, }, diff --git a/src/main.rs b/src/main.rs index 78c544c..0648b4a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,9 +12,9 @@ use std::{ path::{Path, PathBuf}, }; +use ::eyre::Result; use clap::Parser; use cli::{Cli, Command, ReconstructSource}; -use eyre::Result; use processor::snapshot::{ exporter::SnapshotExporter, importer::SnapshotImporter, SnapshotBuilder, }; @@ -75,24 +75,29 @@ async fn main() -> Result<()> { None => env::current_dir()?.join(storage::DEFAULT_DB_NAME), }; - if let Some(directory) = snapshot { - tracing::info!("Trying to restore state from snapshot..."); - let importer = SnapshotImporter::new(PathBuf::from(directory)); - importer.run(&db_path.clone()).await?; - } + let snapshot_end_batch = match snapshot { + Some(directory) => { + tracing::info!("Trying to restore state from snapshot..."); + let importer = SnapshotImporter::new(PathBuf::from(directory)); + let end_batch = importer.run(&db_path.clone()).await?; + Some(end_batch) + } + None => None, + }; match source { ReconstructSource::L1 { l1_fetcher_options } => { let fetcher_options = l1_fetcher_options.into(); let processor = TreeProcessor::new(db_path.clone()).await?; let fetcher = L1Fetcher::new(fetcher_options, Some(processor.get_inner_db()))?; + let (tx, rx) = mpsc::channel::(5); let processor_handle = tokio::spawn(async move { processor.run(rx).await; }); - fetcher.run(tx).await?; + fetcher.run(tx, snapshot_end_batch).await?; processor_handle.await?; } ReconstructSource::File { file } => { @@ -128,7 +133,7 @@ async fn main() -> Result<()> { processor.run(rx).await; }); - fetcher.run(tx).await?; + fetcher.run(tx, None).await?; processor_handle.await?; tracing::info!("Successfully downloaded CommitBlocks to {}", file); @@ -159,7 +164,7 @@ async fn main() -> Result<()> { let processor = SnapshotBuilder::new(db_path); let mut fetcher_options: L1FetcherOptions = l1_fetcher_options.into(); - if let Ok(batch_number) = processor.get_latest_l1_batch_number() { + if let Ok(batch_number) = processor.get_latest_l1_block_number() { let batch_number = batch_number.as_u64(); if batch_number > ethereum::GENESIS_BLOCK { tracing::info!( @@ -176,18 +181,14 @@ async fn main() -> Result<()> { processor.run(rx).await; }); - fetcher.run(tx).await?; + fetcher.run(tx, None).await?; processor_handle.await?; } - Command::ExportSnapshot { - db_path, - num_chunks, - directory, - } => { + Command::ExportSnapshot { db_path, directory } => { let export_path = Path::new(&directory); std::fs::create_dir_all(export_path)?; let exporter = SnapshotExporter::new(export_path, db_path)?; - exporter.export_snapshot(num_chunks)?; + exporter.export_snapshot()?; tracing::info!("Succesfully exported snapshot files to \"{directory}\"!"); } diff --git a/src/processor/snapshot/exporter.rs b/src/processor/snapshot/exporter.rs index a37ab82..5f5abec 100644 --- a/src/processor/snapshot/exporter.rs +++ b/src/processor/snapshot/exporter.rs @@ -16,6 +16,9 @@ use crate::processor::snapshot::{ DEFAULT_DB_PATH, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME, }; +/// Number of storage logs included in each chunk. +const SNAPSHOT_CHUNK_SIZE: usize = 1_000_000; + pub struct SnapshotExporter { basedir: PathBuf, database: SnapshotDatabase, @@ -35,7 +38,7 @@ impl SnapshotExporter { }) } - pub fn export_snapshot(&self, num_chunks: usize) -> Result<()> { + pub fn export_snapshot(&self) -> Result<()> { let l1_batch_number = self .database .get_latest_l1_batch_number()? @@ -50,7 +53,7 @@ impl SnapshotExporter { ..Default::default() }; - self.export_storage_logs(num_chunks, &mut header)?; + self.export_storage_logs(&mut header)?; self.export_factory_deps(&mut header)?; let path = self.basedir.join(SNAPSHOT_HEADER_FILE_NAME); @@ -97,7 +100,7 @@ impl SnapshotExporter { Ok(()) } - fn export_storage_logs(&self, num_chunks: usize, header: &mut SnapshotHeader) -> Result<()> { + fn export_storage_logs(&self, header: &mut SnapshotHeader) -> Result<()> { tracing::info!("Exporting storage logs..."); let num_logs = self.database.get_last_repeated_key_index()?; @@ -108,17 +111,17 @@ impl SnapshotExporter { .database .iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start); - let chunk_size = num_logs / num_chunks as u64; + let num_chunks = (num_logs / SNAPSHOT_CHUNK_SIZE as u64) + 1; for chunk_id in 0..num_chunks { tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, num_chunks); let mut chunk = SnapshotStorageLogsChunk::default(); - for _ in 0..chunk_size { + for _ in 0..SNAPSHOT_CHUNK_SIZE { if let Some(Ok((_, key))) = iterator.next() { let key = U256::from_big_endian(&key); if let Ok(Some(entry)) = self.database.get_storage_log(&key) { chunk.storage_logs.push(entry); - } + }; } else { break; } @@ -131,7 +134,7 @@ impl SnapshotExporter { header .storage_logs_chunks .push(SnapshotStorageLogsChunkMetadata { - chunk_id: chunk_id as u64, + chunk_id, filepath: path .clone() .into_os_string() diff --git a/src/processor/snapshot/importer.rs b/src/processor/snapshot/importer.rs index 140a55f..9a39457 100644 --- a/src/processor/snapshot/importer.rs +++ b/src/processor/snapshot/importer.rs @@ -6,7 +6,6 @@ use std::{ use ethers::types::U64; use eyre::Result; use regex::{Captures, Regex}; -use state_reconstruct_fetcher::constants::ethereum::GENESIS_BLOCK; use state_reconstruct_storage::types::{ Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk, SnapshotStorageLogsChunkMetadata, @@ -29,7 +28,8 @@ impl SnapshotImporter { Self { directory } } - pub async fn run(self, db_path: &Path) -> Result<()> { + /// Run the snapshot importer task. Returns the batch number contained in the header. + pub async fn run(self, db_path: &Path) -> Result { let (tx, rx) = mpsc::channel(1); let header = self.read_header().expect("failed to read header filepath"); @@ -46,14 +46,13 @@ impl SnapshotImporter { } }); - let l1_batch_number = header.l1_batch_number + GENESIS_BLOCK; + let l1_batch_number = U64::from(header.l1_batch_number); let mut tree = TreeWrapper::new_snapshot_wrapper(db_path) .await .expect("can't create tree"); - tree.restore_from_snapshot(rx, U64::from(l1_batch_number)) - .await?; + tree.restore_from_snapshot(rx, l1_batch_number).await?; - Ok(()) + Ok(l1_batch_number) } fn read_header(&self) -> Result { diff --git a/src/processor/snapshot/mod.rs b/src/processor/snapshot/mod.rs index 07c2943..eda7a65 100644 --- a/src/processor/snapshot/mod.rs +++ b/src/processor/snapshot/mod.rs @@ -25,7 +25,6 @@ use crate::util::{h256_to_u256, unpack_block_info}; pub const DEFAULT_DB_PATH: &str = "snapshot_db"; pub const SNAPSHOT_HEADER_FILE_NAME: &str = "snapshot-header.json"; pub const SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX: &str = "factory_deps.proto.gzip"; -pub const DEFAULT_NUM_CHUNKS: usize = 10; pub struct SnapshotBuilder { database: SnapshotDatabase, @@ -53,8 +52,8 @@ impl SnapshotBuilder { Self { database } } - // Gets the next L1 batch number to be processed for ues in state recovery. - pub fn get_latest_l1_batch_number(&self) -> Result { + // Gets the next L1 block number to be processed for ues in state recovery. + pub fn get_latest_l1_block_number(&self) -> Result { self.database .get_latest_l1_block_number() .map(|o| o.unwrap_or(U64::from(0))) @@ -107,7 +106,7 @@ impl Processor for SnapshotBuilder { if self .database - .update_storage_log_value(index as u64, &value.to_fixed_bytes()) + .update_storage_log_value(index as u64, value) .is_err() { let max_idx = self diff --git a/src/processor/tree/tree_wrapper.rs b/src/processor/tree/tree_wrapper.rs index 26bbc50..3577d0c 100644 --- a/src/processor/tree/tree_wrapper.rs +++ b/src/processor/tree/tree_wrapper.rs @@ -168,18 +168,21 @@ impl TreeWrapper { total_tree_entries += tree_entries.len(); self.tree.extend(tree_entries); - tracing::info!("Chunk {} was succesfully imported!", i + 1); + tracing::info!("Chunk {} was successfully imported!", i + 1); i += 1; } tracing::info!( - "Succesfully imported snapshot containing {total_tree_entries} storage logs!", + "Successfully imported snapshot containing {total_tree_entries} storage logs!", ); + let root_hash = hex::encode(self.tree.latest_root_hash()); + tracing::debug!("Current root hash is: {}", root_hash); + self.inner_db .lock() .await - .set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?; + .set_latest_l1_batch_number(l1_batch_number.as_u64())?; Ok(()) } diff --git a/state-reconstruct-fetcher/src/l1_fetcher.rs b/state-reconstruct-fetcher/src/l1_fetcher.rs index 522ff31..f37dd52 100644 --- a/state-reconstruct-fetcher/src/l1_fetcher.rs +++ b/state-reconstruct-fetcher/src/l1_fetcher.rs @@ -4,7 +4,7 @@ use ethers::{ abi::{Contract, Function}, prelude::*, }; -use eyre::Result; +use eyre::{eyre, OptionExt, Result}; use rand::random; use state_reconstruct_storage::reconstruction::ReconstructionDatabase; use thiserror::Error; @@ -16,7 +16,7 @@ use tokio_util::sync::CancellationToken; use crate::{ blob_http_client::BlobHttpClient, - constants::ethereum::{BLOB_BLOCK, BOOJUM_BLOCK, GENESIS_BLOCK, ZK_SYNC_ADDR}, + constants::ethereum::{BLOB_BLOCK, BLOCK_STEP, BOOJUM_BLOCK, GENESIS_BLOCK, ZK_SYNC_ADDR}, metrics::L1Metrics, types::{v1::V1, v2::V2, CommitBlock, ParseError}, }; @@ -124,26 +124,56 @@ impl L1Fetcher { }) } - pub async fn run(&self, sink: mpsc::Sender) -> Result<()> { + /// Decide which block to start fetching from based on the following criteria: + /// - Has the tool already made progress before? + /// - Was a snapshot just imported? + /// - Did the user set an explicit start block? + /// + /// Returns the block number to start fetching from. + async fn decide_start_block(&self, snapshot_end_batch: Option) -> Result { // Start fetching from the `GENESIS_BLOCK` unless the `start_block` argument is supplied, // in which case, start from that instead. If no argument was supplied and a state snapshot // exists, start from the block number specified in that snapshot. - let mut current_l1_block_number = U64::from(self.config.start_block); + let mut start_block = U64::from(self.config.start_block); + + // We also have to check if a snapshot was recently imported. If so we + // should continue from the last imported batch. + if let Some(target_batch_number) = snapshot_end_batch { + tracing::info!( + "Trying to map snapshots latest L1 batch number, this might take a while..." + ); + match self.map_l1_batch_to_l1_block(U256::from(target_batch_number.as_u64())).await { + Ok(block_number) => return Ok(block_number), + Err(e) => tracing::error!("Unable to find a corresponding L1 block number for the latest imported L1 batch: {e}"), + } + } + // User might have supplied their own start block, in that case we shouldn't enforce the // use of the snapshot value. - if current_l1_block_number == GENESIS_BLOCK.into() { + if start_block == GENESIS_BLOCK.into() { if let Some(snapshot) = &self.inner_db { let snapshot_latest_l1_block_number = snapshot.lock().await.get_latest_l1_block_number()?; - if snapshot_latest_l1_block_number > current_l1_block_number { - current_l1_block_number = snapshot_latest_l1_block_number; - tracing::info!( - "Found snapshot, starting from L1 block {current_l1_block_number}" - ); + if snapshot_latest_l1_block_number > start_block { + start_block = snapshot_latest_l1_block_number; } }; } + Ok(start_block) + } + + pub async fn run( + &self, + sink: mpsc::Sender, + snapshot_end_batch: Option, + ) -> Result<()> { + let current_l1_block_number = self.decide_start_block(snapshot_end_batch).await?; + tracing::info!( + "Starting fetching from block number: {}", + current_l1_block_number + ); + let end_block = self .config .block_count @@ -263,7 +293,7 @@ impl L1Fetcher { let metrics = self.metrics.clone(); let event = self.contracts.v1.events_by_name("BlockCommit")?[0].clone(); - let provider_clone = self.provider.clone(); + let provider = self.provider.clone(); let block_step = self.config.block_step; Ok(tokio::spawn({ @@ -279,28 +309,20 @@ impl L1Fetcher { } let Some(end_block_number) = end_block else { - if let Ok(new_end) = L1Fetcher::retry_call( - || provider_clone.get_block(BlockNumber::Finalized), - L1FetchError::GetEndBlockNumber, - ) - .await + if let Ok(new_end_block_number_candidate) = + L1Fetcher::get_last_l1_block_number(&provider).await { - if let Some(found_block) = new_end { - if let Some(ebn) = found_block.number { - let end_block_number = - if let Some(end_block_limit) = max_end_block { - if end_block_limit < ebn { - end_block_limit - } else { - ebn - } - } else { - ebn - }; - end_block = Some(end_block_number); - metrics.lock().await.last_l1_block = end_block_number.as_u64(); + let end_block_number = if let Some(end_block_limit) = max_end_block { + if end_block_limit < new_end_block_number_candidate { + end_block_limit + } else { + new_end_block_number_candidate } - } + } else { + new_end_block_number_candidate + }; + end_block = Some(end_block_number); + metrics.lock().await.last_l1_block = end_block_number.as_u64(); } else { tracing::debug!("Cannot get latest block number..."); cancellation_token.cancelled_else_long_timeout().await; @@ -335,11 +357,9 @@ impl L1Fetcher { // Grab all relevant logs. let before = Instant::now(); - if let Ok(logs) = L1Fetcher::retry_call( - || provider_clone.get_logs(&filter), - L1FetchError::GetLogs, - ) - .await + if let Ok(logs) = + L1Fetcher::retry_call(|| provider.get_logs(&filter), L1FetchError::GetLogs) + .await { let duration = before.elapsed(); metrics.lock().await.log_acquisition.add(duration); @@ -437,13 +457,8 @@ impl L1Fetcher { while let Some(hash) = hash_rx.recv().await { let tx = loop { let before = Instant::now(); - match L1Fetcher::retry_call( - || provider.get_transaction(hash), - L1FetchError::GetTx, - ) - .await - { - Ok(Some(tx)) => { + match L1Fetcher::get_transaction_by_hash(&provider, hash).await { + Ok(tx) => { let duration = before.elapsed(); metrics.lock().await.tx_acquisition.add(duration); break tx; @@ -582,6 +597,94 @@ impl L1Fetcher { })) } + /// Use binary-search to find the Ethereum block on which a particular batch + /// was published. + pub async fn map_l1_batch_to_l1_block(&self, target_batch_number: U256) -> Result { + let event = self.contracts.v1.events_by_name("BlockCommit")?[0].clone(); + let provider = self.provider.clone(); + + let mut lower_block_number = U64::from(GENESIS_BLOCK); + let mut upper_block_number = L1Fetcher::get_last_l1_block_number(&provider).await?; + + let mut current_block_number = (upper_block_number + lower_block_number) / 2; + loop { + let mut target_is_higher = false; + let mut target_is_lower = false; + + let filter = Filter::new() + .address(ZK_SYNC_ADDR.parse::
().unwrap()) + .topic0(event.signature()) + .from_block(current_block_number) + .to_block(current_block_number + BLOCK_STEP); + + if let Ok(logs) = + L1Fetcher::retry_call(|| provider.get_logs(&filter), L1FetchError::GetLogs).await + { + for log in logs { + let l1_batch_number = U256::from_big_endian(log.topics[1].as_fixed_bytes()); + let tx_hash = if let Some(hash) = log.transaction_hash { + hash + } else { + continue; + }; + + match l1_batch_number.cmp(&target_batch_number) { + cmp::Ordering::Equal => { + let block_number = + L1Fetcher::get_transaction_by_hash(&provider, tx_hash) + .await + .map(|tx| tx.block_number)?; + return block_number + .ok_or_eyre("found transaction, but it has no block number"); + } + cmp::Ordering::Less => target_is_higher = true, + cmp::Ordering::Greater => target_is_lower = true, + } + } + } + + if target_is_higher { + lower_block_number = current_block_number; + current_block_number = (upper_block_number + lower_block_number) / 2; + } else if target_is_lower { + upper_block_number = current_block_number; + current_block_number = (upper_block_number + lower_block_number) / 2; + } + + // Batch number was not found. + if upper_block_number.saturating_sub(lower_block_number) <= U64::from(1) { + return Err(eyre!( + "provided batch number ({target_batch_number}) does not exist yet!" + )); + }; + } + } + + /// Get a specified transaction on L1 by its hash. + async fn get_transaction_by_hash(provider: &Provider, hash: H256) -> Result { + match L1Fetcher::retry_call(|| provider.get_transaction(hash), L1FetchError::GetTx).await { + Ok(Some(tx)) => Ok(tx), + Ok(None) => Err(eyre!("unable to find transaction with hash: {}", hash)), + Err(e) => Err(e), + } + } + + /// Get the last published L1 block marked as `Finalized`. + async fn get_last_l1_block_number(provider: &Provider) -> Result { + let Some(last_block) = L1Fetcher::retry_call( + || provider.get_block(BlockNumber::Finalized), + L1FetchError::GetEndBlockNumber, + ) + .await? + else { + return Err(eyre!("latest finalized block was not found")); + }; + + last_block + .number + .ok_or_eyre("found latest finalized block, but it contained no block number") + } + async fn retry_call(callback: impl Fn() -> Fut, err: L1FetchError) -> Result where Fut: Future>, @@ -598,7 +701,6 @@ impl L1Fetcher { Err(err.into()) } } - pub async fn parse_calldata( l1_block_number: u64, commit_candidates: &[Function], diff --git a/state-reconstruct-fetcher/src/metrics.rs b/state-reconstruct-fetcher/src/metrics.rs index f2dd3be..3c8a487 100644 --- a/state-reconstruct-fetcher/src/metrics.rs +++ b/state-reconstruct-fetcher/src/metrics.rs @@ -62,7 +62,7 @@ impl L1Metrics { }; tracing::info!( - "PROGRESS: [{}] CUR L1 BLOCK: {} L2 BATCH: {} TOTAL PROCESSED L1 BLOCKS: {} L2 BATCHES: {}", + "PROGRESS: [{}] CUR L1 BLOCK: {} L1 BATCH: {} TOTAL PROCESSED L1 BLOCKS: {} L1 BATCHES: {}", progress, self.latest_l1_block_num, self.latest_l1_batch_num, diff --git a/state-reconstruct-storage/src/snapshot.rs b/state-reconstruct-storage/src/snapshot.rs index c974369..7078cd1 100644 --- a/state-reconstruct-storage/src/snapshot.rs +++ b/state-reconstruct-storage/src/snapshot.rs @@ -74,7 +74,7 @@ impl SnapshotDatabase { PackingType::NoCompression(v) | PackingType::Transform(v) => v, PackingType::Add(_) | PackingType::Sub(_) => { let existing_value = if let Some(log) = self.get_storage_log(&key)? { - U256::from(log.value.to_fixed_bytes()) + U256::from_big_endian(log.value.as_bytes()) } else { U256::from(0) }; @@ -134,7 +134,7 @@ impl SnapshotDatabase { } } - pub fn update_storage_log_value(&self, key_idx: u64, value: &[u8]) -> Result<()> { + pub fn update_storage_log_value(&self, key_idx: u64, value: H256) -> Result<()> { // Unwrapping column family handle here is safe because presence of // those CFs is ensured in construction of this DB. let storage_logs = self.cf_handle(snapshot_columns::STORAGE_LOGS).unwrap(); @@ -143,7 +143,7 @@ impl SnapshotDatabase { // XXX: These should really be inside a transaction... let entry_bs = self.get_cf(storage_logs, &key)?.unwrap(); let mut entry: SnapshotStorageLog = bincode::deserialize(&entry_bs)?; - entry.value = H256::from(<&[u8; 32]>::try_from(value).unwrap()); + entry.value = value; self.put_cf(storage_logs, key, bincode::serialize(&entry)?) .map_err(Into::into) } diff --git a/state-reconstruct-storage/src/types.rs b/state-reconstruct-storage/src/types.rs index e0c4806..e837539 100644 --- a/state-reconstruct-storage/src/types.rs +++ b/state-reconstruct-storage/src/types.rs @@ -169,7 +169,7 @@ impl Proto for SnapshotStorageLog { fn from_proto(proto: Self::ProtoStruct) -> Result { let value_bytes: [u8; 32] = proto.storage_value().try_into()?; Ok(Self { - key: U256::from_big_endian(proto.storage_key()), + key: U256::from_big_endian(proto.hashed_key()), value: StorageValue::from(&value_bytes), l1_batch_number_of_initial_write: proto.l1_batch_number_of_initial_write().into(), enumeration_index: proto.enumeration_index(),