Skip to content

Commit

Permalink
fix(ethexe): Fix critical bug of prefix iterating RocksDB (#4285)
Browse files Browse the repository at this point in the history
  • Loading branch information
breathx authored Oct 11, 2024
1 parent c850327 commit bcf7899
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
30 changes: 22 additions & 8 deletions ethexe/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,32 @@ mod tests {
}

pub fn kv_iter_prefix<DB: KVDatabase>(db: DB) {
let testcase = |prefix: &str, expectations: &[(&str, &str)]| {
let actual: BTreeSet<_> = db.iter_prefix(prefix.as_bytes()).collect();
let expected: BTreeSet<_> = expectations
.iter()
.map(|(k, v)| (k.as_bytes().to_vec(), v.as_bytes().to_vec()))
.collect();
assert_eq!(actual, expected);
};

db.put(b"prefix_foo", b"hello".to_vec());
db.put(b"prefix_bar", b"world".to_vec());

let values: BTreeSet<(Vec<u8>, Vec<u8>)> = db.iter_prefix(b"prefix_").collect();
assert_eq!(
values,
[
(b"prefix_foo".to_vec(), b"hello".to_vec()),
(b"prefix_bar".to_vec(), b"world".to_vec()),
]
.into(),
testcase(
"prefix_",
&[("prefix_foo", "hello"), ("prefix_bar", "world")],
);

testcase("", &[("prefix_foo", "hello"), ("prefix_bar", "world")]);

testcase("0", &[]);

testcase("prefix_foobar", &[]);

testcase("prefix_foo", &[("prefix_foo", "hello")]);

testcase("prefix_bar", &[("prefix_bar", "world")]);
}

pub fn cas_multi_thread<DB: CASDatabase>(db: DB) {
Expand Down
43 changes: 35 additions & 8 deletions ethexe/db/src/rocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
use crate::{CASDatabase, KVDatabase};
use anyhow::Result;
use gprimitives::H256;
use rocksdb::{Options, DB};
use rocksdb::{DBIteratorWithThreadMode, Options, DB};
use std::{path::PathBuf, sync::Arc};

/// Database for storing states and codes in memory.
Expand Down Expand Up @@ -85,13 +85,40 @@ impl KVDatabase for RocksDatabase {
.expect("Failed to write data, database is not in valid state");
}

fn iter_prefix(&self, prefix: &[u8]) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + '_> {
Box::new(
self.inner
.prefix_iterator(prefix)
.map(|kv| kv.expect("unexpected error during iteration"))
.map(|(k, v)| (<[u8]>::into_vec(k), <[u8]>::into_vec(v))),
)
fn iter_prefix<'a>(
&'a self,
prefix: &'a [u8],
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + '_> {
Box::new(PrefixIterator {
prefix,
prefix_iter: self.inner.prefix_iterator(prefix),
done: false,
})
}
}

pub struct PrefixIterator<'a> {
prefix: &'a [u8],
prefix_iter: DBIteratorWithThreadMode<'a, DB>,
done: bool,
}

impl<'a> Iterator for PrefixIterator<'a> {
type Item = (Vec<u8>, Vec<u8>);

fn next(&mut self) -> Option<Self::Item> {
if self.done {
return None;
}

match self.prefix_iter.next() {
Some(Ok((k, v))) if k.starts_with(self.prefix) => Some((k.to_vec(), v.to_vec())),
Some(Err(e)) => panic!("Failed to read data, database is not in valid state: {e:?}"),
_ => {
self.done = true;
None
}
}
}
}

Expand Down

0 comments on commit bcf7899

Please sign in to comment.