From ce0392c13beb099dc9c7402ca8420dff74f5b80e Mon Sep 17 00:00:00 2001 From: Green Baneling Date: Mon, 20 Nov 2023 22:16:48 +0300 Subject: [PATCH] Support backward iteration in the RocksDB (#1492) Reverting the change https://github.com/FuelLabs/fuel-core/pull/1004 and adds support for reverse prefix iteration in the RocksDB. --------- Co-authored-by: Mitchell Turner --- CHANGELOG.md | 1 + crates/fuel-core/src/schema/coins.rs | 6 -- crates/fuel-core/src/schema/contract.rs | 8 --- crates/fuel-core/src/schema/message.rs | 8 --- crates/fuel-core/src/schema/tx.rs | 8 --- crates/fuel-core/src/state/rocks_db.rs | 86 +++++++++++++++++++++++-- tests/tests/contract.rs | 7 +- tests/tests/messages.rs | 4 +- tests/tests/tx.rs | 50 ++++++++++---- 9 files changed, 121 insertions(+), 57 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5668ab0ef55..c51aec1ed33 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Description of the upcoming release here. ### Added +- [#1492](https://github.com/FuelLabs/fuel-core/pull/1492): Support backward iteration in the RocksDB. It allows backward queries that were not allowed before. - [#1490](https://github.com/FuelLabs/fuel-core/pull/1490): Add push and pop benchmarks. - [#1485](https://github.com/FuelLabs/fuel-core/pull/1485): Prepare rc release of fuel core v0.21 - [#1476](https://github.com/FuelLabs/fuel-core/pull/1453): Add the majority of the "other" benchmarks for contract opcodes. diff --git a/crates/fuel-core/src/schema/coins.rs b/crates/fuel-core/src/schema/coins.rs index 81f8f587a3c..60a75add8f9 100644 --- a/crates/fuel-core/src/schema/coins.rs +++ b/crates/fuel-core/src/schema/coins.rs @@ -21,7 +21,6 @@ use crate::{ U64, }, }; -use anyhow::anyhow; use async_graphql::{ connection::{ Connection, @@ -167,11 +166,6 @@ impl CoinQuery { last: Option, before: Option, ) -> async_graphql::Result> { - // Rocksdb doesn't support reverse iteration over a prefix - if matches!(last, Some(last) if last > 0) { - return Err(anyhow!("reverse pagination isn't supported for this coins").into()) - } - let query: &Database = ctx.data_unchecked(); crate::schema::query_pagination(after, before, first, last, |start, direction| { let owner: fuel_tx::Address = filter.owner.into(); diff --git a/crates/fuel-core/src/schema/contract.rs b/crates/fuel-core/src/schema/contract.rs index 331908be412..2409041925d 100644 --- a/crates/fuel-core/src/schema/contract.rs +++ b/crates/fuel-core/src/schema/contract.rs @@ -12,7 +12,6 @@ use crate::{ U64, }, }; -use anyhow::anyhow; use async_graphql::{ connection::{ Connection, @@ -138,13 +137,6 @@ impl ContractBalanceQuery { > { let query: &Database = ctx.data_unchecked(); - // Rocksdb doesn't support reverse iteration over a prefix - if matches!(last, Some(last) if last > 0) { - return Err( - anyhow!("reverse pagination isn't supported for this resource").into(), - ) - } - crate::schema::query_pagination(after, before, first, last, |start, direction| { let balances = query .contract_balances( diff --git a/crates/fuel-core/src/schema/message.rs b/crates/fuel-core/src/schema/message.rs index 95cc97e3bde..75707190e22 100644 --- a/crates/fuel-core/src/schema/message.rs +++ b/crates/fuel-core/src/schema/message.rs @@ -89,14 +89,6 @@ impl MessageQuery { }; let messages = if let Some(owner) = owner { - // Rocksdb doesn't support reverse iteration over a prefix - if matches!(last, Some(last) if last > 0) { - return Err(anyhow!( - "reverse pagination isn't supported for this resource" - ) - .into()) - } - query.owned_messages(&owner.0, start, direction) } else { query.all_messages(start, direction) diff --git a/crates/fuel-core/src/schema/tx.rs b/crates/fuel-core/src/schema/tx.rs index 813fcd9911e..ca3664de238 100644 --- a/crates/fuel-core/src/schema/tx.rs +++ b/crates/fuel-core/src/schema/tx.rs @@ -22,7 +22,6 @@ use crate::{ TxPointer, }, }; -use anyhow::anyhow; use async_graphql::{ connection::{ Connection, @@ -168,13 +167,6 @@ impl TxQuery { before: Option, ) -> async_graphql::Result> { - // Rocksdb doesn't support reverse iteration over a prefix - if matches!(last, Some(last) if last > 0) { - return Err( - anyhow!("reverse pagination isn't supported for this resource").into(), - ) - } - let query: &Database = ctx.data_unchecked(); let config = ctx.data_unchecked::(); let owner = fuel_types::Address::from(owner); diff --git a/crates/fuel-core/src/state/rocks_db.rs b/crates/fuel-core/src/state/rocks_db.rs index e02d3d78edf..c6c6ca36190 100644 --- a/crates/fuel-core/src/state/rocks_db.rs +++ b/crates/fuel-core/src/state/rocks_db.rs @@ -224,6 +224,63 @@ impl RocksDb { opts } + /// RocksDB prefix iteration doesn't support reverse order, + /// but seeking the start key and iterating in reverse order works. + /// So we can create a workaround. We need to find the next available + /// element and use it as an anchor for reverse iteration, + /// but skip the first element to jump on the previous prefix. + /// If we can't find the next element, we are at the end of the list, + /// so we can use `IteratorMode::End` to start reverse iteration. + fn reverse_prefix_iter( + &self, + prefix: &[u8], + column: Column, + ) -> impl Iterator + '_ { + let maybe_next_item = next_prefix(prefix.to_vec()) + .and_then(|next_prefix| { + self.iter_all( + column, + Some(next_prefix.as_slice()), + None, + IterDirection::Forward, + ) + .next() + }) + .and_then(|res| res.ok()); + + if let Some((next_start_key, _)) = maybe_next_item { + let iter_mode = IteratorMode::From( + next_start_key.as_slice(), + rocksdb::Direction::Reverse, + ); + let prefix = prefix.to_vec(); + self + ._iter_all(column, ReadOptions::default(), iter_mode) + // Skip the element under the `next_start_key` key. + .skip(1) + .take_while(move |item| { + if let Ok((key, _)) = item { + key.starts_with(prefix.as_slice()) + } else { + true + } + }) + .into_boxed() + } else { + // No next item, so we can start backward iteration from the end. + let prefix = prefix.to_vec(); + self._iter_all(column, ReadOptions::default(), IteratorMode::End) + .take_while(move |item| { + if let Ok((key, _)) = item { + key.starts_with(prefix.as_slice()) + } else { + true + } + }) + .into_boxed() + } + } + fn _iter_all( &self, column: Column, @@ -321,13 +378,19 @@ impl KeyValueStore for RocksDb { .into_boxed() } (Some(prefix), None) => { - // start iterating in a certain direction within the keyspace - let iter_mode = - IteratorMode::From(prefix, convert_to_rocksdb_direction(direction)); - let mut opts = ReadOptions::default(); - opts.set_prefix_same_as_start(true); + if direction == IterDirection::Reverse { + self.reverse_prefix_iter(prefix, column).into_boxed() + } else { + // start iterating in a certain direction within the keyspace + let iter_mode = IteratorMode::From( + prefix, + convert_to_rocksdb_direction(direction), + ); + let mut opts = ReadOptions::default(); + opts.set_prefix_same_as_start(true); - self._iter_all(column, opts, iter_mode).into_boxed() + self._iter_all(column, opts, iter_mode).into_boxed() + } } (None, Some(start)) => { // start iterating in a certain direction from the start key @@ -503,6 +566,17 @@ impl TransactableStorage for RocksDb { } } +/// The `None` means overflow, so there is not following prefix. +fn next_prefix(mut prefix: Vec) -> Option> { + for byte in prefix.iter_mut().rev() { + if let Some(new_byte) = byte.checked_add(1) { + *byte = new_byte; + return Some(prefix) + } + } + None +} + #[cfg(test)] mod tests { use super::*; diff --git a/tests/tests/contract.rs b/tests/tests/contract.rs index b72da64bf9a..9511d6ff32b 100644 --- a/tests/tests/contract.rs +++ b/tests/tests/contract.rs @@ -57,12 +57,7 @@ async fn test_contract_balance( #[rstest] #[tokio::test] async fn test_5_contract_balances( - #[values(PageDirection::Forward)] direction: PageDirection, - // #[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection, - // Rocksdb doesn't support reverse seeks using a prefix, we'd need to implement a custom - // comparator to support this usecase. - // > One common bug of using prefix iterating is to use prefix mode to iterate in reverse order. But it is not yet supported. - // https://github.com/facebook/rocksdb/wiki/Prefix-Seek#limitation + #[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection, ) { let mut test_builder = TestSetupBuilder::new(SEED); let (_, contract_id) = test_builder.setup_contract( diff --git a/tests/tests/messages.rs b/tests/tests/messages.rs index 7ba51d35d46..2ea755e8792 100644 --- a/tests/tests/messages.rs +++ b/tests/tests/messages.rs @@ -172,9 +172,7 @@ async fn messages_by_owner_returns_messages_for_the_given_owner() { #[rstest] #[tokio::test] async fn messages_empty_results_for_owner_with_no_messages( - #[values(PageDirection::Forward)] direction: PageDirection, - //#[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection, - // reverse iteration with prefix not supported by rocksdb + #[values(PageDirection::Forward, PageDirection::Backward)] direction: PageDirection, #[values(Address::new([16; 32]), Address::new([0; 32]))] owner: Address, ) { let srv = FuelService::new_node(Config::local_node()).await.unwrap(); diff --git a/tests/tests/tx.rs b/tests/tests/tx.rs index fcc08abe88f..bd28aeb37e4 100644 --- a/tests/tests/tx.rs +++ b/tests/tests/tx.rs @@ -380,8 +380,12 @@ async fn get_transactions() { assert!(response.has_previous_page); } +#[test_case::test_case(PageDirection::Forward; "forward")] +#[test_case::test_case(PageDirection::Backward; "backward")] #[tokio::test] -async fn get_transactions_by_owner_forward_and_backward_iterations() { +async fn get_transactions_by_owner_returns_correct_number_of_results( + direction: PageDirection, +) { let alice = Address::from([1; 32]); let bob = Address::from([2; 32]); @@ -397,7 +401,7 @@ async fn get_transactions_by_owner_forward_and_backward_iterations() { let all_transactions_forward = PaginationRequest { cursor: None, results: 10, - direction: PageDirection::Forward, + direction, }; let response = client .transactions_by_owner(&bob, all_transactions_forward) @@ -412,24 +416,46 @@ async fn get_transactions_by_owner_forward_and_backward_iterations() { }) .collect_vec(); assert_eq!(transactions_forward.len(), 5); +} + +#[test_case::test_case(PageDirection::Forward; "forward")] +#[test_case::test_case(PageDirection::Backward; "backward")] +#[tokio::test] +async fn get_transactions_by_owner_supports_cursor(direction: PageDirection) { + let alice = Address::from([1; 32]); + let bob = Address::from([2; 32]); + + let mut context = TestContext::new(100).await; + let _ = context.transfer(alice, bob, 1).await.unwrap(); + let _ = context.transfer(alice, bob, 2).await.unwrap(); + let _ = context.transfer(alice, bob, 3).await.unwrap(); + let _ = context.transfer(alice, bob, 4).await.unwrap(); + let _ = context.transfer(alice, bob, 5).await.unwrap(); + + let client = context.client; - let all_transactions_backward = PaginationRequest { + let all_transactions_forward = PaginationRequest { cursor: None, results: 10, - direction: PageDirection::Backward, + direction, }; let response = client - .transactions_by_owner(&bob, all_transactions_backward) - .await; - // Backward request is not supported right now. - assert!(response.is_err()); - - ///////////////// Iteration + .transactions_by_owner(&bob, all_transactions_forward) + .await + .unwrap(); + let transactions_forward = response + .results + .into_iter() + .map(|tx| { + assert!(matches!(tx.status, TransactionStatus::Success { .. })); + tx.transaction + }) + .collect_vec(); let forward_iter_three = PaginationRequest { cursor: None, results: 3, - direction: PageDirection::Forward, + direction, }; let response_after_iter_three = client .transactions_by_owner(&bob, forward_iter_three) @@ -451,7 +477,7 @@ async fn get_transactions_by_owner_forward_and_backward_iterations() { let forward_iter_next_two = PaginationRequest { cursor: response_after_iter_three.cursor.clone(), results: 2, - direction: PageDirection::Forward, + direction, }; let response = client .transactions_by_owner(&bob, forward_iter_next_two)