feat: snapshot imports #79

Merged
merged 12 commits on Apr 3, 2024
27 changes: 1 addition & 26 deletions Cargo.lock


2 changes: 1 addition & 1 deletion Cargo.toml
@@ -14,9 +14,9 @@ blake2 = "0.10.6"
bytes = "1.5"
chrono = "0.4.31"
clap = { version = "4.4.7", features = ["derive", "env"] }
deflate = { version = "1.0.0", features = ["gzip"] }
ethers = "1.0.2"
eyre = "0.6.8"
flate2 = "1.0.28"
hex = "0.4.3"
indexmap = { version = "2.0.2" }
primitive-types = "0.12.2"
4 changes: 4 additions & 0 deletions src/cli.rs
@@ -79,6 +79,10 @@ pub enum Command {
/// The path to the storage solution.
#[arg(short, long, env = "ZK_SYNC_DB_PATH")]
db_path: Option<String>,
/// If present, try to restore state from snapshot files contained in the specified
/// directory. Note that this will only work when supplied with a fresh database.
#[arg(long)]
snapshot: Option<String>,
},

/// Query the local storage, and optionally, return a JSON-payload of the data.
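The new `--snapshot` flag is an optional directory on the `Reconstruct` subcommand, next to the existing `db_path` option. As a rough, self-contained illustration of the resulting CLI surface (not the project's actual cli.rs: the `Cli` wrapper shown here and the omission of the `source` argument are simplifications for the sketch):

```rust
use clap::{Parser, Subcommand};

/// Illustrative wrapper only; the real definitions live in src/cli.rs.
#[derive(Parser)]
struct Cli {
    #[command(subcommand)]
    subcommand: Command,
}

#[derive(Subcommand)]
enum Command {
    /// Reconstruct state, optionally seeding a fresh database from snapshot files.
    Reconstruct {
        /// The path to the storage solution.
        #[arg(short, long, env = "ZK_SYNC_DB_PATH")]
        db_path: Option<String>,
        /// If present, try to restore state from snapshot files contained in the
        /// specified directory. Only works against a fresh database.
        #[arg(long)]
        snapshot: Option<String>,
    },
}

fn main() {
    // e.g. `<binary> reconstruct --snapshot ./snapshot-dir`
    let Cli { subcommand } = Cli::parse();
    let Command::Reconstruct { db_path, snapshot } = subcommand;
    println!("db_path = {db_path:?}, snapshot = {snapshot:?}");
}
```

With a layout like this, passing `--snapshot ./snapshot-dir` yields `Some("./snapshot-dir")`, and leaving the flag off yields `None`, which is exactly what main.rs branches on below.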
21 changes: 18 additions & 3 deletions src/main.rs
@@ -15,7 +15,9 @@ use std::{
use clap::Parser;
use cli::{Cli, Command, ReconstructSource};
use eyre::Result;
use processor::snapshot::{SnapshotBuilder, SnapshotExporter};
use processor::snapshot::{
exporter::SnapshotExporter, importer::SnapshotImporter, SnapshotBuilder,
};
use state_reconstruct_fetcher::{constants::storage, l1_fetcher::L1Fetcher, types::CommitBlock};
use tikv_jemallocator::Jemalloc;
use tokio::sync::mpsc;
@@ -59,12 +61,23 @@ async fn main() -> Result<()> {
let cli = Cli::parse();

match cli.subcommand {
Command::Reconstruct { source, db_path } => {
Command::Reconstruct {
source,
db_path,
snapshot,
} => {
let db_path = match db_path {
Some(path) => PathBuf::from(path),
None => env::current_dir()?.join(storage::DEFAULT_DB_NAME),
};

if let Some(directory) = snapshot {
tracing::info!("Trying to restore state from snapshot...");
let importer =
SnapshotImporter::new(PathBuf::from(directory), &db_path.clone()).await?;
importer.run().await?;
}

match source {
ReconstructSource::L1 { l1_fetcher_options } => {
let fetcher_options = l1_fetcher_options.into();
@@ -159,8 +172,10 @@ async fn main() -> Result<()> {
} => {
let export_path = Path::new(&directory);
std::fs::create_dir_all(export_path)?;
let exporter = SnapshotExporter::new(export_path, db_path);
let exporter = SnapshotExporter::new(export_path, db_path)?;
exporter.export_snapshot(chunk_size)?;

tracing::info!("Succesfully exported snapshot files to \"{directory}\"!");
}
}

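main.rs only wires the importer in; the importer module itself (src/processor/snapshot/importer.rs) is not shown in this diff. For orientation, here is a rough interface sketch consistent with the two call sites above, an async constructor plus `run()`; the field names and the body of `run` are guesses, not the PR's code:

```rust
use std::path::{Path, PathBuf};

use eyre::Result;

/// Interface sketch only, inferred from the call sites in main.rs.
pub struct SnapshotImporter {
    directory: PathBuf,
    db_path: PathBuf,
}

impl SnapshotImporter {
    /// Matches `SnapshotImporter::new(PathBuf::from(directory), &db_path).await?`:
    /// an async, fallible constructor taking the snapshot directory and the
    /// target database path.
    pub async fn new(directory: PathBuf, db_path: &Path) -> Result<Self> {
        Ok(Self {
            directory,
            db_path: db_path.to_path_buf(),
        })
    }

    /// Matches `importer.run().await?`: presumably reads the header, factory
    /// dependencies and storage-log chunks from `directory` and writes them into
    /// the database at `db_path`. The body here is a placeholder.
    pub async fn run(self) -> Result<()> {
        let _ = (self.directory, self.db_path);
        Ok(())
    }
}
```

Note that the import runs before the `match source` dispatch, so reconstruction then continues on top of the restored state, which is why the flag is documented as only working against a fresh database.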
2 changes: 1 addition & 1 deletion src/processor/snapshot/database.rs
@@ -104,7 +104,7 @@ impl SnapshotDB {
idx_bytes[7],
])
} else {
self.put_cf(metadata, LAST_REPEATED_KEY_INDEX, u64::to_be_bytes(1))?;
self.put_cf(metadata, LAST_REPEATED_KEY_INDEX, u64::to_be_bytes(0))?;
0
},
)
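The one-character change above fixes an inconsistency in the fallback branch: when `LAST_REPEATED_KEY_INDEX` is missing, the function used to persist `1` while returning `0`; it now persists the same `0` it returns. A small standalone illustration of the invariant (not project code, assuming the index is stored as big-endian `u64` bytes as in the surrounding hunk):

```rust
// Not project code: a toy model of the fallback in SnapshotDB, showing why the
// persisted bytes must match the returned value.
fn initial_repeated_key_index(stored: Option<[u8; 8]>) -> (u64, [u8; 8]) {
    match stored {
        // Key present: decode the big-endian bytes, as database.rs does
        // byte-by-byte via idx_bytes[0..=7].
        Some(bytes) => (u64::from_be_bytes(bytes), bytes),
        // Key missing: persist and return the same starting value. Writing
        // to_be_bytes(1) while returning 0 (the pre-fix behaviour) would make
        // the next read-back disagree with this return value by one.
        None => (0, u64::to_be_bytes(0)),
    }
}

fn main() {
    let (value, persisted) = initial_repeated_key_index(None);
    assert_eq!(value, u64::from_be_bytes(persisted));
    println!("initial repeated key index = {value}");
}
```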
187 changes: 187 additions & 0 deletions src/processor/snapshot/exporter.rs
@@ -0,0 +1,187 @@
use std::{
io::Write,
path::{Path, PathBuf},
};

use bytes::BytesMut;
use eyre::Result;
use flate2::{write::GzEncoder, Compression};
use prost::Message;

use super::{
database::{self, SnapshotDB},
types::{self, SnapshotFactoryDependency, SnapshotHeader},
DEFAULT_DB_PATH, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX, SNAPSHOT_HEADER_FILE_NAME,
};

pub mod protobuf {
include!(concat!(env!("OUT_DIR"), "/protobuf.rs"));
}

pub struct SnapshotExporter {
basedir: PathBuf,
database: SnapshotDB,
}

impl SnapshotExporter {
pub fn new(basedir: &Path, db_path: Option<String>) -> Result<Self> {
let db_path = match db_path {
Some(p) => PathBuf::from(p),
None => PathBuf::from(DEFAULT_DB_PATH),
};

let database = SnapshotDB::new_read_only(db_path)?;
Ok(Self {
basedir: basedir.to_path_buf(),
database,
})
}

pub fn export_snapshot(&self, chunk_size: u64) -> Result<()> {
let mut header = SnapshotHeader::default();
self.export_storage_logs(chunk_size, &mut header)?;
self.export_factory_deps(&mut header)?;

let path = self.basedir.join(SNAPSHOT_HEADER_FILE_NAME);
let outfile = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(path)?;

serde_json::to_writer(outfile, &header)?;

Ok(())
}

fn export_factory_deps(&self, header: &mut SnapshotHeader) -> Result<()> {
let mut buf = BytesMut::new();

let storage_logs = self.database.cf_handle(database::FACTORY_DEPS).unwrap();
let mut iterator = self
.database
.iterator_cf(storage_logs, rocksdb::IteratorMode::Start);

let mut factory_deps = protobuf::SnapshotFactoryDependencies::default();
while let Some(Ok((_, bs))) = iterator.next() {
let factory_dep: SnapshotFactoryDependency = bincode::deserialize(&bs)?;
factory_deps
.factory_deps
.push(protobuf::SnapshotFactoryDependency {
bytecode: Some(factory_dep.bytecode),
});
}

let fd_len = factory_deps.encoded_len();
if buf.capacity() < fd_len {
buf.reserve(fd_len - buf.capacity());
}

let path = self.basedir.join(format!(
"snapshot_l1_batch_{}_{}",
header.l1_batch_number, SNAPSHOT_FACTORY_DEPS_FILE_NAME_SUFFIX
));
header.factory_deps_filepath = path
.clone()
.into_os_string()
.into_string()
.expect("path to string");

// Serialize chunk.
factory_deps.encode(&mut buf)?;

let outfile = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(path)?;

// Wrap in gzip compression before writing.
let mut encoder = GzEncoder::new(outfile, Compression::default());
encoder.write_all(&buf)?;
encoder.finish()?;

Ok(())
}

fn export_storage_logs(&self, chunk_size: u64, header: &mut SnapshotHeader) -> Result<()> {
let mut buf = BytesMut::new();
let mut chunk_id = 0;

let index_to_key_map = self.database.cf_handle(database::INDEX_TO_KEY_MAP).unwrap();
let mut iterator = self
.database
.iterator_cf(index_to_key_map, rocksdb::IteratorMode::Start);

let mut has_more = true;

while has_more {
let mut chunk = protobuf::SnapshotStorageLogsChunk {
storage_logs: vec![],
};

for _ in 0..chunk_size {
if let Some(Ok((_, key))) = iterator.next() {
if let Ok(Some(entry)) = self.database.get_storage_log(key.as_ref()) {
let pb = protobuf::SnapshotStorageLog {
account_address: None,
storage_key: Some(key.to_vec()),
storage_value: Some(entry.value.0.to_vec()),
l1_batch_number_of_initial_write: Some(
entry.l1_batch_number_of_initial_write.as_u32(),
),
enumeration_index: Some(entry.enumeration_index),
};

chunk.storage_logs.push(pb);
header.l1_batch_number = entry.l1_batch_number_of_initial_write;
}
} else {
has_more = false;
}
}

// Ensure that write buffer has enough capacity.
let chunk_len = chunk.encoded_len();
if buf.capacity() < chunk_len {
buf.reserve(chunk_len - buf.capacity());
}

let path = PathBuf::new().join(&self.basedir).join(format!(
"snapshot_l1_batch_{}_storage_logs_part_{:0>4}.proto.gzip",
header.l1_batch_number, chunk_id
));
chunk_id += 1;

header
.storage_logs_chunks
.push(types::SnapshotStorageLogsChunkMetadata {
chunk_id,
filepath: path
.clone()
.into_os_string()
.into_string()
.expect("path to string"),
});

// Serialize chunk.
chunk.encode(&mut buf)?;

let outfile = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(path)?;

// Wrap in gzip compression before writing.
let mut encoder = GzEncoder::new(outfile, Compression::default());
encoder.write_all(&buf)?;
encoder.finish()?;

// Clear the buffer before encoding the next chunk.
buf.truncate(0);
}

Ok(())
}
}
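Taken together, `export_snapshot` leaves three kinds of artifacts in `basedir`: a JSON `SnapshotHeader`, one gzipped protobuf file of factory dependencies, and one gzipped protobuf file per storage-log chunk, with each file path recorded in the header. As a rough sketch of the consuming side (not part of this PR, and assuming the prost-generated `protobuf` module from exporter.rs is reachable at the path shown), a chunk file can be read back by reversing the two wrappers, gzip then prost:

```rust
use std::{fs::File, io::Read, path::Path};

use eyre::Result;
use flate2::read::GzDecoder;
use prost::Message;

// Assumed path to the prost-generated module that exporter.rs includes from
// OUT_DIR; adjust to wherever it is actually reachable from.
use crate::processor::snapshot::exporter::protobuf;

/// Decode one `snapshot_l1_batch_*_storage_logs_part_*.proto.gzip` file by
/// undoing the GzEncoder + prost encoding applied in export_storage_logs.
pub fn read_storage_logs_chunk(path: &Path) -> Result<protobuf::SnapshotStorageLogsChunk> {
    let mut decoder = GzDecoder::new(File::open(path)?);
    let mut bytes = Vec::new();
    decoder.read_to_end(&mut bytes)?;
    Ok(protobuf::SnapshotStorageLogsChunk::decode(bytes.as_slice())?)
}
```

The factory-dependencies file decodes the same way with `protobuf::SnapshotFactoryDependencies`, and the header file is plain `serde_json`.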