Skip to content

Commit

Permalink
feat(vm-runner): Implement batch data prefetching (#2724)
Browse files Browse the repository at this point in the history
## What ❔

- Implements prefetching of storage slots / bytecodes accessed by a VM
in a batch. Enables it for the VM playground. Optionally shadows
prefetched snapshot storage.
- Makes RocksDB cache optional for VM playground.

## Why ❔

- Prefetching will allow to load storage slots / bytecodes for a batch
in O(1) DB queries, which is very efficient for local debugging etc. It
may be on par or faster than using RocksDB cache. (There's a caveat:
prefetching doesn't work w/o protective reads.)
- Disabling RocksDB cache is useful for local testing, since the cache
won't catch up during a single batch run anyway.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
slowli authored Aug 30, 2024
1 parent 755fc4a commit d01840d
Show file tree
Hide file tree
Showing 23 changed files with 734 additions and 206 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 2 additions & 7 deletions core/lib/config/src/configs/experimental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ pub struct ExperimentalVmPlaygroundConfig {
#[serde(default)]
pub fast_vm_mode: FastVmMode,
/// Path to the RocksDB cache directory.
#[serde(default = "ExperimentalVmPlaygroundConfig::default_db_path")]
pub db_path: String,
pub db_path: Option<String>,
/// First L1 batch to consider processed. Will not be used if the processing cursor is persisted, unless the `reset` flag is set.
#[serde(default)]
pub first_processed_batch: L1BatchNumber,
Expand All @@ -83,7 +82,7 @@ impl Default for ExperimentalVmPlaygroundConfig {
fn default() -> Self {
Self {
fast_vm_mode: FastVmMode::default(),
db_path: Self::default_db_path(),
db_path: None,
first_processed_batch: L1BatchNumber(0),
window_size: Self::default_window_size(),
reset: false,
Expand All @@ -92,10 +91,6 @@ impl Default for ExperimentalVmPlaygroundConfig {
}

impl ExperimentalVmPlaygroundConfig {
pub fn default_db_path() -> String {
"./db/vm_playground".to_owned()
}

pub fn default_window_size() -> NonZeroU32 {
NonZeroU32::new(1).unwrap()
}
Expand Down
4 changes: 2 additions & 2 deletions core/lib/env_config/src/vm_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ mod tests {
let config = ExperimentalVmConfig::from_env().unwrap();
assert_eq!(config.state_keeper_fast_vm_mode, FastVmMode::New);
assert_eq!(config.playground.fast_vm_mode, FastVmMode::Shadow);
assert_eq!(config.playground.db_path, "/db/vm_playground");
assert_eq!(config.playground.db_path.unwrap(), "/db/vm_playground");
assert_eq!(config.playground.first_processed_batch, L1BatchNumber(123));
assert!(config.playground.reset);

Expand All @@ -83,6 +83,6 @@ mod tests {

lock.remove_env(&["EXPERIMENTAL_VM_PLAYGROUND_DB_PATH"]);
let config = ExperimentalVmConfig::from_env().unwrap();
assert!(!config.playground.db_path.is_empty());
assert!(config.playground.db_path.is_none());
}
}
7 changes: 2 additions & 5 deletions core/lib/protobuf_config/src/experimental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,7 @@ impl ProtoRepr for proto::VmPlayground {
.transpose()
.context("fast_vm_mode")?
.map_or_else(FastVmMode::default, |mode| mode.parse()),
db_path: self
.db_path
.clone()
.unwrap_or_else(Self::Type::default_db_path),
db_path: self.db_path.clone(),
first_processed_batch: L1BatchNumber(self.first_processed_batch.unwrap_or(0)),
window_size: NonZeroU32::new(self.window_size.unwrap_or(1))
.context("window_size cannot be 0")?,
Expand All @@ -94,7 +91,7 @@ impl ProtoRepr for proto::VmPlayground {
fn build(this: &Self::Type) -> Self {
Self {
fast_vm_mode: Some(proto::FastVmMode::new(this.fast_vm_mode).into()),
db_path: Some(this.db_path.clone()),
db_path: this.db_path.clone(),
first_processed_batch: Some(this.first_processed_batch.0),
window_size: Some(this.window_size.get()),
reset: Some(this.reset),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ enum FastVmMode {
// Experimental VM configuration
message VmPlayground {
optional FastVmMode fast_vm_mode = 1; // optional; if not set, fast VM is not used
optional string db_path = 2; // optional; defaults to `./db/vm_playground`
optional string db_path = 2; // optional; if not set, playground will not use RocksDB cache
optional uint32 first_processed_batch = 3; // optional; defaults to 0
optional bool reset = 4; // optional; defaults to false
optional uint32 window_size = 5; // optional; non-zero; defaults to 1
Expand Down
2 changes: 1 addition & 1 deletion core/lib/state/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use self::{
},
shadow_storage::ShadowStorage,
storage_factory::{
BatchDiff, OwnedStorage, PgOrRocksdbStorage, ReadStorageFactory, RocksdbWithMemory,
BatchDiff, CommonStorage, OwnedStorage, ReadStorageFactory, RocksdbWithMemory,
},
};

Expand Down
2 changes: 1 addition & 1 deletion core/lib/state/src/rocksdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ impl RocksdbStorage {
let to_l1_batch_number = if let Some(to_l1_batch_number) = to_l1_batch_number {
if to_l1_batch_number > latest_l1_batch_number {
let err = anyhow::anyhow!(
"Requested to update RocksDB to L1 batch number ({current_l1_batch_number}) that \
"Requested to update RocksDB to L1 batch number ({to_l1_batch_number}) that \
is greater than the last sealed L1 batch number in Postgres ({latest_l1_batch_number})"
);
return Err(err.into());
Expand Down
78 changes: 49 additions & 29 deletions core/lib/state/src/shadow_storage.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::fmt;

use vise::{Counter, Metrics};
use zksync_types::{L1BatchNumber, StorageKey, StorageValue, H256};
use zksync_vm_interface::storage::ReadStorage;

#[allow(clippy::struct_field_names)]
#[derive(Debug, Metrics)]
#[metrics(prefix = "shadow_storage")]
#[allow(clippy::struct_field_names)] // false positive
struct ShadowStorageMetrics {
/// Number of mismatches when reading a value from a shadow storage.
read_value_mismatch: Counter,
Expand All @@ -19,60 +21,78 @@ struct ShadowStorageMetrics {
#[vise::register]
static METRICS: vise::Global<ShadowStorageMetrics> = vise::Global::new();

/// [`ReadStorage`] implementation backed by 2 different backends:
/// source_storage -- backend that will return values for function calls and be the source of truth
/// to_check_storage -- secondary storage, which will verify it's own return values against source_storage
/// Note that if to_check_storage value is different than source value, execution continues and metrics/ logs are emitted.
/// [`ReadStorage`] implementation backed by 2 different backends which are compared for each performed operation.
///
/// - `Ref` is the backend that will return values for function calls and be the source of truth
/// - `Check` is the secondary storage, which will have its return values verified against `Ref`
///
/// If `Check` value is different from a value from `Ref`, storage behavior depends on the [panic on divergence](Self::set_panic_on_divergence()) flag.
/// If this flag is set (which it is by default), the storage panics; otherwise, execution continues and metrics / logs are emitted.
#[derive(Debug)]
pub struct ShadowStorage<'a> {
source_storage: Box<dyn ReadStorage + 'a>,
to_check_storage: Box<dyn ReadStorage + 'a>,
metrics: &'a ShadowStorageMetrics,
pub struct ShadowStorage<Ref, Check> {
source_storage: Ref,
to_check_storage: Check,
metrics: &'static ShadowStorageMetrics,
l1_batch_number: L1BatchNumber,
panic_on_divergence: bool,
}

impl<'a> ShadowStorage<'a> {
impl<Ref: ReadStorage, Check: ReadStorage> ShadowStorage<Ref, Check> {
/// Creates a new storage using the 2 underlying [`ReadStorage`]s, first as source, the second to be checked
/// against the source.
pub fn new(
source_storage: Box<dyn ReadStorage + 'a>,
to_check_storage: Box<dyn ReadStorage + 'a>,
source_storage: Ref,
to_check_storage: Check,
l1_batch_number: L1BatchNumber,
) -> Self {
Self {
source_storage,
to_check_storage,
metrics: &METRICS,
l1_batch_number,
panic_on_divergence: true,
}
}

/// Sets behavior if a storage divergence is detected.
pub fn set_panic_on_divergence(&mut self, panic_on_divergence: bool) {
self.panic_on_divergence = panic_on_divergence;
}

fn error_or_panic(&self, args: fmt::Arguments<'_>) {
if self.panic_on_divergence {
panic!("{args}");
} else {
tracing::error!(l1_batch_number = self.l1_batch_number.0, "{args}");
}
}
}

impl ReadStorage for ShadowStorage<'_> {
impl<Ref: ReadStorage, Check: ReadStorage> ReadStorage for ShadowStorage<Ref, Check> {
fn read_value(&mut self, key: &StorageKey) -> StorageValue {
let source_value = self.source_storage.as_mut().read_value(key);
let expected_value = self.to_check_storage.as_mut().read_value(key);
let source_value = self.source_storage.read_value(key);
let expected_value = self.to_check_storage.read_value(key);
if source_value != expected_value {
self.metrics.read_value_mismatch.inc();
tracing::error!(
self.error_or_panic(format_args!(
"read_value({key:?}) -- l1_batch_number={:?} -- expected source={source_value:?} \
to be equal to to_check={expected_value:?}",
self.l1_batch_number
);
));
}
source_value
}

fn is_write_initial(&mut self, key: &StorageKey) -> bool {
let source_value = self.source_storage.as_mut().is_write_initial(key);
let expected_value = self.to_check_storage.as_mut().is_write_initial(key);
let source_value = self.source_storage.is_write_initial(key);
let expected_value = self.to_check_storage.is_write_initial(key);
if source_value != expected_value {
self.metrics.is_write_initial_mismatch.inc();
tracing::error!(
self.error_or_panic(format_args!(
"is_write_initial({key:?}) -- l1_batch_number={:?} -- expected source={source_value:?} \
to be equal to to_check={expected_value:?}",
self.l1_batch_number
);
));
}
source_value
}
Expand All @@ -82,25 +102,25 @@ impl ReadStorage for ShadowStorage<'_> {
let expected_value = self.to_check_storage.load_factory_dep(hash);
if source_value != expected_value {
self.metrics.load_factory_dep_mismatch.inc();
tracing::error!(
self.error_or_panic(format_args!(
"load_factory_dep({hash:?}) -- l1_batch_number={:?} -- expected source={source_value:?} \
to be equal to to_check={expected_value:?}",
self.l1_batch_number
);
self.l1_batch_number
));
}
source_value
}

fn get_enumeration_index(&mut self, key: &StorageKey) -> Option<u64> {
let source_value = self.source_storage.as_mut().get_enumeration_index(key);
let expected_value = self.to_check_storage.as_mut().get_enumeration_index(key);
let source_value = self.source_storage.get_enumeration_index(key);
let expected_value = self.to_check_storage.get_enumeration_index(key);
if source_value != expected_value {
tracing::error!(
self.metrics.get_enumeration_index_mismatch.inc();
self.error_or_panic(format_args!(
"get_enumeration_index({key:?}) -- l1_batch_number={:?} -- \
expected source={source_value:?} to be equal to to_check={expected_value:?}",
self.l1_batch_number
);
self.metrics.get_enumeration_index_mismatch.inc();
));
}
source_value
}
Expand Down
Loading

0 comments on commit d01840d

Please sign in to comment.