Skip to content

Commit

Permalink
feat: map l1 batch numbers to their l1 block number equivalent (#113)
Browse files Browse the repository at this point in the history
* feat: map l1 batch numbers to their l1 block number equivalent

* chore: ceil instead of add one

* chore: make mapping task cancellable

* chore: remove `println!`

* chore: clone tokens directly instead of passing around
  • Loading branch information
zeapoz authored Aug 7, 2024
1 parent a347f4b commit ab79f68
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 121 deletions.
3 changes: 0 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,6 @@ pub enum Command {
/// The path to the storage solution.
#[arg(short, long, default_value = snapshot::DEFAULT_DB_PATH)]
db_path: Option<String>,
/// Number of chunks to split storage chunks into.
#[arg(short, long, default_value_t = snapshot::DEFAULT_NUM_CHUNKS)]
num_chunks: usize,
/// The directory to export the snapshot files to.
directory: String,
},
Expand Down
33 changes: 17 additions & 16 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use std::{
path::{Path, PathBuf},
};

use ::eyre::Result;
use clap::Parser;
use cli::{Cli, Command, ReconstructSource};
use eyre::Result;
use processor::snapshot::{
exporter::SnapshotExporter, importer::SnapshotImporter, SnapshotBuilder,
};
Expand Down Expand Up @@ -75,24 +75,29 @@ async fn main() -> Result<()> {
None => env::current_dir()?.join(storage::DEFAULT_DB_NAME),
};

if let Some(directory) = snapshot {
tracing::info!("Trying to restore state from snapshot...");
let importer = SnapshotImporter::new(PathBuf::from(directory));
importer.run(&db_path.clone()).await?;
}
let snapshot_end_batch = match snapshot {
Some(directory) => {
tracing::info!("Trying to restore state from snapshot...");
let importer = SnapshotImporter::new(PathBuf::from(directory));
let end_batch = importer.run(&db_path.clone()).await?;
Some(end_batch)
}
None => None,
};

match source {
ReconstructSource::L1 { l1_fetcher_options } => {
let fetcher_options = l1_fetcher_options.into();
let processor = TreeProcessor::new(db_path.clone()).await?;
let fetcher = L1Fetcher::new(fetcher_options, Some(processor.get_inner_db()))?;

let (tx, rx) = mpsc::channel::<CommitBlock>(5);

let processor_handle = tokio::spawn(async move {
processor.run(rx).await;
});

fetcher.run(tx).await?;
fetcher.run(tx, snapshot_end_batch).await?;
processor_handle.await?;
}
ReconstructSource::File { file } => {
Expand Down Expand Up @@ -128,7 +133,7 @@ async fn main() -> Result<()> {
processor.run(rx).await;
});

fetcher.run(tx).await?;
fetcher.run(tx, None).await?;
processor_handle.await?;

tracing::info!("Successfully downloaded CommitBlocks to {}", file);
Expand Down Expand Up @@ -159,7 +164,7 @@ async fn main() -> Result<()> {
let processor = SnapshotBuilder::new(db_path);

let mut fetcher_options: L1FetcherOptions = l1_fetcher_options.into();
if let Ok(batch_number) = processor.get_latest_l1_batch_number() {
if let Ok(batch_number) = processor.get_latest_l1_block_number() {
let batch_number = batch_number.as_u64();
if batch_number > ethereum::GENESIS_BLOCK {
tracing::info!(
Expand All @@ -176,18 +181,14 @@ async fn main() -> Result<()> {
processor.run(rx).await;
});

fetcher.run(tx).await?;
fetcher.run(tx, None).await?;
processor_handle.await?;
}
Command::ExportSnapshot {
db_path,
num_chunks,
directory,
} => {
Command::ExportSnapshot { db_path, directory } => {
let export_path = Path::new(&directory);
std::fs::create_dir_all(export_path)?;
let exporter = SnapshotExporter::new(export_path, db_path)?;
exporter.export_snapshot(num_chunks)?;
exporter.export_snapshot()?;

tracing::info!("Succesfully exported snapshot files to \"{directory}\"!");
}
Expand Down
17 changes: 10 additions & 7 deletions src/processor/snapshot/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ use crate::processor::snapshot::{
DEFAULT_DB_PATH, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME,
};

/// Number of storage logs included in each chunk.
const SNAPSHOT_CHUNK_SIZE: usize = 1_000_000;

pub struct SnapshotExporter {
basedir: PathBuf,
database: SnapshotDatabase,
Expand All @@ -35,7 +38,7 @@ impl SnapshotExporter {
})
}

pub fn export_snapshot(&self, num_chunks: usize) -> Result<()> {
pub fn export_snapshot(&self) -> Result<()> {
let l1_batch_number = self
.database
.get_latest_l1_batch_number()?
Expand All @@ -50,7 +53,7 @@ impl SnapshotExporter {
..Default::default()
};

self.export_storage_logs(num_chunks, &mut header)?;
self.export_storage_logs(&mut header)?;
self.export_factory_deps(&mut header)?;

let path = self.basedir.join(SNAPSHOT_HEADER_FILE_NAME);
Expand Down Expand Up @@ -97,7 +100,7 @@ impl SnapshotExporter {
Ok(())
}

fn export_storage_logs(&self, num_chunks: usize, header: &mut SnapshotHeader) -> Result<()> {
fn export_storage_logs(&self, header: &mut SnapshotHeader) -> Result<()> {
tracing::info!("Exporting storage logs...");

let num_logs = self.database.get_last_repeated_key_index()?;
Expand All @@ -108,17 +111,17 @@ impl SnapshotExporter {
.database
.iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start);

let chunk_size = num_logs / num_chunks as u64;
let num_chunks = num_logs.div_ceil(SNAPSHOT_CHUNK_SIZE as u64);
for chunk_id in 0..num_chunks {
tracing::info!("Serializing chunk {}/{}...", chunk_id + 1, num_chunks);

let mut chunk = SnapshotStorageLogsChunk::default();
for _ in 0..chunk_size {
for _ in 0..SNAPSHOT_CHUNK_SIZE {
if let Some(Ok((_, key))) = iterator.next() {
let key = U256::from_big_endian(&key);
if let Ok(Some(entry)) = self.database.get_storage_log(&key) {
chunk.storage_logs.push(entry);
}
};
} else {
break;
}
Expand All @@ -131,7 +134,7 @@ impl SnapshotExporter {
header
.storage_logs_chunks
.push(SnapshotStorageLogsChunkMetadata {
chunk_id: chunk_id as u64,
chunk_id,
filepath: path
.clone()
.into_os_string()
Expand Down
11 changes: 5 additions & 6 deletions src/processor/snapshot/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
use ethers::types::U64;
use eyre::Result;
use regex::{Captures, Regex};
use state_reconstruct_fetcher::constants::ethereum::GENESIS_BLOCK;
use state_reconstruct_storage::types::{
Proto, SnapshotFactoryDependencies, SnapshotHeader, SnapshotStorageLogsChunk,
SnapshotStorageLogsChunkMetadata,
Expand All @@ -29,7 +28,8 @@ impl SnapshotImporter {
Self { directory }
}

pub async fn run(self, db_path: &Path) -> Result<()> {
/// Run the snapshot importer task. Returns the batch number contained in the header.
pub async fn run(self, db_path: &Path) -> Result<U64> {
let (tx, rx) = mpsc::channel(1);

let header = self.read_header().expect("failed to read header filepath");
Expand All @@ -46,14 +46,13 @@ impl SnapshotImporter {
}
});

let l1_batch_number = header.l1_batch_number + GENESIS_BLOCK;
let l1_batch_number = U64::from(header.l1_batch_number);
let mut tree = TreeWrapper::new_snapshot_wrapper(db_path)
.await
.expect("can't create tree");
tree.restore_from_snapshot(rx, U64::from(l1_batch_number))
.await?;
tree.restore_from_snapshot(rx, l1_batch_number).await?;

Ok(())
Ok(l1_batch_number)
}

fn read_header(&self) -> Result<SnapshotHeader> {
Expand Down
7 changes: 3 additions & 4 deletions src/processor/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use crate::util::{h256_to_u256, unpack_block_info};
pub const DEFAULT_DB_PATH: &str = "snapshot_db";
pub const SNAPSHOT_HEADER_FILE_NAME: &str = "snapshot-header.json";
pub const SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX: &str = "factory_deps.proto.gzip";
pub const DEFAULT_NUM_CHUNKS: usize = 10;

pub struct SnapshotBuilder {
database: SnapshotDatabase,
Expand Down Expand Up @@ -53,8 +52,8 @@ impl SnapshotBuilder {
Self { database }
}

// Gets the next L1 batch number to be processed for ues in state recovery.
pub fn get_latest_l1_batch_number(&self) -> Result<U64> {
// Gets the next L1 block number to be processed for ues in state recovery.
pub fn get_latest_l1_block_number(&self) -> Result<U64> {
self.database
.get_latest_l1_block_number()
.map(|o| o.unwrap_or(U64::from(0)))
Expand Down Expand Up @@ -107,7 +106,7 @@ impl Processor for SnapshotBuilder {

if self
.database
.update_storage_log_value(index as u64, &value.to_fixed_bytes())
.update_storage_log_value(index as u64, value)
.is_err()
{
let max_idx = self
Expand Down
9 changes: 6 additions & 3 deletions src/processor/tree/tree_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,18 +168,21 @@ impl TreeWrapper {
total_tree_entries += tree_entries.len();
self.tree.extend(tree_entries);

tracing::info!("Chunk {} was succesfully imported!", i + 1);
tracing::info!("Chunk {} was successfully imported!", i + 1);
i += 1;
}

tracing::info!(
"Succesfully imported snapshot containing {total_tree_entries} storage logs!",
"Successfully imported snapshot containing {total_tree_entries} storage logs!",
);

let root_hash = hex::encode(self.tree.latest_root_hash());
tracing::debug!("Current root hash is: {}", root_hash);

self.inner_db
.lock()
.await
.set_latest_l1_batch_number(l1_batch_number.as_u64() + 1)?;
.set_latest_l1_batch_number(l1_batch_number.as_u64())?;

Ok(())
}
Expand Down
Loading

0 comments on commit ab79f68

Please sign in to comment.