Skip to content

Commit

Permalink
Use own codec for CoinsToSpendIndexKey
Browse files Browse the repository at this point in the history
  • Loading branch information
xgreenx committed Jan 2, 2025
1 parent f776cb4 commit 6c26bc2
Show file tree
Hide file tree
Showing 7 changed files with 390 additions and 395 deletions.
154 changes: 38 additions & 116 deletions crates/fuel-core/src/coins_query.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use crate::{
fuel_core_graphql_api::database::ReadView,
graphql_api::{
ports::CoinsToSpendIndexIter,
storage::coins::{
CoinsToSpendIndexEntry,
IndexedCoinType,
},
fuel_core_graphql_api::{
database::ReadView,
storage::coins::CoinsToSpendIndexKey,
},
graphql_api::ports::CoinsToSpendIndexIter,
query::asset_query::{
AssetQuery,
AssetSpendTarget,
Expand Down Expand Up @@ -41,7 +38,6 @@ use rand::prelude::*;
use std::{
cmp::Reverse,
collections::HashSet,
ops::Deref,
};
use thiserror::Error;

Expand Down Expand Up @@ -289,7 +285,7 @@ pub async fn select_coins_to_spend(
asset_id: &AssetId,
excluded_ids: &ExcludedCoinIds<'_>,
batch_size: usize,
) -> Result<Vec<CoinsToSpendIndexEntry>, CoinsQueryError> {
) -> Result<Vec<CoinsToSpendIndexKey>, CoinsQueryError> {
const TOTAL_AMOUNT_ADJUSTMENT_FACTOR: u64 = 2;
if total == 0 || max == 0 {
return Err(CoinsQueryError::IncorrectQueryParameters {
Expand Down Expand Up @@ -351,124 +347,65 @@ pub async fn select_coins_to_spend(
.collect())
}

// This is the `CoinsToSpendIndexEntry` which is guaranteed to have a key
// which allows to properly decode the amount.
struct CheckedCoinsToSpendIndexEntry {
inner: CoinsToSpendIndexEntry,
amount: u64,
}

impl TryFrom<CoinsToSpendIndexEntry> for CheckedCoinsToSpendIndexEntry {
type Error = CoinsQueryError;

fn try_from(value: CoinsToSpendIndexEntry) -> Result<Self, Self::Error> {
let amount = value
.0
.amount()
.ok_or(CoinsQueryError::IncorrectCoinsToSpendIndexKey)?;
Ok(Self {
inner: value,
amount,
})
}
}

impl From<CheckedCoinsToSpendIndexEntry> for CoinsToSpendIndexEntry {
fn from(value: CheckedCoinsToSpendIndexEntry) -> Self {
value.inner
}
}

impl Deref for CheckedCoinsToSpendIndexEntry {
type Target = CoinsToSpendIndexEntry;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

async fn big_coins(
big_coins_stream: impl Stream<Item = StorageResult<CoinsToSpendIndexEntry>> + Unpin,
big_coins_stream: impl Stream<Item = StorageResult<CoinsToSpendIndexKey>> + Unpin,
total: u64,
max: u16,
excluded_ids: &ExcludedCoinIds<'_>,
) -> Result<(u64, Vec<CheckedCoinsToSpendIndexEntry>), CoinsQueryError> {
select_coins_until(
big_coins_stream,
max,
excluded_ids,
|_, total_so_far| total_so_far >= total,
CheckedCoinsToSpendIndexEntry::try_from,
)
) -> Result<(u64, Vec<CoinsToSpendIndexKey>), CoinsQueryError> {
select_coins_until(big_coins_stream, max, excluded_ids, |_, total_so_far| {
total_so_far >= total
})
.await
}

async fn dust_coins(
dust_coins_stream: impl Stream<Item = StorageResult<CoinsToSpendIndexEntry>> + Unpin,
last_big_coin: &CoinsToSpendIndexEntry,
dust_coins_stream: impl Stream<Item = StorageResult<CoinsToSpendIndexKey>> + Unpin,
last_big_coin: &CoinsToSpendIndexKey,
max_dust_count: u16,
excluded_ids: &ExcludedCoinIds<'_>,
) -> Result<(u64, Vec<CoinsToSpendIndexEntry>), CoinsQueryError> {
) -> Result<(u64, Vec<CoinsToSpendIndexKey>), CoinsQueryError> {
select_coins_until(
dust_coins_stream,
max_dust_count,
excluded_ids,
|coin, _| coin == last_big_coin,
Ok::<CoinsToSpendIndexEntry, CoinsQueryError>,
)
.await
}

async fn select_coins_until<Pred, Ret, Mapper, E>(
mut coins_stream: impl Stream<Item = StorageResult<CoinsToSpendIndexEntry>> + Unpin,
async fn select_coins_until<Pred>(
mut coins_stream: impl Stream<Item = StorageResult<CoinsToSpendIndexKey>> + Unpin,
max: u16,
excluded_ids: &ExcludedCoinIds<'_>,
predicate: Pred,
mapper: Mapper,
) -> Result<(u64, Vec<Ret>), CoinsQueryError>
) -> Result<(u64, Vec<CoinsToSpendIndexKey>), CoinsQueryError>
where
Pred: Fn(&CoinsToSpendIndexEntry, u64) -> bool,
Mapper: Fn(CoinsToSpendIndexEntry) -> Result<Ret, E>,
E: From<CoinsQueryError>,
Pred: Fn(&CoinsToSpendIndexKey, u64) -> bool,
{
let mut coins_total_value: u64 = 0;
let mut coins = Vec::with_capacity(max as usize);
while let Some(coin) = coins_stream.next().await {
let coin = coin?;
if !is_excluded(&coin, excluded_ids)? {
if !is_excluded(&coin, excluded_ids) {
if coins.len() >= max as usize || predicate(&coin, coins_total_value) {
break;
}
let amount = coin
.0
.amount()
.ok_or(CoinsQueryError::IncorrectCoinsToSpendIndexKey)?;
let amount = coin.amount();
coins_total_value = coins_total_value.saturating_add(amount);
coins.push(
mapper(coin)
.map_err(|_| CoinsQueryError::IncorrectCoinsToSpendIndexKey)?,
);
coins.push(coin);
}
}
Ok((coins_total_value, coins))
}

fn is_excluded(
(key, coin_type): &CoinsToSpendIndexEntry,
excluded_ids: &ExcludedCoinIds,
) -> Result<bool, CoinsQueryError> {
match coin_type {
IndexedCoinType::Coin => {
let utxo = key
.try_into()
.map_err(|_| CoinsQueryError::IncorrectCoinForeignKeyInIndex)?;
Ok(excluded_ids.is_coin_excluded(&utxo))
fn is_excluded(key: &CoinsToSpendIndexKey, excluded_ids: &ExcludedCoinIds) -> bool {
match key {
CoinsToSpendIndexKey::Coin { utxo_id, .. } => {
excluded_ids.is_coin_excluded(utxo_id)
}
IndexedCoinType::Message => {
let nonce = key
.try_into()
.map_err(|_| CoinsQueryError::IncorrectMessageForeignKeyInIndex)?;
Ok(excluded_ids.is_message_excluded(&nonce))
CoinsToSpendIndexKey::Message { nonce, .. } => {
excluded_ids.is_message_excluded(nonce)
}
}
}
Expand All @@ -479,12 +416,12 @@ fn max_dust_count(max: u16, big_coins_len: u16) -> u16 {
}

fn skip_big_coins_up_to_amount(
big_coins: impl IntoIterator<Item = CheckedCoinsToSpendIndexEntry>,
big_coins: impl IntoIterator<Item = CoinsToSpendIndexKey>,
skipped_amount: u64,
) -> impl Iterator<Item = CheckedCoinsToSpendIndexEntry> {
) -> impl Iterator<Item = CoinsToSpendIndexKey> {
let mut current_dust_coins_value = skipped_amount;
big_coins.into_iter().skip_while(move |item| {
let item_amount = item.amount;
let item_amount = item.amount();
current_dust_coins_value
.checked_sub(item_amount)
.map(|new_value| {
Expand Down Expand Up @@ -1150,22 +1087,16 @@ mod tests {
select_coins_to_spend,
select_coins_until,
CoinsQueryError,
CoinsToSpendIndexEntry,
CoinsToSpendIndexKey,
ExcludedCoinIds,
},
graphql_api::{
ports::CoinsToSpendIndexIter,
storage::coins::{
CoinsToSpendIndexKey,
IndexedCoinType,
},
},
graphql_api::ports::CoinsToSpendIndexIter,
};

const BATCH_SIZE: usize = 1;

struct TestCoinSpec {
index_entry: Result<CoinsToSpendIndexEntry, fuel_core_storage::Error>,
index_entry: Result<CoinsToSpendIndexKey, fuel_core_storage::Error>,
utxo_id: UtxoId,
}

Expand All @@ -1186,10 +1117,7 @@ mod tests {
};

TestCoinSpec {
index_entry: Ok((
CoinsToSpendIndexKey::from_coin(&coin),
IndexedCoinType::Coin,
)),
index_entry: Ok(CoinsToSpendIndexKey::from_coin(&coin)),
utxo_id,
}
})
Expand All @@ -1215,7 +1143,6 @@ mod tests {
MAX,
&excluded,
|_, _| false,
Ok::<CoinsToSpendIndexEntry, CoinsQueryError>,
)
.await
.expect("should select coins");
Expand Down Expand Up @@ -1247,7 +1174,6 @@ mod tests {
MAX,
&excluded,
|_, _| false,
Ok::<CoinsToSpendIndexEntry, CoinsQueryError>,
)
.await
.expect("should select coins");
Expand All @@ -1271,7 +1197,7 @@ mod tests {

let excluded = ExcludedCoinIds::new(std::iter::empty(), std::iter::empty());

let predicate: fn(&CoinsToSpendIndexEntry, u64) -> bool =
let predicate: fn(&CoinsToSpendIndexKey, u64) -> bool =
|_, total| total > TOTAL;

// When
Expand All @@ -1280,7 +1206,6 @@ mod tests {
MAX,
&excluded,
predicate,
Ok::<CoinsToSpendIndexEntry, CoinsQueryError>,
)
.await
.expect("should select coins");
Expand Down Expand Up @@ -1329,14 +1254,14 @@ mod tests {

let mut results = result
.into_iter()
.map(|(key, _)| key.amount())
.map(|key| key.amount())
.collect::<Vec<_>>();

// Then

// Because we select a total of 202 (TOTAL * 2), first 3 coins should always selected (100, 100, 4).
let expected = vec![100, 100, 4];
let actual: Vec<_> = results.drain(..3).map(Option::unwrap).collect();
let actual: Vec<_> = results.drain(..3).collect();
assert_eq!(expected, actual);

// The number of dust coins is selected randomly, so we might have:
Expand All @@ -1349,7 +1274,7 @@ mod tests {
let expected_1: Vec<u64> = vec![];
let expected_2: Vec<u64> = vec![2];
let expected_3: Vec<u64> = vec![2, 3];
let actual: Vec<_> = results.drain(..).map(Option::unwrap).collect();
let actual: Vec<_> = results;

assert!(
actual == expected_1 || actual == expected_2 || actual == expected_3,
Expand Down Expand Up @@ -1390,10 +1315,7 @@ mod tests {
.expect("should not error");

// Then
let results: Vec<_> = result
.into_iter()
.map(|(key, _)| key.amount().unwrap())
.collect();
let results: Vec<_> = result.into_iter().map(|key| key.amount()).collect();
assert_eq!(results, vec![10, 10]);
}

Expand Down
15 changes: 7 additions & 8 deletions crates/fuel-core/src/graphql_api/indexation/coins_to_spend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::graphql_api::{
storage::coins::{
CoinsToSpendIndex,
CoinsToSpendIndexKey,
IndexedCoinType,
},
};

Expand All @@ -32,7 +31,7 @@ where
{
let key = CoinsToSpendIndexKey::from_coin(coin);
let storage = block_st_transaction.storage::<CoinsToSpendIndex>();
let maybe_old_value = storage.replace(&key, &IndexedCoinType::Coin)?;
let maybe_old_value = storage.replace(&key, &())?;
if maybe_old_value.is_some() {
return Err(IndexationError::CoinToSpendAlreadyIndexed {
owner: coin.owner,
Expand Down Expand Up @@ -75,7 +74,7 @@ where
{
let key = CoinsToSpendIndexKey::from_message(message, base_asset_id);
let storage = block_st_transaction.storage::<CoinsToSpendIndex>();
let maybe_old_value = storage.replace(&key, &IndexedCoinType::Message)?;
let maybe_old_value = storage.replace(&key, &())?;
if maybe_old_value.is_some() {
return Err(IndexationError::MessageToSpendAlreadyIndexed {
owner: *message.recipient(),
Expand Down Expand Up @@ -191,10 +190,10 @@ mod tests {
.map(|entry| entry.expect("should read entries"))
.map(|entry| {
(
entry.key.owner().unwrap(),
entry.key.asset_id().unwrap(),
[entry.key.retryable_flag().unwrap()],
entry.key.amount().unwrap(),
*entry.key.owner(),
*entry.key.asset_id(),
[entry.key.retryable_flag()],
entry.key.amount(),
)
})
.collect();
Expand Down Expand Up @@ -727,7 +726,7 @@ mod tests {
.entries::<CoinsToSpendIndex>(None, IterDirection::Forward)
.map(|entry| entry.expect("should read entries"))
.map(|entry|
entry.key.amount().unwrap(),
entry.key.amount(),
)
.collect();

Expand Down
11 changes: 4 additions & 7 deletions crates/fuel-core/src/graphql_api/ports.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use super::storage::balances::TotalBalanceAmount;
use crate::fuel_core_graphql_api::storage::coins::CoinsToSpendIndexKey;
use async_trait::async_trait;
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::{
Expand Down Expand Up @@ -64,14 +66,9 @@ use fuel_core_types::{
};
use std::sync::Arc;

use super::storage::{
balances::TotalBalanceAmount,
coins::CoinsToSpendIndexEntry,
};

pub struct CoinsToSpendIndexIter<'a> {
pub big_coins_iter: BoxedIter<'a, Result<CoinsToSpendIndexEntry, StorageError>>,
pub dust_coins_iter: BoxedIter<'a, Result<CoinsToSpendIndexEntry, StorageError>>,
pub big_coins_iter: BoxedIter<'a, Result<CoinsToSpendIndexKey, StorageError>>,
pub dust_coins_iter: BoxedIter<'a, Result<CoinsToSpendIndexKey, StorageError>>,
}

pub trait OffChainDatabase: Send + Sync {
Expand Down
Loading

0 comments on commit 6c26bc2

Please sign in to comment.