Skip to content

Commit

Permalink
Bugfix: the iter_all_by_prefix was not working for all tables (#1814)
Browse files Browse the repository at this point in the history
We used `RocksDB` filtering by prefix during `iter_all_by_prefix`. But
if the prefix is not set for the column, the `RocksDB` ignores it.

We don't set the prefix for all tables because of performance purposes.
But it means that iteration sometimes can be wrong.

The change adds a `Rust` level filtering since not all tables really
need a prefix.

The issue found by @segfault-magnet in
#1799 (comment)

### Before requesting review
- [x] I have reviewed the code myself
  • Loading branch information
xgreenx authored Apr 9, 2024
1 parent 7cd2ceb commit 987ad60
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 86 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## [Unreleased]

Description of the upcoming release here.
### Fixed

- [#1814](https://github.com/FuelLabs/fuel-core/pull/1814): Bugfix: the `iter_all_by_prefix` was not working for all tables. The change adds a `Rust` level filtering.
=======
### Added

- [#1799](https://github.com/FuelLabs/fuel-core/pull/1799) Snapshot creation is now concurrent.
Expand Down
3 changes: 1 addition & 2 deletions crates/fuel-core/src/combined_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,13 @@ impl CombinedDatabase {
use fuel_core_chain_config::AddTable;
use itertools::Itertools;
let mut builder = StateConfigBuilder::default();
use crate::database::IncludeAll;

macro_rules! add_tables {
($($table: ty),*) => {
$(
let table = self
.on_chain()
.entries::<$table>(IncludeAll, fuel_core_storage::iter::IterDirection::Forward)
.entries::<$table>(None, fuel_core_storage::iter::IterDirection::Forward)
.try_collect()?;
builder.add(table);
)*
Expand Down
70 changes: 46 additions & 24 deletions crates/fuel-core/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,42 +104,20 @@ impl Database<OnChain> {
}
}

pub trait KeyFilter<K> {
fn start_at_prefix(&self) -> Option<&[u8]>;
fn should_stop(&self, key: &K) -> bool;
}

pub struct IncludeAll;
impl<K> KeyFilter<K> for IncludeAll {
fn start_at_prefix(&self) -> Option<&[u8]> {
None
}

fn should_stop(&self, _: &K) -> bool {
false
}
}

impl<DbDesc> Database<DbDesc>
where
DbDesc: DatabaseDescription,
{
pub fn entries<'a, T>(
&'a self,
filter: impl KeyFilter<T::OwnedKey> + 'a,
prefix: Option<Vec<u8>>,
direction: IterDirection,
) -> impl Iterator<Item = StorageResult<TableEntry<T>>> + 'a
where
T: TableWithBlueprint<Column = <DbDesc as DatabaseDescription>::Column> + 'a,
T::Blueprint: BlueprintInspect<T, Self>,
{
self.iter_all_filtered::<T, _>(filter.start_at_prefix(), None, Some(direction))
.take_while(move |result| {
let Ok((key, _)) = result.as_ref() else {
return true;
};
!filter.should_stop(key)
})
self.iter_all_filtered::<T, _>(prefix, None, Some(direction))
.map_ok(|(key, value)| TableEntry { key, value })
}
}
Expand Down Expand Up @@ -1009,4 +987,48 @@ mod tests {
);
}
}

#[cfg(feature = "rocksdb")]
#[test]
fn database_iter_all_by_prefix_works() {
use fuel_core_storage::tables::ContractsRawCode;
use fuel_core_types::fuel_types::ContractId;
use std::str::FromStr;

let test = |mut db: Database<OnChain>| {
let contract_id_1 = ContractId::from_str(
"5962be5ebddc516cb4ed7d7e76365f59e0d231ac25b53f262119edf76564aab4",
)
.unwrap();

let mut insert_empty_code = |id| {
StorageMutate::<ContractsRawCode>::insert(&mut db, &id, &[]).unwrap()
};
insert_empty_code(contract_id_1);

let contract_id_2 = ContractId::from_str(
"5baf0dcae7c114f647f6e71f1723f59bcfc14ecb28071e74895d97b14873c5dc",
)
.unwrap();
insert_empty_code(contract_id_2);

let matched_keys: Vec<_> = db
.iter_all_by_prefix::<ContractsRawCode, _>(Some(contract_id_1))
.map_ok(|(k, _)| k)
.try_collect()
.unwrap();

assert_eq!(matched_keys, vec![contract_id_1]);
};

let temp_dir = tempfile::tempdir().unwrap();
let db = Database::<OnChain>::in_memory();
// in memory passes
test(db);

let db = Database::<OnChain>::open_rocksdb(temp_dir.path(), 1024 * 1024 * 1024)
.unwrap();
// rocks db fails
test(db);
}
}
15 changes: 5 additions & 10 deletions crates/fuel-core/src/service/genesis/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use crate::{
database::{
database_description::DatabaseDescription,
Database,
IncludeAll,
KeyFilter,
},
fuel_core_graphql_api::storage::transactions::{
OwnedTransactions,
Expand Down Expand Up @@ -39,10 +37,7 @@ use itertools::Itertools;

use tokio_util::sync::CancellationToken;

use self::filter::ByContractId;

use super::task_manager::TaskManager;
mod filter;

pub struct Exporter<Fun> {
db: CombinedDatabase,
Expand Down Expand Up @@ -74,7 +69,7 @@ where
pub async fn write_full_snapshot(mut self) -> Result<(), anyhow::Error> {
macro_rules! export {
($db: expr, $($table: ty),*) => {
$(self.spawn_task::<$table, _>(IncludeAll, $db)?;)*
$(self.spawn_task::<$table, _>(None, $db)?;)*
};
}

Expand Down Expand Up @@ -106,8 +101,7 @@ where
) -> Result<(), anyhow::Error> {
macro_rules! export {
($($table: ty),*) => {
let filter = ByContractId::new(contract_id);
$(self.spawn_task::<$table, _>(filter, |ctx: &Self| ctx.db.on_chain())?;)*
$(self.spawn_task::<$table, _>(Some(contract_id.as_ref()), |ctx: &Self| ctx.db.on_chain())?;)*
};
}
export!(
Expand Down Expand Up @@ -145,7 +139,7 @@ where

fn spawn_task<T, DbDesc>(
&mut self,
filter: impl KeyFilter<T::OwnedKey> + Send + 'static,
prefix: Option<&[u8]>,
db_picker: impl FnOnce(&Self) -> &Database<DbDesc>,
) -> anyhow::Result<()>
where
Expand All @@ -160,9 +154,10 @@ where
let group_size = self.group_size;

let db = db_picker(self).clone();
let prefix = prefix.map(|p| p.to_vec());
self.task_manager.spawn(move |cancel| {
tokio_rayon::spawn(move || {
db.entries::<T>(filter, IterDirection::Forward)
db.entries::<T>(prefix, IterDirection::Forward)
.chunks(group_size)
.into_iter()
.take_while(|_| !cancel.is_cancelled())
Expand Down
49 changes: 0 additions & 49 deletions crates/fuel-core/src/service/genesis/exporter/filter.rs

This file was deleted.

14 changes: 13 additions & 1 deletion crates/fuel-core/src/state/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,22 @@ where
prefix,
convert_to_rocksdb_direction(direction),
);

// Setting prefix on the RocksDB level to optimize iteration.
let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(true);

self._iter_all(column, opts, iter_mode).into_boxed()
let prefix = prefix.to_vec();
self._iter_all(column, opts, iter_mode)
// Not all tables has a prefix set, so we need to filter out the keys.
.take_while(move |item| {
if let Ok((key, _)) = item {
key.starts_with(prefix.as_slice())
} else {
true
}
})
.into_boxed()
}
}
(None, Some(start)) => {
Expand Down

0 comments on commit 987ad60

Please sign in to comment.