diff --git a/CHANGELOG.md b/CHANGELOG.md index 31a3b3847d4..40dd32ecc4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). #### Breaking - [2389](https://github.com/FuelLabs/fuel-core/pull/2258): Updated the `messageProof` GraphQL schema to return a non-nullable `MessageProof`. +- [2383](https://github.com/FuelLabs/fuel-core/pull/2383): Asset balance queries now return U128 instead of U64. - [2154](https://github.com/FuelLabs/fuel-core/pull/2154): Transaction graphql endpoints use `TransactionType` instead of `fuel_tx::Transaction`. - [2446](https://github.com/FuelLabs/fuel-core/pull/2446): Use graphiql instead of graphql-playground due to known vulnerability and stale development. - [2379](https://github.com/FuelLabs/fuel-core/issues/2379): Change `kv_store::Value` to be `Arc<[u8]>` instead of `Arc>`. diff --git a/bin/e2e-test-client/src/test_context.rs b/bin/e2e-test-client/src/test_context.rs index 1cb6bcb8b07..d5e98c121d4 100644 --- a/bin/e2e-test-client/src/test_context.rs +++ b/bin/e2e-test-client/src/test_context.rs @@ -99,7 +99,7 @@ impl Wallet { } /// returns the balance associated with a wallet - pub async fn balance(&self, asset_id: Option) -> anyhow::Result { + pub async fn balance(&self, asset_id: Option) -> anyhow::Result { self.client .balance(&self.address, Some(&asset_id.unwrap_or_default())) .await diff --git a/crates/client/assets/schema.sdl b/crates/client/assets/schema.sdl index 54db4ec0995..0cd31fdb34f 100644 --- a/crates/client/assets/schema.sdl +++ b/crates/client/assets/schema.sdl @@ -4,7 +4,7 @@ scalar AssetId type Balance { owner: Address! - amount: U64! + amount: U128! assetId: AssetId! } @@ -1259,6 +1259,8 @@ enum TxParametersVersion { scalar TxPointer +scalar U128 + scalar U16 scalar U32 diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 6cd9023276f..aa41567b384 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1069,7 +1069,7 @@ impl FuelClient { &self, owner: &Address, asset_id: Option<&AssetId>, - ) -> io::Result { + ) -> io::Result { let owner: schema::Address = (*owner).into(); let asset_id: schema::AssetId = match asset_id { Some(asset_id) => (*asset_id).into(), diff --git a/crates/client/src/client/schema/balance.rs b/crates/client/src/client/schema/balance.rs index 89c5dbca32d..c20da989f62 100644 --- a/crates/client/src/client/schema/balance.rs +++ b/crates/client/src/client/schema/balance.rs @@ -4,7 +4,7 @@ use crate::client::{ Address, AssetId, PageInfo, - U64, + U128, }, PageDirection, PaginationRequest, @@ -99,7 +99,7 @@ pub struct BalanceEdge { #[cynic(schema_path = "./assets/schema.sdl")] pub struct Balance { pub owner: Address, - pub amount: U64, + pub amount: U128, pub asset_id: AssetId, } diff --git a/crates/client/src/client/schema/primitives.rs b/crates/client/src/client/schema/primitives.rs index 1559c835844..4c2852f852a 100644 --- a/crates/client/src/client/schema/primitives.rs +++ b/crates/client/src/client/schema/primitives.rs @@ -272,6 +272,7 @@ macro_rules! number_scalar { }; } +number_scalar!(U128, u128); number_scalar!(U64, u64); number_scalar!(U32, u32); number_scalar!(U16, u16); diff --git a/crates/client/src/client/types/balance.rs b/crates/client/src/client/types/balance.rs index 334fc5dec46..3220d9c036c 100644 --- a/crates/client/src/client/types/balance.rs +++ b/crates/client/src/client/types/balance.rs @@ -10,7 +10,7 @@ use crate::client::{ #[derive(Clone, Copy, Debug, PartialEq)] pub struct Balance { pub owner: Address, - pub amount: u64, + pub amount: u128, pub asset_id: AssetId, } diff --git a/crates/fuel-core/src/coins_query.rs b/crates/fuel-core/src/coins_query.rs index 3d7abda01bc..cc93928b5ed 100644 --- a/crates/fuel-core/src/coins_query.rs +++ b/crates/fuel-core/src/coins_query.rs @@ -986,6 +986,7 @@ mod tests { let on_chain = self.database.on_chain().clone(); let off_chain = self.database.off_chain().clone(); ServiceDatabase::new(100, 0u32.into(), on_chain, off_chain) + .expect("should create service database") } } diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index 50c286ea85b..41d0451f221 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -58,6 +58,7 @@ use fuel_core_types::{ }; use itertools::Itertools; use std::{ + borrow::Cow, fmt::Debug, sync::Arc, }; @@ -66,7 +67,10 @@ pub use fuel_core_database::Error; pub type Result = core::result::Result; // TODO: Extract `Database` and all belongs into `fuel-core-database`. -use crate::database::database_description::gas_price::GasPriceDatabase; +use crate::database::database_description::{ + gas_price::GasPriceDatabase, + indexation_availability, +}; #[cfg(feature = "rocksdb")] use crate::state::{ historical_rocksdb::{ @@ -482,15 +486,13 @@ where ConflictPolicy::Overwrite, changes, ); + let maybe_current_metadata = transaction + .storage_as_mut::>() + .get(&())?; + let metadata = update_metadata::(maybe_current_metadata, new_height); transaction .storage_as_mut::>() - .insert( - &(), - &DatabaseMetadata::V1 { - version: Description::version(), - height: new_height, - }, - )?; + .insert(&(), &metadata)?; transaction.into_changes() } else { @@ -509,6 +511,39 @@ where Ok(()) } +fn update_metadata( + maybe_current_metadata: Option< + Cow::Height>>, + >, + new_height: ::Height, +) -> DatabaseMetadata<::Height> +where + Description: DatabaseDescription, +{ + let updated_metadata = match maybe_current_metadata.as_ref() { + Some(metadata) => match metadata.as_ref() { + DatabaseMetadata::V1 { .. } => DatabaseMetadata::V1 { + version: Description::version(), + height: new_height, + }, + DatabaseMetadata::V2 { + indexation_availability, + .. + } => DatabaseMetadata::V2 { + version: Description::version(), + height: new_height, + indexation_availability: indexation_availability.clone(), + }, + }, + None => DatabaseMetadata::V2 { + version: Description::version(), + height: new_height, + indexation_availability: indexation_availability::(None), + }, + }; + updated_metadata +} + #[cfg(feature = "rocksdb")] pub fn convert_to_rocksdb_direction(direction: IterDirection) -> rocksdb::Direction { match direction { @@ -524,10 +559,6 @@ mod tests { database_description::DatabaseDescription, Database, }; - use fuel_core_storage::{ - tables::FuelBlocks, - StorageAsMut, - }; fn column_keys_not_exceed_count() where @@ -1084,4 +1115,145 @@ mod tests { // rocks db fails test(db); } + + mod metadata { + use crate::database::database_description::IndexationKind; + use fuel_core_storage::kv_store::StorageColumn; + use std::{ + borrow::Cow, + collections::HashSet, + }; + use strum::EnumCount; + + use super::{ + database_description::DatabaseDescription, + update_metadata, + DatabaseHeight, + DatabaseMetadata, + }; + + #[derive(Debug, Copy, Clone, Default, PartialEq, Eq)] + struct HeightMock(u64); + impl DatabaseHeight for HeightMock { + fn as_u64(&self) -> u64 { + 1 + } + + fn advance_height(&self) -> Option { + None + } + + fn rollback_height(&self) -> Option { + None + } + } + + const MOCK_VERSION: u32 = 0; + + #[derive(EnumCount, enum_iterator::Sequence, Debug, Clone, Copy)] + enum ColumnMock { + Column1, + } + + impl StorageColumn for ColumnMock { + fn name(&self) -> String { + "column".to_string() + } + + fn id(&self) -> u32 { + 42 + } + } + + #[derive(Debug, Clone, Copy)] + struct DatabaseDescriptionMock; + impl DatabaseDescription for DatabaseDescriptionMock { + type Column = ColumnMock; + + type Height = HeightMock; + + fn version() -> u32 { + MOCK_VERSION + } + + fn name() -> String { + "mock".to_string() + } + + fn metadata_column() -> Self::Column { + Self::Column::Column1 + } + + fn prefix(_: &Self::Column) -> Option { + None + } + } + + #[test] + fn update_metadata_preserves_v1() { + let current_metadata: DatabaseMetadata = DatabaseMetadata::V1 { + version: MOCK_VERSION, + height: HeightMock(1), + }; + let new_metadata = update_metadata::( + Some(Cow::Borrowed(¤t_metadata)), + HeightMock(2), + ); + + match new_metadata { + DatabaseMetadata::V1 { version, height } => { + assert_eq!(version, current_metadata.version()); + assert_eq!(height, HeightMock(2)); + } + DatabaseMetadata::V2 { .. } => panic!("should be V1"), + } + } + + #[test] + fn update_metadata_preserves_v2() { + let available_indexation = HashSet::new(); + + let current_metadata: DatabaseMetadata = DatabaseMetadata::V2 { + version: MOCK_VERSION, + height: HeightMock(1), + indexation_availability: available_indexation.clone(), + }; + let new_metadata = update_metadata::( + Some(Cow::Borrowed(¤t_metadata)), + HeightMock(2), + ); + + match new_metadata { + DatabaseMetadata::V1 { .. } => panic!("should be V2"), + DatabaseMetadata::V2 { + version, + height, + indexation_availability, + } => { + assert_eq!(version, current_metadata.version()); + assert_eq!(height, HeightMock(2)); + assert_eq!(indexation_availability, available_indexation); + } + } + } + + #[test] + fn update_metadata_none_becomes_v2() { + let new_metadata = + update_metadata::(None, HeightMock(2)); + + match new_metadata { + DatabaseMetadata::V1 { .. } => panic!("should be V2"), + DatabaseMetadata::V2 { + version, + height, + indexation_availability, + } => { + assert_eq!(version, MOCK_VERSION); + assert_eq!(height, HeightMock(2)); + assert_eq!(indexation_availability, IndexationKind::all().collect()); + } + } + } + } } diff --git a/crates/fuel-core/src/database/database_description.rs b/crates/fuel-core/src/database/database_description.rs index 14d240c54f5..efc0f48b5bf 100644 --- a/crates/fuel-core/src/database/database_description.rs +++ b/crates/fuel-core/src/database/database_description.rs @@ -4,6 +4,8 @@ use fuel_core_types::{ blockchain::primitives::DaBlockHeight, fuel_types::BlockHeight, }; +use std::collections::HashSet; +use strum::IntoEnumIterator; pub mod gas_price; pub mod off_chain; @@ -67,10 +69,39 @@ pub trait DatabaseDescription: 'static + Copy + Debug + Send + Sync { fn prefix(column: &Self::Column) -> Option; } +#[derive( + Copy, + Clone, + Debug, + serde::Serialize, + serde::Deserialize, + Eq, + PartialEq, + Hash, + strum::EnumIter, +)] +pub enum IndexationKind { + Balances, +} + +impl IndexationKind { + pub fn all() -> impl Iterator { + Self::iter() + } +} + /// The metadata of the database contains information about the version and its height. -#[derive(Copy, Clone, Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub enum DatabaseMetadata { - V1 { version: u32, height: Height }, + V1 { + version: u32, + height: Height, + }, + V2 { + version: u32, + height: Height, + indexation_availability: HashSet, + }, } impl DatabaseMetadata { @@ -78,6 +109,7 @@ impl DatabaseMetadata { pub fn version(&self) -> u32 { match self { Self::V1 { version, .. } => *version, + Self::V2 { version, .. } => *version, } } @@ -85,6 +117,37 @@ impl DatabaseMetadata { pub fn height(&self) -> &Height { match self { Self::V1 { height, .. } => height, + Self::V2 { height, .. } => height, + } + } + + /// Returns true if the given indexation kind is available. + pub fn indexation_available(&self, kind: IndexationKind) -> bool { + match self { + Self::V1 { .. } => false, + Self::V2 { + indexation_availability, + .. + } => indexation_availability.contains(&kind), } } } + +/// Gets the indexation availability from the metadata. +pub fn indexation_availability( + metadata: Option>, +) -> HashSet +where + D: DatabaseDescription, +{ + match metadata { + Some(DatabaseMetadata::V1 { .. }) => HashSet::new(), + Some(DatabaseMetadata::V2 { + indexation_availability, + .. + }) => indexation_availability.clone(), + // If the metadata doesn't exist, it is a new database, + // and we should set all indexation kinds to available. + None => IndexationKind::all().collect(), + } +} diff --git a/crates/fuel-core/src/database/metadata.rs b/crates/fuel-core/src/database/metadata.rs index 72cf2bbedb7..bbc9a0fb353 100644 --- a/crates/fuel-core/src/database/metadata.rs +++ b/crates/fuel-core/src/database/metadata.rs @@ -1,3 +1,7 @@ +use super::database_description::{ + indexation_availability, + IndexationKind, +}; use crate::database::{ database_description::{ DatabaseDescription, @@ -74,4 +78,14 @@ where Ok(metadata) } + + pub fn indexation_available(&self, kind: IndexationKind) -> StorageResult { + let metadata = self + .storage::>() + .get(&())? + .map(|metadata| metadata.into_owned()); + + let indexation_availability = indexation_availability::(metadata); + Ok(indexation_availability.contains(&kind)) + } } diff --git a/crates/fuel-core/src/executor.rs b/crates/fuel-core/src/executor.rs index 186b9fd44b7..e06ff89bd66 100644 --- a/crates/fuel-core/src/executor.rs +++ b/crates/fuel-core/src/executor.rs @@ -607,6 +607,7 @@ mod tests { .next() .unwrap() .unwrap(); + assert_eq!(asset_id, AssetId::zeroed()); assert_eq!(amount, expected_fee_amount_1 + expected_fee_amount_2); } diff --git a/crates/fuel-core/src/graphql_api.rs b/crates/fuel-core/src/graphql_api.rs index 4e469a205d7..63a6efcf0de 100644 --- a/crates/fuel-core/src/graphql_api.rs +++ b/crates/fuel-core/src/graphql_api.rs @@ -11,6 +11,7 @@ use std::{ pub mod api_service; pub mod da_compression; pub mod database; +pub(crate) mod indexation; pub(crate) mod metrics_extension; pub mod ports; pub mod storage; @@ -79,6 +80,9 @@ impl Default for Costs { } pub const DEFAULT_QUERY_COSTS: Costs = Costs { + // TODO: The cost of the `balance` and `balances` query should depend on the + // `OffChainDatabase::balances_enabled` value. If additional indexation is enabled, + // the cost should be cheaper. balance_query: 40001, coins_to_spend: 40001, get_peers: 40001, diff --git a/crates/fuel-core/src/graphql_api/api_service.rs b/crates/fuel-core/src/graphql_api/api_service.rs index aeda75b4518..ed6c1fd724b 100644 --- a/crates/fuel-core/src/graphql_api/api_service.rs +++ b/crates/fuel-core/src/graphql_api/api_service.rs @@ -88,6 +88,7 @@ use tower_http::{ pub type Service = fuel_core_services::ServiceRunner; pub use super::database::ReadDatabase; +use super::ports::worker; pub type BlockProducer = Box; // In the future GraphQL should not be aware of `TxPool`. It should @@ -231,7 +232,7 @@ pub fn new_service( ) -> anyhow::Result where OnChain: AtomicView + 'static, - OffChain: AtomicView + 'static, + OffChain: AtomicView + worker::OffChainDatabase + 'static, OnChain::LatestView: OnChainDatabase, OffChain::LatestView: OffChainDatabase, { @@ -243,7 +244,7 @@ where genesis_block_height, on_database, off_database, - ); + )?; let request_timeout = config.config.api_request_timeout; let concurrency_limit = config.config.max_concurrent_queries; let body_limit = config.config.request_body_bytes_limit; diff --git a/crates/fuel-core/src/graphql_api/database.rs b/crates/fuel-core/src/graphql_api/database.rs index bf47c8d92a7..6feaaabc5dc 100644 --- a/crates/fuel-core/src/graphql_api/database.rs +++ b/crates/fuel-core/src/graphql_api/database.rs @@ -68,6 +68,8 @@ use std::{ sync::Arc, }; +use super::ports::worker; + mod arc_wrapper; /// The on-chain view of the database used by the [`ReadView`] to fetch on-chain data. @@ -86,6 +88,8 @@ pub struct ReadDatabase { on_chain: Box>, /// The off-chain database view provider. off_chain: Box>, + /// The flag that indicates whether the Balances cache table is enabled. + balances_enabled: bool, } impl ReadDatabase { @@ -95,19 +99,22 @@ impl ReadDatabase { genesis_height: BlockHeight, on_chain: OnChain, off_chain: OffChain, - ) -> Self + ) -> Result where OnChain: AtomicView + 'static, - OffChain: AtomicView + 'static, + OffChain: AtomicView + worker::OffChainDatabase + 'static, OnChain::LatestView: OnChainDatabase, OffChain::LatestView: OffChainDatabase, { - Self { + let balances_enabled = off_chain.balances_enabled()?; + + Ok(Self { batch_size, genesis_height, on_chain: Box::new(ArcWrapper::new(on_chain)), off_chain: Box::new(ArcWrapper::new(off_chain)), - } + balances_enabled, + }) } /// Creates a consistent view of the database. @@ -120,6 +127,7 @@ impl ReadDatabase { genesis_height: self.genesis_height, on_chain: self.on_chain.latest_view()?, off_chain: self.off_chain.latest_view()?, + balances_enabled: self.balances_enabled, }) } @@ -135,6 +143,7 @@ pub struct ReadView { pub(crate) genesis_height: BlockHeight, pub(crate) on_chain: OnChainView, pub(crate) off_chain: OffChainView, + pub(crate) balances_enabled: bool, } impl ReadView { diff --git a/crates/fuel-core/src/graphql_api/indexation.rs b/crates/fuel-core/src/graphql_api/indexation.rs new file mode 100644 index 00000000000..1b490c2f13d --- /dev/null +++ b/crates/fuel-core/src/graphql_api/indexation.rs @@ -0,0 +1,714 @@ +use fuel_core_storage::{ + Error as StorageError, + StorageAsMut, +}; +use fuel_core_types::{ + entities::{ + coins::coin::Coin, + Message, + }, + fuel_tx::{ + Address, + AssetId, + }, + services::executor::Event, +}; + +use super::{ + ports::worker::OffChainDatabaseTransaction, + storage::balances::{ + CoinBalances, + CoinBalancesKey, + MessageBalance, + MessageBalances, + }, +}; + +#[derive(derive_more::From, derive_more::Display, Debug)] +pub enum IndexationError { + #[display( + fmt = "Coin balance would underflow for owner: {}, asset_id: {}, current_amount: {}, requested_deduction: {}", + owner, + asset_id, + current_amount, + requested_deduction + )] + CoinBalanceWouldUnderflow { + owner: Address, + asset_id: AssetId, + current_amount: u128, + requested_deduction: u128, + }, + #[display( + fmt = "Message balance would underflow for owner: {}, current_amount: {}, requested_deduction: {}, retryable: {}", + owner, + current_amount, + requested_deduction, + retryable + )] + MessageBalanceWouldUnderflow { + owner: Address, + current_amount: u128, + requested_deduction: u128, + retryable: bool, + }, + #[from] + StorageError(StorageError), +} + +fn increase_message_balance( + block_st_transaction: &mut T, + message: &Message, +) -> Result<(), IndexationError> +where + T: OffChainDatabaseTransaction, +{ + let key = message.recipient(); + let storage = block_st_transaction.storage::(); + let current_balance = storage.get(key)?.unwrap_or_default().into_owned(); + let MessageBalance { + mut retryable, + mut non_retryable, + } = current_balance; + if message.is_retryable_message() { + retryable = retryable.saturating_add(message.amount() as u128); + } else { + non_retryable = non_retryable.saturating_add(message.amount() as u128); + } + let new_balance = MessageBalance { + retryable, + non_retryable, + }; + + block_st_transaction + .storage::() + .insert(key, &new_balance) + .map_err(Into::into) +} + +fn decrease_message_balance( + block_st_transaction: &mut T, + message: &Message, +) -> Result<(), IndexationError> +where + T: OffChainDatabaseTransaction, +{ + let key = message.recipient(); + let storage = block_st_transaction.storage::(); + let MessageBalance { + retryable, + non_retryable, + } = storage.get(key)?.unwrap_or_default().into_owned(); + let current_balance = if message.is_retryable_message() { + retryable + } else { + non_retryable + }; + + let new_amount = current_balance + .checked_sub(message.amount() as u128) + .ok_or_else(|| IndexationError::MessageBalanceWouldUnderflow { + owner: *message.recipient(), + current_amount: current_balance, + requested_deduction: message.amount() as u128, + retryable: message.is_retryable_message(), + })?; + + let new_balance = if message.is_retryable_message() { + MessageBalance { + retryable: new_amount, + non_retryable, + } + } else { + MessageBalance { + retryable, + non_retryable: new_amount, + } + }; + block_st_transaction + .storage::() + .insert(key, &new_balance) + .map_err(Into::into) +} + +fn increase_coin_balance( + block_st_transaction: &mut T, + coin: &Coin, +) -> Result<(), IndexationError> +where + T: OffChainDatabaseTransaction, +{ + let key = CoinBalancesKey::new(&coin.owner, &coin.asset_id); + let storage = block_st_transaction.storage::(); + let current_amount = storage.get(&key)?.unwrap_or_default().into_owned(); + let new_amount = current_amount.saturating_add(coin.amount as u128); + + block_st_transaction + .storage::() + .insert(&key, &new_amount) + .map_err(Into::into) +} + +fn decrease_coin_balance( + block_st_transaction: &mut T, + coin: &Coin, +) -> Result<(), IndexationError> +where + T: OffChainDatabaseTransaction, +{ + let key = CoinBalancesKey::new(&coin.owner, &coin.asset_id); + let storage = block_st_transaction.storage::(); + let current_amount = storage.get(&key)?.unwrap_or_default().into_owned(); + + let new_amount = + current_amount + .checked_sub(coin.amount as u128) + .ok_or_else(|| IndexationError::CoinBalanceWouldUnderflow { + owner: coin.owner, + asset_id: coin.asset_id, + current_amount, + requested_deduction: coin.amount as u128, + })?; + + block_st_transaction + .storage::() + .insert(&key, &new_amount) + .map_err(Into::into) +} + +pub(crate) fn process_balances_update( + event: &Event, + block_st_transaction: &mut T, + balances_enabled: bool, +) -> Result<(), IndexationError> +where + T: OffChainDatabaseTransaction, +{ + if !balances_enabled { + return Ok(()); + } + + match event { + Event::MessageImported(message) => { + increase_message_balance(block_st_transaction, message) + } + Event::MessageConsumed(message) => { + decrease_message_balance(block_st_transaction, message) + } + Event::CoinCreated(coin) => increase_coin_balance(block_st_transaction, coin), + Event::CoinConsumed(coin) => decrease_coin_balance(block_st_transaction, coin), + Event::ForcedTransactionFailed { .. } => Ok(()), + } +} + +#[cfg(test)] +mod tests { + use fuel_core_storage::{ + transactional::WriteTransaction, + StorageAsMut, + }; + use fuel_core_types::{ + entities::{ + coins::coin::Coin, + relayer::message::MessageV1, + Message, + }, + fuel_tx::{ + Address, + AssetId, + }, + services::executor::Event, + }; + + use crate::{ + database::{ + database_description::off_chain::OffChain, + Database, + }, + graphql_api::{ + indexation::{ + process_balances_update, + IndexationError, + }, + ports::worker::OffChainDatabaseTransaction, + storage::balances::{ + CoinBalances, + CoinBalancesKey, + MessageBalance, + MessageBalances, + }, + }, + }; + + impl PartialEq for IndexationError { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + ( + Self::CoinBalanceWouldUnderflow { + owner: l_owner, + asset_id: l_asset_id, + current_amount: l_current_amount, + requested_deduction: l_requested_deduction, + }, + Self::CoinBalanceWouldUnderflow { + owner: r_owner, + asset_id: r_asset_id, + current_amount: r_current_amount, + requested_deduction: r_requested_deduction, + }, + ) => { + l_owner == r_owner + && l_asset_id == r_asset_id + && l_current_amount == r_current_amount + && l_requested_deduction == r_requested_deduction + } + ( + Self::MessageBalanceWouldUnderflow { + owner: l_owner, + current_amount: l_current_amount, + requested_deduction: l_requested_deduction, + retryable: l_retryable, + }, + Self::MessageBalanceWouldUnderflow { + owner: r_owner, + current_amount: r_current_amount, + requested_deduction: r_requested_deduction, + retryable: r_retryable, + }, + ) => { + l_owner == r_owner + && l_current_amount == r_current_amount + && l_requested_deduction == r_requested_deduction + && l_retryable == r_retryable + } + (Self::StorageError(l0), Self::StorageError(r0)) => l0 == r0, + _ => false, + } + } + } + + fn make_coin(owner: &Address, asset_id: &AssetId, amount: u64) -> Coin { + Coin { + utxo_id: Default::default(), + owner: *owner, + amount, + asset_id: *asset_id, + tx_pointer: Default::default(), + } + } + + fn make_retryable_message(owner: &Address, amount: u64) -> Message { + Message::V1(MessageV1 { + sender: Default::default(), + recipient: *owner, + nonce: Default::default(), + amount, + data: vec![1], + da_height: Default::default(), + }) + } + + fn make_nonretryable_message(owner: &Address, amount: u64) -> Message { + let mut message = make_retryable_message(owner, amount); + message.set_data(vec![]); + message + } + + fn assert_coin_balance( + tx: &mut T, + owner: Address, + asset_id: AssetId, + expected_balance: u128, + ) where + T: OffChainDatabaseTransaction, + { + let key = CoinBalancesKey::new(&owner, &asset_id); + let balance = tx + .storage::() + .get(&key) + .expect("should correctly query db") + .expect("should have balance"); + + assert_eq!(*balance, expected_balance); + } + + fn assert_message_balance( + tx: &mut T, + owner: Address, + expected_balance: MessageBalance, + ) where + T: OffChainDatabaseTransaction, + { + let balance = tx + .storage::() + .get(&owner) + .expect("should correctly query db") + .expect("should have balance"); + + assert_eq!(*balance, expected_balance); + } + + #[test] + fn balances_enabled_flag_is_respected() { + use tempfile::TempDir; + let tmp_dir = TempDir::new().unwrap(); + let mut db: Database = + Database::open_rocksdb(tmp_dir.path(), None, Default::default(), 512) + .unwrap(); + let mut tx = db.write_transaction(); + + const BALANCES_ARE_DISABLED: bool = false; + + let owner_1 = Address::from([1; 32]); + let owner_2 = Address::from([2; 32]); + + let asset_id_1 = AssetId::from([11; 32]); + let asset_id_2 = AssetId::from([12; 32]); + + // Initial set of coins + let events: Vec = vec![ + Event::CoinCreated(make_coin(&owner_1, &asset_id_1, 100)), + Event::CoinConsumed(make_coin(&owner_1, &asset_id_2, 200)), + Event::MessageImported(make_retryable_message(&owner_1, 300)), + Event::MessageConsumed(make_nonretryable_message(&owner_2, 400)), + ]; + + events.iter().for_each(|event| { + process_balances_update(event, &mut tx, BALANCES_ARE_DISABLED) + .expect("should process balance"); + }); + + let key = CoinBalancesKey::new(&owner_1, &asset_id_1); + let balance = tx + .storage::() + .get(&key) + .expect("should correctly query db"); + assert!(balance.is_none()); + + let key = CoinBalancesKey::new(&owner_1, &asset_id_2); + let balance = tx + .storage::() + .get(&key) + .expect("should correctly query db"); + assert!(balance.is_none()); + + let balance = tx + .storage::() + .get(&owner_1) + .expect("should correctly query db"); + assert!(balance.is_none()); + + let balance = tx + .storage::() + .get(&owner_2) + .expect("should correctly query db"); + assert!(balance.is_none()); + } + + #[test] + fn coins() { + use tempfile::TempDir; + let tmp_dir = TempDir::new().unwrap(); + let mut db: Database = + Database::open_rocksdb(tmp_dir.path(), None, Default::default(), 512) + .unwrap(); + let mut tx = db.write_transaction(); + + const BALANCES_ARE_ENABLED: bool = true; + + let owner_1 = Address::from([1; 32]); + let owner_2 = Address::from([2; 32]); + + let asset_id_1 = AssetId::from([11; 32]); + let asset_id_2 = AssetId::from([12; 32]); + + // Initial set of coins + let events: Vec = vec![ + Event::CoinCreated(make_coin(&owner_1, &asset_id_1, 100)), + Event::CoinCreated(make_coin(&owner_1, &asset_id_2, 200)), + Event::CoinCreated(make_coin(&owner_2, &asset_id_1, 300)), + Event::CoinCreated(make_coin(&owner_2, &asset_id_2, 400)), + ]; + + events.iter().for_each(|event| { + process_balances_update(event, &mut tx, BALANCES_ARE_ENABLED) + .expect("should process balance"); + }); + + assert_coin_balance(&mut tx, owner_1, asset_id_1, 100); + assert_coin_balance(&mut tx, owner_1, asset_id_2, 200); + assert_coin_balance(&mut tx, owner_2, asset_id_1, 300); + assert_coin_balance(&mut tx, owner_2, asset_id_2, 400); + + // Add some more coins + let events: Vec = vec![ + Event::CoinCreated(make_coin(&owner_1, &asset_id_1, 1)), + Event::CoinCreated(make_coin(&owner_1, &asset_id_2, 2)), + Event::CoinCreated(make_coin(&owner_2, &asset_id_1, 3)), + Event::CoinCreated(make_coin(&owner_2, &asset_id_2, 4)), + ]; + + events.iter().for_each(|event| { + process_balances_update(event, &mut tx, BALANCES_ARE_ENABLED) + .expect("should process balance"); + }); + + assert_coin_balance(&mut tx, owner_1, asset_id_1, 101); + assert_coin_balance(&mut tx, owner_1, asset_id_2, 202); + assert_coin_balance(&mut tx, owner_2, asset_id_1, 303); + assert_coin_balance(&mut tx, owner_2, asset_id_2, 404); + + // Consume some coins + let events: Vec = vec![ + Event::CoinConsumed(make_coin(&owner_1, &asset_id_1, 100)), + Event::CoinConsumed(make_coin(&owner_1, &asset_id_2, 200)), + Event::CoinConsumed(make_coin(&owner_2, &asset_id_1, 300)), + Event::CoinConsumed(make_coin(&owner_2, &asset_id_2, 400)), + ]; + + events.iter().for_each(|event| { + process_balances_update(event, &mut tx, BALANCES_ARE_ENABLED) + .expect("should process balance"); + }); + + assert_coin_balance(&mut tx, owner_1, asset_id_1, 1); + assert_coin_balance(&mut tx, owner_1, asset_id_2, 2); + assert_coin_balance(&mut tx, owner_2, asset_id_1, 3); + assert_coin_balance(&mut tx, owner_2, asset_id_2, 4); + } + + #[test] + fn messages() { + use tempfile::TempDir; + let tmp_dir = TempDir::new().unwrap(); + let mut db: Database = + Database::open_rocksdb(tmp_dir.path(), None, Default::default(), 512) + .unwrap(); + let mut tx = db.write_transaction(); + + const BALANCES_ARE_ENABLED: bool = true; + + let owner_1 = Address::from([1; 32]); + let owner_2 = Address::from([2; 32]); + + // Initial set of messages + let events: Vec = vec![ + Event::MessageImported(make_retryable_message(&owner_1, 100)), + Event::MessageImported(make_retryable_message(&owner_2, 200)), + Event::MessageImported(make_nonretryable_message(&owner_1, 300)), + Event::MessageImported(make_nonretryable_message(&owner_2, 400)), + ]; + + events.iter().for_each(|event| { + process_balances_update(event, &mut tx, BALANCES_ARE_ENABLED) + .expect("should process balance"); + }); + + assert_message_balance( + &mut tx, + owner_1, + MessageBalance { + retryable: 100, + non_retryable: 300, + }, + ); + + assert_message_balance( + &mut tx, + owner_2, + MessageBalance { + retryable: 200, + non_retryable: 400, + }, + ); + + // Add some messages + let events: Vec = vec![ + Event::MessageImported(make_retryable_message(&owner_1, 1)), + Event::MessageImported(make_retryable_message(&owner_2, 2)), + Event::MessageImported(make_nonretryable_message(&owner_1, 3)), + Event::MessageImported(make_nonretryable_message(&owner_2, 4)), + ]; + + events.iter().for_each(|event| { + process_balances_update(event, &mut tx, BALANCES_ARE_ENABLED) + .expect("should process balance"); + }); + + assert_message_balance( + &mut tx, + owner_1, + MessageBalance { + retryable: 101, + non_retryable: 303, + }, + ); + + assert_message_balance( + &mut tx, + owner_2, + MessageBalance { + retryable: 202, + non_retryable: 404, + }, + ); + + // Consume some messages + let events: Vec = vec![ + Event::MessageConsumed(make_retryable_message(&owner_1, 100)), + Event::MessageConsumed(make_retryable_message(&owner_2, 200)), + Event::MessageConsumed(make_nonretryable_message(&owner_1, 300)), + Event::MessageConsumed(make_nonretryable_message(&owner_2, 400)), + ]; + + events.iter().for_each(|event| { + process_balances_update(event, &mut tx, BALANCES_ARE_ENABLED) + .expect("should process balance"); + }); + + assert_message_balance( + &mut tx, + owner_1, + MessageBalance { + retryable: 1, + non_retryable: 3, + }, + ); + + assert_message_balance( + &mut tx, + owner_2, + MessageBalance { + retryable: 2, + non_retryable: 4, + }, + ); + } + + #[test] + fn coin_balance_overflow_does_not_error() { + use tempfile::TempDir; + let tmp_dir = TempDir::new().unwrap(); + let mut db: Database = + Database::open_rocksdb(tmp_dir.path(), None, Default::default(), 512) + .unwrap(); + let mut tx = db.write_transaction(); + + const BALANCES_ARE_ENABLED: bool = true; + + let owner = Address::from([1; 32]); + let asset_id = AssetId::from([11; 32]); + + // Make the initial balance huge + let key = CoinBalancesKey::new(&owner, &asset_id); + tx.storage::() + .insert(&key, &u128::MAX) + .expect("should correctly query db"); + + assert_coin_balance(&mut tx, owner, asset_id, u128::MAX); + + // Try to add more coins + let events: Vec = + vec![Event::CoinCreated(make_coin(&owner, &asset_id, 1))]; + + events.iter().for_each(|event| { + process_balances_update(event, &mut tx, BALANCES_ARE_ENABLED) + .expect("should process balance"); + }); + + assert_coin_balance(&mut tx, owner, asset_id, u128::MAX); + } + + #[test] + fn message_balance_overflow_does_not_error() { + use tempfile::TempDir; + let tmp_dir = TempDir::new().unwrap(); + let mut db: Database = + Database::open_rocksdb(tmp_dir.path(), None, Default::default(), 512) + .unwrap(); + let mut tx = db.write_transaction(); + + const BALANCES_ARE_ENABLED: bool = true; + const MAX_BALANCES: MessageBalance = MessageBalance { + retryable: u128::MAX, + non_retryable: u128::MAX, + }; + + let owner = Address::from([1; 32]); + + // Make the initial balance huge + tx.storage::() + .insert(&owner, &MAX_BALANCES) + .expect("should correctly query db"); + + assert_message_balance(&mut tx, owner, MAX_BALANCES); + + // Try to add more coins + let events: Vec = vec![ + Event::MessageImported(make_retryable_message(&owner, 1)), + Event::MessageImported(make_nonretryable_message(&owner, 1)), + ]; + + events.iter().for_each(|event| { + process_balances_update(event, &mut tx, BALANCES_ARE_ENABLED) + .expect("should process balance"); + }); + + assert_message_balance(&mut tx, owner, MAX_BALANCES); + } + + #[test] + fn coin_balance_underflow_causes_error() { + use tempfile::TempDir; + let tmp_dir = TempDir::new().unwrap(); + let mut db: Database = + Database::open_rocksdb(tmp_dir.path(), None, Default::default(), 512) + .unwrap(); + let mut tx = db.write_transaction(); + + const BALANCES_ARE_ENABLED: bool = true; + + let owner = Address::from([1; 32]); + let asset_id_1 = AssetId::from([11; 32]); + let asset_id_2 = AssetId::from([12; 32]); + + // Initial set of coins + let events: Vec = + vec![Event::CoinCreated(make_coin(&owner, &asset_id_1, 100))]; + + events.iter().for_each(|event| { + process_balances_update(event, &mut tx, BALANCES_ARE_ENABLED) + .expect("should process balance"); + }); + + // Consume more coins than available + let events: Vec = vec![ + Event::CoinConsumed(make_coin(&owner, &asset_id_1, 10000)), + Event::CoinConsumed(make_coin(&owner, &asset_id_2, 20000)), + ]; + + let expected_errors = vec![ + IndexationError::CoinBalanceWouldUnderflow { + owner, + asset_id: asset_id_1, + current_amount: 100, + requested_deduction: 10000, + }, + IndexationError::CoinBalanceWouldUnderflow { + owner, + asset_id: asset_id_2, + current_amount: 0, + requested_deduction: 20000, + }, + ]; + + let actual_errors: Vec<_> = events + .iter() + .map(|event| { + process_balances_update(event, &mut tx, BALANCES_ARE_ENABLED).unwrap_err() + }) + .collect(); + + assert_eq!(expected_errors, actual_errors); + } +} diff --git a/crates/fuel-core/src/graphql_api/ports.rs b/crates/fuel-core/src/graphql_api/ports.rs index 307904c9a92..0463ee3992e 100644 --- a/crates/fuel-core/src/graphql_api/ports.rs +++ b/crates/fuel-core/src/graphql_api/ports.rs @@ -64,6 +64,8 @@ use fuel_core_types::{ }; use std::sync::Arc; +use super::storage::balances::TotalBalanceAmount; + pub trait OffChainDatabase: Send + Sync { fn block_height(&self, block_id: &BlockId) -> StorageResult; @@ -71,6 +73,20 @@ pub trait OffChainDatabase: Send + Sync { fn tx_status(&self, tx_id: &TxId) -> StorageResult; + fn balance( + &self, + owner: &Address, + asset_id: &AssetId, + base_asset_id: &AssetId, + ) -> StorageResult; + + fn balances( + &self, + owner: &Address, + base_asset_id: &AssetId, + direction: IterDirection, + ) -> BoxedIter<'_, StorageResult<(AssetId, TotalBalanceAmount)>>; + fn owned_coins_ids( &self, owner: &Address, @@ -273,6 +289,10 @@ pub mod worker { }, }, graphql_api::storage::{ + balances::{ + CoinBalances, + MessageBalances, + }, da_compression::*, old::{ OldFuelBlockConsensus, @@ -316,6 +336,9 @@ pub mod worker { /// Creates a write database transaction. fn transaction(&mut self) -> Self::Transaction<'_>; + + /// Checks if Balances cache functionality is available. + fn balances_enabled(&self) -> StorageResult; } /// Represents either the Genesis Block or a block at a specific height @@ -337,6 +360,8 @@ pub mod worker { + StorageMutate + StorageMutate + StorageMutate + + StorageMutate + + StorageMutate + StorageMutate + StorageMutate + StorageMutate diff --git a/crates/fuel-core/src/graphql_api/storage.rs b/crates/fuel-core/src/graphql_api/storage.rs index 8f8cfcd1f19..8ebe615b30a 100644 --- a/crates/fuel-core/src/graphql_api/storage.rs +++ b/crates/fuel-core/src/graphql_api/storage.rs @@ -36,6 +36,7 @@ use fuel_core_types::{ }; use statistic::StatisticTable; +pub mod balances; pub mod blocks; pub mod coins; pub mod contracts; @@ -113,6 +114,10 @@ pub enum Column { DaCompressionTemporalRegistryScriptCode = 21, /// See [`DaCompressionTemporalRegistryPredicateCode`](da_compression::DaCompressionTemporalRegistryPredicateCode) DaCompressionTemporalRegistryPredicateCode = 22, + /// Coin balances per account and asset. + CoinBalances = 23, + /// Message balances per account. + MessageBalances = 24, } impl Column { diff --git a/crates/fuel-core/src/graphql_api/storage/balances.rs b/crates/fuel-core/src/graphql_api/storage/balances.rs new file mode 100644 index 00000000000..a6402cd260e --- /dev/null +++ b/crates/fuel-core/src/graphql_api/storage/balances.rs @@ -0,0 +1,100 @@ +use fuel_core_storage::{ + blueprint::plain::Plain, + codec::{ + postcard::Postcard, + raw::Raw, + }, + structured_storage::TableWithBlueprint, + Mappable, +}; +use fuel_core_types::{ + fuel_tx::{ + Address, + AssetId, + }, + fuel_vm::double_key, +}; +use rand::{ + distributions::Standard, + prelude::Distribution, + Rng, +}; + +pub type ItemAmount = u64; +pub type TotalBalanceAmount = u128; + +double_key!(CoinBalancesKey, Address, address, AssetId, asset_id); +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> CoinBalancesKey { + let mut bytes = [0u8; CoinBalancesKey::LEN]; + rng.fill_bytes(bytes.as_mut()); + CoinBalancesKey::from_array(bytes) + } +} + +impl core::fmt::Display for CoinBalancesKey { + fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result { + write!(f, "address={} asset_id={}", self.address(), self.asset_id()) + } +} + +/// This table stores the balances of coins per owner and asset id. +pub struct CoinBalances; + +impl Mappable for CoinBalances { + type Key = CoinBalancesKey; + type OwnedKey = Self::Key; + type Value = TotalBalanceAmount; + type OwnedValue = Self::Value; +} + +impl TableWithBlueprint for CoinBalances { + type Blueprint = Plain; + type Column = super::Column; + + fn column() -> Self::Column { + Self::Column::CoinBalances + } +} + +#[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize, PartialEq)] +pub struct MessageBalance { + pub retryable: TotalBalanceAmount, + pub non_retryable: TotalBalanceAmount, +} + +/// This table stores the balances of messages per owner. +pub struct MessageBalances; + +impl Mappable for MessageBalances { + type Key = Address; + type OwnedKey = Self::Key; + type Value = MessageBalance; + type OwnedValue = Self::Value; +} + +impl TableWithBlueprint for MessageBalances { + type Blueprint = Plain; + type Column = super::Column; + + fn column() -> Self::Column { + Self::Column::MessageBalances + } +} + +#[cfg(test)] +mod test { + use super::*; + + fuel_core_storage::basic_storage_tests!( + CoinBalances, + ::Key::default(), + ::Value::default() + ); + + fuel_core_storage::basic_storage_tests!( + MessageBalances, + ::Key::default(), + ::Value::default() + ); +} diff --git a/crates/fuel-core/src/graphql_api/worker_service.rs b/crates/fuel-core/src/graphql_api/worker_service.rs index 6c29e49e7e4..68b3c4fa109 100644 --- a/crates/fuel-core/src/graphql_api/worker_service.rs +++ b/crates/fuel-core/src/graphql_api/worker_service.rs @@ -1,5 +1,8 @@ +use self::indexation::IndexationError; + use super::{ da_compression::da_compress_block, + indexation, storage::old::{ OldFuelBlockConsensus, OldFuelBlocks, @@ -129,6 +132,7 @@ pub struct Task { chain_id: ChainId, da_compression_config: DaCompressionConfig, continue_on_error: bool, + balances_enabled: bool, } impl Task @@ -161,6 +165,7 @@ where process_executor_events( result.events.iter().map(Cow::Borrowed), &mut transaction, + self.balances_enabled, )?; match self.da_compression_config { @@ -189,12 +194,29 @@ where pub fn process_executor_events<'a, Iter, T>( events: Iter, block_st_transaction: &mut T, + balances_enabled: bool, ) -> anyhow::Result<()> where Iter: Iterator>, T: OffChainDatabaseTransaction, { for event in events { + match indexation::process_balances_update( + event.deref(), + block_st_transaction, + balances_enabled, + ) { + Ok(()) => (), + Err(IndexationError::StorageError(err)) => { + return Err(err.into()); + } + Err(err @ IndexationError::CoinBalanceWouldUnderflow { .. }) + | Err(err @ IndexationError::MessageBalanceWouldUnderflow { .. }) => { + // TODO[RC]: Balances overflow to be correctly handled. See: https://github.com/FuelLabs/fuel-core/issues/2428 + tracing::error!("Balances underflow detected: {}", err); + } + } + match event.deref() { Event::MessageImported(message) => { block_st_transaction @@ -477,6 +499,9 @@ where graphql_metrics().total_txs_count.set(total_tx_count as i64); } + let balances_enabled = self.off_chain_database.balances_enabled()?; + tracing::info!("Balances cache available: {}", balances_enabled); + let InitializeTask { chain_id, da_compression_config, @@ -495,6 +520,7 @@ where chain_id, da_compression_config, continue_on_error, + balances_enabled, }; let mut target_chain_height = on_chain_database.latest_height()?; diff --git a/crates/fuel-core/src/graphql_api/worker_service/tests.rs b/crates/fuel-core/src/graphql_api/worker_service/tests.rs index 401d25db455..5a59073702a 100644 --- a/crates/fuel-core/src/graphql_api/worker_service/tests.rs +++ b/crates/fuel-core/src/graphql_api/worker_service/tests.rs @@ -83,5 +83,6 @@ fn worker_task_with_block_importer_and_db( chain_id, da_compression_config: DaCompressionConfig::Disabled, continue_on_error: false, + balances_enabled: true, } } diff --git a/crates/fuel-core/src/query/balance.rs b/crates/fuel-core/src/query/balance.rs index 161fd64b87e..706fcf02569 100644 --- a/crates/fuel-core/src/query/balance.rs +++ b/crates/fuel-core/src/query/balance.rs @@ -1,4 +1,12 @@ -use crate::fuel_core_graphql_api::database::ReadView; +use std::{ + cmp::Ordering, + collections::HashMap, +}; + +use crate::{ + fuel_core_graphql_api::database::ReadView, + graphql_api::storage::balances::TotalBalanceAmount, +}; use asset_query::{ AssetQuery, AssetSpendTarget, @@ -17,15 +25,12 @@ use fuel_core_types::{ services::graphql_api::AddressBalance, }; use futures::{ + stream, FutureExt, Stream, StreamExt, TryStreamExt, }; -use std::{ - cmp::Ordering, - collections::HashMap, -}; pub mod asset_query; @@ -36,22 +41,23 @@ impl ReadView { asset_id: AssetId, base_asset_id: AssetId, ) -> StorageResult { - let amount = AssetQuery::new( - &owner, - &AssetSpendTarget::new(asset_id, u64::MAX, u16::MAX), - &base_asset_id, - None, - self, - ) - .coins() - .map(|res| res.map(|coins| coins.amount())) - .try_fold(0u64, |balance, amount| { - async move { - // Increase the balance - Ok(balance.saturating_add(amount)) - } - }) - .await?; + let amount = if self.balances_enabled { + self.off_chain.balance(&owner, &asset_id, &base_asset_id)? + } else { + AssetQuery::new( + &owner, + &AssetSpendTarget::new(asset_id, u64::MAX, u16::MAX), + &base_asset_id, + None, + self, + ) + .coins() + .map(|res| res.map(|coins| coins.amount())) + .try_fold(0u128, |balance, amount| async move { + Ok(balance.saturating_add(amount as TotalBalanceAmount)) + }) + .await? as TotalBalanceAmount + }; Ok(AddressBalance { owner, @@ -65,6 +71,27 @@ impl ReadView { owner: &'a Address, direction: IterDirection, base_asset_id: &'a AssetId, + ) -> impl Stream> + 'a { + if self.balances_enabled { + futures::future::Either::Left(self.balances_with_cache( + owner, + base_asset_id, + direction, + )) + } else { + futures::future::Either::Right(self.balances_without_cache( + owner, + base_asset_id, + direction, + )) + } + } + + fn balances_without_cache<'a>( + &'a self, + owner: &'a Address, + base_asset_id: &'a AssetId, + direction: IterDirection, ) -> impl Stream> + 'a { let query = AssetsQuery::new(owner, None, None, self, base_asset_id); let stream = query.coins(); @@ -73,10 +100,10 @@ impl ReadView { .try_fold( HashMap::new(), move |mut amounts_per_asset, coin| async move { - let amount: &mut u64 = amounts_per_asset + let amount: &mut TotalBalanceAmount = amounts_per_asset .entry(*coin.asset_id(base_asset_id)) .or_default(); - *amount = amount.saturating_add(coin.amount()); + *amount = amount.saturating_add(coin.amount() as TotalBalanceAmount); Ok(amounts_per_asset) }, ) @@ -109,4 +136,21 @@ impl ReadView { .try_flatten() .yield_each(self.batch_size) } + + fn balances_with_cache<'a>( + &'a self, + owner: &'a Address, + base_asset_id: &AssetId, + direction: IterDirection, + ) -> impl Stream> + 'a { + stream::iter(self.off_chain.balances(owner, base_asset_id, direction)) + .map(move |result| { + result.map(|(asset_id, amount)| AddressBalance { + owner: *owner, + asset_id, + amount, + }) + }) + .yield_each(self.batch_size) + } } diff --git a/crates/fuel-core/src/query/balance/asset_query.rs b/crates/fuel-core/src/query/balance/asset_query.rs index 13a289ec1e4..6ea62390fd1 100644 --- a/crates/fuel-core/src/query/balance/asset_query.rs +++ b/crates/fuel-core/src/query/balance/asset_query.rs @@ -175,7 +175,7 @@ impl<'a> AssetsQuery<'a> { .try_flatten() .filter(|result| { if let Ok(message) = result { - message.data().is_empty() + message.is_non_retryable_message() } else { true } diff --git a/crates/fuel-core/src/schema/balance.rs b/crates/fuel-core/src/schema/balance.rs index 140bb81256f..b6b95228e83 100644 --- a/crates/fuel-core/src/schema/balance.rs +++ b/crates/fuel-core/src/schema/balance.rs @@ -7,7 +7,7 @@ use crate::{ scalars::{ Address, AssetId, - U64, + U128, }, ReadViewProvider, }, @@ -33,7 +33,7 @@ impl Balance { self.0.owner.into() } - async fn amount(&self) -> U64 { + async fn amount(&self) -> U128 { self.0.amount.into() } diff --git a/crates/fuel-core/src/schema/scalars.rs b/crates/fuel-core/src/schema/scalars.rs index e75b3271f98..d3ddb7df20b 100644 --- a/crates/fuel-core/src/schema/scalars.rs +++ b/crates/fuel-core/src/schema/scalars.rs @@ -79,6 +79,7 @@ macro_rules! number_scalar { }; } +number_scalar!(U128, u128, "U128"); number_scalar!(U64, u64, "U64"); number_scalar!(U32, u32, "U32"); number_scalar!(U16, u16, "U16"); diff --git a/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs index 2caf033e2c8..6397182ed0e 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs @@ -1,6 +1,9 @@ use crate::{ database::{ - database_description::off_chain::OffChain, + database_description::{ + off_chain::OffChain, + IndexationKind, + }, Database, OffChainIterableKeyValueView, }, @@ -16,10 +19,19 @@ use crate::{ transactions::OwnedTransactionIndexCursor, }, }, - graphql_api::storage::old::{ - OldFuelBlockConsensus, - OldFuelBlocks, - OldTransactions, + graphql_api::storage::{ + balances::{ + CoinBalances, + CoinBalancesKey, + MessageBalance, + MessageBalances, + TotalBalanceAmount, + }, + old::{ + OldFuelBlockConsensus, + OldFuelBlocks, + OldTransactions, + }, }, }; use fuel_core_storage::{ @@ -51,6 +63,7 @@ use fuel_core_types::{ entities::relayer::transaction::RelayedTransactionStatus, fuel_tx::{ Address, + AssetId, Bytes32, ContractId, Salt, @@ -65,6 +78,7 @@ use fuel_core_types::{ }, services::txpool::TransactionStatus, }; +use std::iter; impl OffChainDatabase for OffChainIterableKeyValueView { fn block_height(&self, id: &BlockId) -> StorageResult { @@ -187,6 +201,87 @@ impl OffChainDatabase for OffChainIterableKeyValueView { fn message_is_spent(&self, nonce: &Nonce) -> StorageResult { self.message_is_spent(nonce) } + + fn balance( + &self, + owner: &Address, + asset_id: &AssetId, + base_asset_id: &AssetId, + ) -> StorageResult { + let coins = self + .storage_as_ref::() + .get(&CoinBalancesKey::new(owner, asset_id))? + .unwrap_or_default() + .into_owned() as TotalBalanceAmount; + + if base_asset_id == asset_id { + let MessageBalance { + retryable: _, // TODO: https://github.com/FuelLabs/fuel-core/issues/2448 + non_retryable, + } = self + .storage_as_ref::() + .get(owner)? + .unwrap_or_default() + .into_owned(); + + let total = coins.checked_add(non_retryable).ok_or(anyhow::anyhow!( + "Total balance overflow: coins: {coins}, messages: {non_retryable}" + ))?; + Ok(total) + } else { + Ok(coins) + } + } + + fn balances( + &self, + owner: &Address, + base_asset_id: &AssetId, + direction: IterDirection, + ) -> BoxedIter<'_, StorageResult<(AssetId, TotalBalanceAmount)>> { + let base_asset_id = *base_asset_id; + let base_balance = self.balance(owner, &base_asset_id, &base_asset_id); + let base_asset_balance = match base_balance { + Ok(base_asset_balance) => { + if base_asset_balance != 0 { + iter::once(Ok((base_asset_id, base_asset_balance))).into_boxed() + } else { + iter::empty().into_boxed() + } + } + Err(err) => iter::once(Err(err)).into_boxed(), + }; + + let non_base_asset_balance = self + .iter_all_filtered_keys::(Some(owner), None, Some(direction)) + .filter_map(move |result| match result { + Ok(key) if *key.asset_id() != base_asset_id => Some(Ok(key)), + Ok(_) => None, + Err(err) => Some(Err(err)), + }) + .map(move |result| { + result.and_then(|key| { + let asset_id = key.asset_id(); + let coin_balance = + self.storage_as_ref::() + .get(&key)? + .unwrap_or_default() + .into_owned() as TotalBalanceAmount; + Ok((*asset_id, coin_balance)) + }) + }) + .into_boxed(); + + if direction == IterDirection::Forward { + base_asset_balance + .chain(non_base_asset_balance) + .into_boxed() + } else { + non_base_asset_balance + .chain(base_asset_balance) + .into_boxed() + } + } } impl worker::OffChainDatabase for Database { @@ -199,4 +294,8 @@ impl worker::OffChainDatabase for Database { fn transaction(&mut self) -> Self::Transaction<'_> { self.into_transaction() } + + fn balances_enabled(&self) -> StorageResult { + self.indexation_available(IndexationKind::Balances) + } } diff --git a/crates/fuel-core/src/service/genesis/importer/off_chain.rs b/crates/fuel-core/src/service/genesis/importer/off_chain.rs index eef13bf9ee5..8726c73a235 100644 --- a/crates/fuel-core/src/service/genesis/importer/off_chain.rs +++ b/crates/fuel-core/src/service/genesis/importer/off_chain.rs @@ -43,6 +43,23 @@ use super::{ Handler, }; +fn balances_indexation_enabled() -> bool { + use std::sync::OnceLock; + + static BALANCES_INDEXATION_ENABLED: OnceLock = OnceLock::new(); + + *BALANCES_INDEXATION_ENABLED.get_or_init(|| { + // During re-genesis process the metadata is always doesn't exist. + let metadata = None; + let indexation_availability = + crate::database::database_description::indexation_availability::( + metadata, + ); + indexation_availability + .contains(&crate::database::database_description::IndexationKind::Balances) + }) +} + impl ImportTable for Handler { type TableInSnapshot = TransactionStatuses; type TableBeingWritten = TransactionStatuses; @@ -110,7 +127,11 @@ impl ImportTable for Handler { let events = group .into_iter() .map(|TableEntry { value, .. }| Cow::Owned(Event::MessageImported(value))); - worker_service::process_executor_events(events, tx)?; + worker_service::process_executor_events( + events, + tx, + balances_indexation_enabled(), + )?; Ok(()) } } @@ -128,7 +149,11 @@ impl ImportTable for Handler { let events = group.into_iter().map(|TableEntry { value, key }| { Cow::Owned(Event::CoinCreated(value.uncompress(key))) }); - worker_service::process_executor_events(events, tx)?; + worker_service::process_executor_events( + events, + tx, + balances_indexation_enabled(), + )?; Ok(()) } } diff --git a/crates/types/src/entities.rs b/crates/types/src/entities.rs index 5dbf816fba5..19c5ec0bf14 100644 --- a/crates/types/src/entities.rs +++ b/crates/types/src/entities.rs @@ -22,10 +22,9 @@ impl TryFrom for MessageCoin { let recipient = *message.recipient(); let nonce = *message.nonce(); let amount = message.amount(); - let data = message.data(); let da_height = message.da_height(); - if !data.is_empty() { + if message.is_retryable_message() { return Err(anyhow::anyhow!( "The data is not empty, impossible to convert into the `MessageCoin`" )) diff --git a/crates/types/src/entities/relayer/message.rs b/crates/types/src/entities/relayer/message.rs index b058b6ff75c..5be4cbda490 100644 --- a/crates/types/src/entities/relayer/message.rs +++ b/crates/types/src/entities/relayer/message.rs @@ -135,6 +135,18 @@ impl Message { } } + /// Returns `true` if the message is retryable. + pub fn is_retryable_message(&self) -> bool { + let is_data_empty = self.data().is_empty(); + !is_data_empty + } + + /// Returns `true` if the message is non retryable. + pub fn is_non_retryable_message(&self) -> bool { + let is_data_empty = self.data().is_empty(); + is_data_empty + } + /// Set the message data #[cfg(any(test, feature = "test-helpers"))] pub fn set_data(&mut self, data: Vec) { diff --git a/crates/types/src/services/graphql_api.rs b/crates/types/src/services/graphql_api.rs index b38d73e0e03..efcfdc99ecd 100644 --- a/crates/types/src/services/graphql_api.rs +++ b/crates/types/src/services/graphql_api.rs @@ -7,17 +7,17 @@ use crate::fuel_types::{ }; /// The cumulative balance(`amount`) of the `Owner` of `asset_id`. -pub struct Balance { +pub struct Balance { /// Owner of the asset. pub owner: Owner, /// The cumulative amount of the asset. - pub amount: u64, + pub amount: Amount, /// The identifier of the asset. pub asset_id: AssetId, } /// The alias for the `Balance` of the address. -pub type AddressBalance = Balance
; +pub type AddressBalance = Balance; /// The alias for the `Balance` of the contract. -pub type ContractBalance = Balance; +pub type ContractBalance = Balance; diff --git a/tests/tests/balances.rs b/tests/tests/balances.rs index a5892b434eb..20eb662914c 100644 --- a/tests/tests/balances.rs +++ b/tests/tests/balances.rs @@ -132,6 +132,101 @@ async fn balance() { assert_eq!(balance, 449); } +#[tokio::test] +async fn balance_messages_only() { + let owner = Address::default(); + let asset_id = AssetId::BASE; + + const RETRYABLE: &[u8] = &[1]; + const NON_RETRYABLE: &[u8] = &[]; + + // setup config + let state_config = StateConfig { + contracts: vec![], + coins: vec![], + messages: vec![ + (owner, 60, NON_RETRYABLE), + (owner, 200, RETRYABLE), + (owner, 90, NON_RETRYABLE), + ] + .into_iter() + .enumerate() + .map(|(nonce, (owner, amount, data))| MessageConfig { + sender: owner, + recipient: owner, + nonce: (nonce as u64).into(), + amount, + data: data.to_vec(), + da_height: DaBlockHeight::from(0usize), + }) + .collect(), + ..Default::default() + }; + let config = Config::local_node_with_state_config(state_config); + + // setup server & client + let srv = FuelService::new_node(config).await.unwrap(); + let client = FuelClient::from(srv.bound_address); + + // run test + const NON_RETRYABLE_AMOUNT: u128 = 60 + 90; + let balance = client.balance(&owner, Some(&asset_id)).await.unwrap(); + assert_eq!(balance, NON_RETRYABLE_AMOUNT); +} + +#[tokio::test] +async fn balances_messages_only() { + let owner = Address::default(); + + const RETRYABLE: &[u8] = &[1]; + const NON_RETRYABLE: &[u8] = &[]; + + // setup config + let state_config = StateConfig { + contracts: vec![], + coins: vec![], + messages: vec![ + (owner, 60, NON_RETRYABLE), + (owner, 200, RETRYABLE), + (owner, 90, NON_RETRYABLE), + ] + .into_iter() + .enumerate() + .map(|(nonce, (owner, amount, data))| MessageConfig { + sender: owner, + recipient: owner, + nonce: (nonce as u64).into(), + amount, + data: data.to_vec(), + da_height: DaBlockHeight::from(0usize), + }) + .collect(), + ..Default::default() + }; + let config = Config::local_node_with_state_config(state_config); + + // setup server & client + let srv = FuelService::new_node(config).await.unwrap(); + let client = FuelClient::from(srv.bound_address); + + // run test + const NON_RETRYABLE_AMOUNT: u128 = 60 + 90; + let balances = client + .balances( + &owner, + PaginationRequest { + cursor: None, + results: 10, + direction: PageDirection::Forward, + }, + ) + .await + .unwrap(); + assert_eq!(balances.results.len(), 1); + let messages_balance = balances.results[0].amount; + assert_eq!(messages_balance, NON_RETRYABLE_AMOUNT); +} + #[tokio::test] async fn first_5_balances() { let owner = Address::from([10u8; 32]); diff --git a/tests/tests/blob.rs b/tests/tests/blob.rs index 813dfea1418..74a1a7d13aa 100644 --- a/tests/tests/blob.rs +++ b/tests/tests/blob.rs @@ -183,7 +183,7 @@ async fn blob__cannot_post_already_existing_blob() { } #[tokio::test] -async fn blob__accessing_nonexitent_blob_panics_vm() { +async fn blob__accessing_nonexistent_blob_panics_vm() { // Given let ctx = TestContext::new().await; let blob_id = BlobId::new([0; 32]); // Nonexistent diff --git a/tests/tests/chain.rs b/tests/tests/chain.rs index c5c62b8f600..43470f7c0d3 100644 --- a/tests/tests/chain.rs +++ b/tests/tests/chain.rs @@ -170,11 +170,11 @@ async fn network_operates_with_non_zero_base_asset_id() { .expect("transaction should insert"); // Then - let expected_fee = 1; + let expected_fee = 1_u128; assert!(matches!(result, TransactionStatus::Success { .. })); let balance = client .balance(&owner, Some(&new_base_asset_id)) .await .expect("Should fetch the balance"); - assert_eq!(balance, amount - expected_fee); + assert_eq!(balance, amount as u128 - expected_fee); } diff --git a/tests/tests/fee_collection_contract.rs b/tests/tests/fee_collection_contract.rs index 54426b5c293..e449e6fa17b 100644 --- a/tests/tests/fee_collection_contract.rs +++ b/tests/tests/fee_collection_contract.rs @@ -227,7 +227,7 @@ async fn happy_path() { // Make sure that the full balance was been withdrawn assert_eq!( ctx.client.balance(&ctx.address, None).await.unwrap(), - contract_balance_before_collect + contract_balance_before_collect as u128 ); }