Skip to content

Commit

Permalink
Support backward iteration in the RocksDB (#1492)
Browse files Browse the repository at this point in the history
Reverting the change #1004 and
adds support for reverse prefix iteration in the RocksDB.

---------

Co-authored-by: Mitchell Turner <james.mitchell.turner@gmail.com>
  • Loading branch information
xgreenx and MitchTurner authored Nov 20, 2023
1 parent 1c46249 commit ce0392c
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Description of the upcoming release here.

### Added

- [#1492](https://github.com/FuelLabs/fuel-core/pull/1492): Support backward iteration in the RocksDB. It allows backward queries that were not allowed before.
- [#1490](https://github.com/FuelLabs/fuel-core/pull/1490): Add push and pop benchmarks.
- [#1485](https://github.com/FuelLabs/fuel-core/pull/1485): Prepare rc release of fuel core v0.21
- [#1476](https://github.com/FuelLabs/fuel-core/pull/1453): Add the majority of the "other" benchmarks for contract opcodes.
Expand Down
6 changes: 0 additions & 6 deletions crates/fuel-core/src/schema/coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::{
U64,
},
};
use anyhow::anyhow;
use async_graphql::{
connection::{
Connection,
Expand Down Expand Up @@ -167,11 +166,6 @@ impl CoinQuery {
last: Option<i32>,
before: Option<String>,
) -> async_graphql::Result<Connection<UtxoId, Coin, EmptyFields, EmptyFields>> {
// Rocksdb doesn't support reverse iteration over a prefix
if matches!(last, Some(last) if last > 0) {
return Err(anyhow!("reverse pagination isn't supported for this coins").into())
}

let query: &Database = ctx.data_unchecked();
crate::schema::query_pagination(after, before, first, last, |start, direction| {
let owner: fuel_tx::Address = filter.owner.into();
Expand Down
8 changes: 0 additions & 8 deletions crates/fuel-core/src/schema/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::{
U64,
},
};
use anyhow::anyhow;
use async_graphql::{
connection::{
Connection,
Expand Down Expand Up @@ -138,13 +137,6 @@ impl ContractBalanceQuery {
> {
let query: &Database = ctx.data_unchecked();

// Rocksdb doesn't support reverse iteration over a prefix
if matches!(last, Some(last) if last > 0) {
return Err(
anyhow!("reverse pagination isn't supported for this resource").into(),
)
}

crate::schema::query_pagination(after, before, first, last, |start, direction| {
let balances = query
.contract_balances(
Expand Down
8 changes: 0 additions & 8 deletions crates/fuel-core/src/schema/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,6 @@ impl MessageQuery {
};

let messages = if let Some(owner) = owner {
// Rocksdb doesn't support reverse iteration over a prefix
if matches!(last, Some(last) if last > 0) {
return Err(anyhow!(
"reverse pagination isn't supported for this resource"
)
.into())
}

query.owned_messages(&owner.0, start, direction)
} else {
query.all_messages(start, direction)
Expand Down
8 changes: 0 additions & 8 deletions crates/fuel-core/src/schema/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::{
TxPointer,
},
};
use anyhow::anyhow;
use async_graphql::{
connection::{
Connection,
Expand Down Expand Up @@ -168,13 +167,6 @@ impl TxQuery {
before: Option<String>,
) -> async_graphql::Result<Connection<TxPointer, Transaction, EmptyFields, EmptyFields>>
{
// Rocksdb doesn't support reverse iteration over a prefix
if matches!(last, Some(last) if last > 0) {
return Err(
anyhow!("reverse pagination isn't supported for this resource").into(),
)
}

let query: &Database = ctx.data_unchecked();
let config = ctx.data_unchecked::<Config>();
let owner = fuel_types::Address::from(owner);
Expand Down
86 changes: 80 additions & 6 deletions crates/fuel-core/src/state/rocks_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,63 @@ impl RocksDb {
opts
}

/// RocksDB prefix iteration doesn't support reverse order,
/// but seeking the start key and iterating in reverse order works.
/// So we can create a workaround. We need to find the next available
/// element and use it as an anchor for reverse iteration,
/// but skip the first element to jump on the previous prefix.
/// If we can't find the next element, we are at the end of the list,
/// so we can use `IteratorMode::End` to start reverse iteration.
fn reverse_prefix_iter(
&self,
prefix: &[u8],
column: Column,
) -> impl Iterator<Item = KVItem> + '_ {
let maybe_next_item = next_prefix(prefix.to_vec())
.and_then(|next_prefix| {
self.iter_all(
column,
Some(next_prefix.as_slice()),
None,
IterDirection::Forward,
)
.next()
})
.and_then(|res| res.ok());

if let Some((next_start_key, _)) = maybe_next_item {
let iter_mode = IteratorMode::From(
next_start_key.as_slice(),
rocksdb::Direction::Reverse,
);
let prefix = prefix.to_vec();
self
._iter_all(column, ReadOptions::default(), iter_mode)
// Skip the element under the `next_start_key` key.
.skip(1)
.take_while(move |item| {
if let Ok((key, _)) = item {
key.starts_with(prefix.as_slice())
} else {
true
}
})
.into_boxed()
} else {
// No next item, so we can start backward iteration from the end.
let prefix = prefix.to_vec();
self._iter_all(column, ReadOptions::default(), IteratorMode::End)
.take_while(move |item| {
if let Ok((key, _)) = item {
key.starts_with(prefix.as_slice())
} else {
true
}
})
.into_boxed()
}
}

fn _iter_all(
&self,
column: Column,
Expand Down Expand Up @@ -321,13 +378,19 @@ impl KeyValueStore for RocksDb {
.into_boxed()
}
(Some(prefix), None) => {
// start iterating in a certain direction within the keyspace
let iter_mode =
IteratorMode::From(prefix, convert_to_rocksdb_direction(direction));
let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(true);
if direction == IterDirection::Reverse {
self.reverse_prefix_iter(prefix, column).into_boxed()
} else {
// start iterating in a certain direction within the keyspace
let iter_mode = IteratorMode::From(
prefix,
convert_to_rocksdb_direction(direction),
);
let mut opts = ReadOptions::default();
opts.set_prefix_same_as_start(true);

self._iter_all(column, opts, iter_mode).into_boxed()
self._iter_all(column, opts, iter_mode).into_boxed()
}
}
(None, Some(start)) => {
// start iterating in a certain direction from the start key
Expand Down Expand Up @@ -503,6 +566,17 @@ impl TransactableStorage for RocksDb {
}
}

/// The `None` means overflow, so there is not following prefix.
fn next_prefix(mut prefix: Vec<u8>) -> Option<Vec<u8>> {
for byte in prefix.iter_mut().rev() {
if let Some(new_byte) = byte.checked_add(1) {
*byte = new_byte;
return Some(prefix)
}
}
None
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
7 changes: 1 addition & 6 deletions tests/tests/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,7 @@ async fn test_contract_balance(
#[rstest]
#[tokio::test]
async fn test_5_contract_balances(
#[values(PageDirection::Forward)] direction: PageDirection,
// #[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection,
// Rocksdb doesn't support reverse seeks using a prefix, we'd need to implement a custom
// comparator to support this usecase.
// > One common bug of using prefix iterating is to use prefix mode to iterate in reverse order. But it is not yet supported.
// https://github.com/facebook/rocksdb/wiki/Prefix-Seek#limitation
#[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection,
) {
let mut test_builder = TestSetupBuilder::new(SEED);
let (_, contract_id) = test_builder.setup_contract(
Expand Down
4 changes: 1 addition & 3 deletions tests/tests/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,7 @@ async fn messages_by_owner_returns_messages_for_the_given_owner() {
#[rstest]
#[tokio::test]
async fn messages_empty_results_for_owner_with_no_messages(
#[values(PageDirection::Forward)] direction: PageDirection,
//#[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection,
// reverse iteration with prefix not supported by rocksdb
#[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection,
#[values(Address::new([16; 32]), Address::new([0; 32]))] owner: Address,
) {
let srv = FuelService::new_node(Config::local_node()).await.unwrap();
Expand Down
50 changes: 38 additions & 12 deletions tests/tests/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,12 @@ async fn get_transactions() {
assert!(response.has_previous_page);
}

#[test_case::test_case(PageDirection::Forward; "forward")]
#[test_case::test_case(PageDirection::Backward; "backward")]
#[tokio::test]
async fn get_transactions_by_owner_forward_and_backward_iterations() {
async fn get_transactions_by_owner_returns_correct_number_of_results(
direction: PageDirection,
) {
let alice = Address::from([1; 32]);
let bob = Address::from([2; 32]);

Expand All @@ -397,7 +401,7 @@ async fn get_transactions_by_owner_forward_and_backward_iterations() {
let all_transactions_forward = PaginationRequest {
cursor: None,
results: 10,
direction: PageDirection::Forward,
direction,
};
let response = client
.transactions_by_owner(&bob, all_transactions_forward)
Expand All @@ -412,24 +416,46 @@ async fn get_transactions_by_owner_forward_and_backward_iterations() {
})
.collect_vec();
assert_eq!(transactions_forward.len(), 5);
}

#[test_case::test_case(PageDirection::Forward; "forward")]
#[test_case::test_case(PageDirection::Backward; "backward")]
#[tokio::test]
async fn get_transactions_by_owner_supports_cursor(direction: PageDirection) {
let alice = Address::from([1; 32]);
let bob = Address::from([2; 32]);

let mut context = TestContext::new(100).await;
let _ = context.transfer(alice, bob, 1).await.unwrap();
let _ = context.transfer(alice, bob, 2).await.unwrap();
let _ = context.transfer(alice, bob, 3).await.unwrap();
let _ = context.transfer(alice, bob, 4).await.unwrap();
let _ = context.transfer(alice, bob, 5).await.unwrap();

let client = context.client;

let all_transactions_backward = PaginationRequest {
let all_transactions_forward = PaginationRequest {
cursor: None,
results: 10,
direction: PageDirection::Backward,
direction,
};
let response = client
.transactions_by_owner(&bob, all_transactions_backward)
.await;
// Backward request is not supported right now.
assert!(response.is_err());

///////////////// Iteration
.transactions_by_owner(&bob, all_transactions_forward)
.await
.unwrap();
let transactions_forward = response
.results
.into_iter()
.map(|tx| {
assert!(matches!(tx.status, TransactionStatus::Success { .. }));
tx.transaction
})
.collect_vec();

let forward_iter_three = PaginationRequest {
cursor: None,
results: 3,
direction: PageDirection::Forward,
direction,
};
let response_after_iter_three = client
.transactions_by_owner(&bob, forward_iter_three)
Expand All @@ -451,7 +477,7 @@ async fn get_transactions_by_owner_forward_and_backward_iterations() {
let forward_iter_next_two = PaginationRequest {
cursor: response_after_iter_three.cursor.clone(),
results: 2,
direction: PageDirection::Forward,
direction,
};
let response = client
.transactions_by_owner(&bob, forward_iter_next_two)
Expand Down

0 comments on commit ce0392c

Please sign in to comment.