diff --git a/CHANGELOG.md b/CHANGELOG.md index 96e111b90ac..06128e8691b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2061](https://github.com/FuelLabs/fuel-core/pull/2061): Allow querying filled transaction body from the status. ### Changed --[2064](https://github.com/FuelLabs/fuel-core/pull/2064): Allow gas price metadata values to be overridden with config +-[2067](https://github.com/FuelLabs/fuel-core/pull/2067): Return error from TxPool level if the `BlobId` is known. +-[2064](https://github.com/FuelLabs/fuel-core/pull/2064): Allow gas price metadata values to be overridden with config ### Fixes - [2060](https://github.com/FuelLabs/fuel-core/pull/2060): Use `min-gas-price` as a starting point if `start-gas-price` is zero. diff --git a/crates/fuel-core/src/service/adapters/txpool.rs b/crates/fuel-core/src/service/adapters/txpool.rs index 4ee44db6c28..323b679d9cf 100644 --- a/crates/fuel-core/src/service/adapters/txpool.rs +++ b/crates/fuel-core/src/service/adapters/txpool.rs @@ -34,6 +34,7 @@ use fuel_core_types::{ relayer::message::Message, }, fuel_tx::{ + BlobId, ConsensusParameters, Transaction, UtxoId, @@ -42,6 +43,7 @@ use fuel_core_types::{ ContractId, Nonce, }, + fuel_vm::BlobData, services::{ block_importer::SharedImportResult, p2p::{ @@ -135,6 +137,10 @@ impl fuel_core_txpool::ports::TxPoolDb for OnChainIterableKeyValueView { self.storage::().contains_key(contract_id) } + fn blob_exist(&self, blob_id: &BlobId) -> StorageResult { + self.storage::().contains_key(blob_id) + } + fn message(&self, id: &Nonce) -> StorageResult> { self.storage::() .get(id) diff --git a/crates/services/txpool/src/containers/dependency.rs b/crates/services/txpool/src/containers/dependency.rs index 9801a16bcf7..01644cd67c5 100644 --- a/crates/services/txpool/src/containers/dependency.rs +++ b/crates/services/txpool/src/containers/dependency.rs @@ -6,6 +6,7 @@ use crate::{ }; use fuel_core_types::{ fuel_tx::{ + field::BlobId as BlobIdField, input::{ coin::{ CoinPredicate, @@ -23,7 +24,10 @@ use fuel_core_types::{ Output, UtxoId, }, - fuel_types::Nonce, + fuel_types::{ + BlobId, + Nonce, + }, services::txpool::ArcPoolTx, }; use std::collections::{ @@ -40,6 +44,8 @@ pub struct Dependency { coins: HashMap, /// Contract-> Tx mapping. contracts: HashMap, + /// Blob-> Tx mapping. + blobs: HashMap, /// messageId -> tx mapping messages: HashMap, /// max depth of dependency. @@ -90,77 +96,24 @@ pub struct MessageState { tip: Word, } +#[derive(Debug, Clone)] +pub struct BlobState { + origin_tx_id: TxId, + tip: Word, +} + impl Dependency { pub fn new(max_depth: usize, utxo_validation: bool) -> Self { Self { coins: HashMap::new(), contracts: HashMap::new(), + blobs: HashMap::new(), messages: HashMap::new(), max_depth, utxo_validation, } } - /// find all dependent Transactions that are inside txpool. - /// Does not check db. They can be sorted by gasPrice to get order of dependency - pub(crate) fn find_dependent( - &self, - tx: ArcPoolTx, - seen: &mut HashMap, - txs: &HashMap, - ) { - // for every input aggregate UtxoId and check if it is inside - let mut check = vec![tx.id()]; - while let Some(parent_txhash) = check.pop() { - let mut is_new = false; - let mut parent_tx = None; - seen.entry(parent_txhash).or_insert_with(|| { - is_new = true; - let parent = txs.get(&parent_txhash).expect("To have tx in txpool"); - parent_tx = Some(parent.clone()); - parent.tx().clone() - }); - // for every input check if tx_id is inside seen. if not, check coins/contract map. - if let Some(parent_tx) = parent_tx { - for input in parent_tx.inputs() { - // if found and depth is not zero add it to `check`. - match input { - Input::CoinSigned(CoinSigned { utxo_id, .. }) - | Input::CoinPredicate(CoinPredicate { utxo_id, .. }) => { - let state = self - .coins - .get(utxo_id) - .expect("to find coin inside spend tx"); - if !state.is_in_database() { - check.push(*utxo_id.tx_id()); - } - } - Input::Contract(Contract { contract_id, .. }) => { - let state = self - .contracts - .get(contract_id) - .expect("Expect to find contract in dependency"); - - if !state.is_in_database() { - let origin = state - .origin - .as_ref() - .expect("contract origin to be present"); - check.push(*origin.tx_id()); - } - } - Input::MessageCoinSigned(_) - | Input::MessageCoinPredicate(_) - | Input::MessageDataSigned(_) - | Input::MessageDataPredicate(_) => { - // Message inputs do not depend on any other fuel transactions - } - } - } - } - } - } - fn check_if_coin_input_can_spend_output( output: &Output, input: &Input, @@ -258,6 +211,7 @@ impl Dependency { usize, HashMap, HashMap, + HashMap, HashMap, Vec, ), @@ -268,7 +222,34 @@ impl Dependency { let mut max_depth = 0; let mut db_coins: HashMap = HashMap::new(); let mut db_contracts: HashMap = HashMap::new(); + let mut db_blobs: HashMap = HashMap::new(); let mut db_messages: HashMap = HashMap::new(); + + if let PoolTransaction::Blob(checked_tx, _) = tx.as_ref() { + let blob_id = checked_tx.transaction().blob_id(); + if db + .blob_exist(blob_id) + .map_err(|e| Error::Database(format!("{:?}", e)))? + { + return Err(Error::NotInsertedBlobIdAlreadyTaken(*blob_id)) + } + + if let Some(state) = self.blobs.get(blob_id) { + if state.tip >= tx.tip() { + return Err(Error::NotInsertedCollisionBlobId(*blob_id)) + } else { + collided.push(state.origin_tx_id); + } + } + db_blobs.insert( + *blob_id, + BlobState { + origin_tx_id: tx.id(), + tip: tx.tip(), + }, + ); + } + for input in tx.inputs() { // check if all required inputs are here. match input { @@ -461,7 +442,14 @@ impl Dependency { // collision of other outputs is not possible. } - Ok((max_depth, db_coins, db_contracts, db_messages, collided)) + Ok(( + max_depth, + db_coins, + db_contracts, + db_blobs, + db_messages, + collided, + )) } /// insert tx inside dependency @@ -475,7 +463,7 @@ impl Dependency { where DB: TxPoolDb, { - let (max_depth, db_coins, db_contracts, db_messages, collided) = + let (max_depth, db_coins, db_contracts, db_blobs, db_messages, collided) = self.check_for_collision(txs, db, tx)?; // now we are sure that transaction can be included. remove all collided transactions @@ -519,6 +507,7 @@ impl Dependency { // for contracts from db that are not found in dependency, we already inserted used_by // and are okay to just extend current list self.contracts.extend(db_contracts); + self.blobs.extend(db_blobs); // insert / overwrite all applicable message id spending relations self.messages.extend(db_messages); @@ -555,7 +544,7 @@ impl Dependency { ); } Output::Contract(_) => { - // do nothing, this contract is already already found in dependencies. + // do nothing, this contract is already found in dependencies. // as it is tied with input and used_by is already inserted. } }; @@ -663,6 +652,11 @@ impl Dependency { } } + if let PoolTransaction::Blob(checked_tx, _) = tx.as_ref() { + // remove blob state + self.blobs.remove(checked_tx.transaction().blob_id()); + } + removed_transactions } } diff --git a/crates/services/txpool/src/mock_db.rs b/crates/services/txpool/src/mock_db.rs index d6a4ff8d9c1..371d214f0c2 100644 --- a/crates/services/txpool/src/mock_db.rs +++ b/crates/services/txpool/src/mock_db.rs @@ -12,11 +12,13 @@ use fuel_core_types::{ relayer::message::Message, }, fuel_tx::{ + BlobId, Contract, ContractId, UtxoId, }, fuel_types::Nonce, + fuel_vm::BlobBytes, }; use std::{ collections::{ @@ -33,6 +35,7 @@ use std::{ pub struct Data { pub coins: HashMap, pub contracts: HashMap, + pub blobs: HashMap, pub messages: HashMap, pub spent_messages: HashSet, } @@ -51,6 +54,14 @@ impl MockDb { .insert(coin.utxo_id, coin.compress()); } + pub fn insert_dummy_blob(&self, blob_id: BlobId) { + self.data + .lock() + .unwrap() + .blobs + .insert(blob_id, vec![123; 123].into()); + } + pub fn insert_message(&self, message: Message) { self.data .lock() @@ -78,6 +89,10 @@ impl TxPoolDb for MockDb { .contains_key(contract_id)) } + fn blob_exist(&self, blob_id: &BlobId) -> StorageResult { + Ok(self.data.lock().unwrap().blobs.contains_key(blob_id)) + } + fn message(&self, id: &Nonce) -> StorageResult> { Ok(self.data.lock().unwrap().messages.get(id).cloned()) } diff --git a/crates/services/txpool/src/ports.rs b/crates/services/txpool/src/ports.rs index c013703460a..2f051c274ae 100644 --- a/crates/services/txpool/src/ports.rs +++ b/crates/services/txpool/src/ports.rs @@ -13,6 +13,7 @@ use fuel_core_types::{ UtxoId, }, fuel_types::{ + BlobId, ContractId, Nonce, }, @@ -59,6 +60,8 @@ pub trait TxPoolDb: Send + Sync { fn contract_exist(&self, contract_id: &ContractId) -> StorageResult; + fn blob_exist(&self, blob_id: &BlobId) -> StorageResult; + fn message(&self, message_id: &Nonce) -> StorageResult>; } diff --git a/crates/services/txpool/src/service.rs b/crates/services/txpool/src/service.rs index abe7fe52293..1a99b76057e 100644 --- a/crates/services/txpool/src/service.rs +++ b/crates/services/txpool/src/service.rs @@ -343,10 +343,6 @@ impl self.txpool.lock().find_one(&id) } - pub fn find_dependent(&self, ids: Vec) -> Vec { - self.txpool.lock().find_dependent(&ids) - } - pub fn select_transactions(&self, max_gas: u64) -> Vec { let mut guard = self.txpool.lock(); let txs = guard.includable(); diff --git a/crates/services/txpool/src/txpool.rs b/crates/services/txpool/src/txpool.rs index 11aef526b2e..2e11d42989a 100644 --- a/crates/services/txpool/src/txpool.rs +++ b/crates/services/txpool/src/txpool.rs @@ -60,7 +60,6 @@ use fuel_core_types::{ services::executor::TransactionExecutionStatus, }; use std::{ - cmp::Reverse, collections::HashMap, ops::Deref, sync::Arc, @@ -173,27 +172,6 @@ impl TxPool { self.txs().get(hash).cloned() } - /// find all dependent tx and return them with requested dependencies in one list sorted by Price. - pub fn find_dependent(&self, hashes: &[TxId]) -> Vec { - let mut seen = HashMap::new(); - { - for hash in hashes { - if let Some(tx) = self.txs().get(hash) { - self.dependency().find_dependent( - tx.tx().clone(), - &mut seen, - self.txs(), - ); - } - } - } - let mut list: Vec<_> = seen.into_values().collect(); - // sort from high to low price - list.sort_by_key(|tx| Reverse(tx.tip())); - - list - } - /// The number of pending transaction in the pool. pub fn pending_number(&self) -> usize { self.by_hash.len() diff --git a/crates/services/txpool/src/txpool/tests.rs b/crates/services/txpool/src/txpool/tests.rs index 7030beca497..469266002ef 100644 --- a/crates/services/txpool/src/txpool/tests.rs +++ b/crates/services/txpool/src/txpool/tests.rs @@ -1,3 +1,5 @@ +#![allow(non_snake_case)] + use crate::{ service::test_helpers::MockTxPoolGasPrice, test_helpers::{ @@ -25,6 +27,9 @@ use fuel_core_types::{ input::coin::CoinPredicate, Address, AssetId, + BlobBody, + BlobId, + BlobIdExt, ConsensusParameters, Contract, Finalizable, @@ -46,11 +51,7 @@ use fuel_core_types::{ interpreter::MemoryInstance, }, }; -use std::{ - cmp::Reverse, - collections::HashMap, - vec, -}; +use std::vec; use super::check_single_tx; @@ -1027,71 +1028,6 @@ async fn sorted_out_tx_by_creation_instant() { assert_eq!(txs[3].id(), tx4_id, "Fourth should be tx4"); } -#[tokio::test] -async fn find_dependent_tx1_tx2() { - let mut context = TextContext::default(); - - let (_, gas_coin) = context.setup_coin(); - let (output, unset_input) = context.create_output_and_input(10_000); - let tx1 = TransactionBuilder::script(vec![], vec![]) - .tip(11) - .max_fee_limit(11) - .script_gas_limit(GAS_LIMIT) - .add_input(gas_coin) - .add_output(output) - .finalize_as_transaction(); - - let input = unset_input.into_input(UtxoId::new(tx1.id(&Default::default()), 0)); - let (output, unset_input) = context.create_output_and_input(7_500); - let tx2 = TransactionBuilder::script(vec![], vec![]) - .tip(10) - .max_fee_limit(10) - .script_gas_limit(GAS_LIMIT) - .add_input(input) - .add_output(output) - .finalize_as_transaction(); - - let input = unset_input.into_input(UtxoId::new(tx2.id(&Default::default()), 0)); - let tx3 = TransactionBuilder::script(vec![], vec![]) - .tip(9) - .max_fee_limit(9) - .script_gas_limit(GAS_LIMIT) - .add_input(input) - .finalize_as_transaction(); - - let tx1_id = tx1.id(&ChainId::default()); - let tx2_id = tx2.id(&ChainId::default()); - let tx3_id = tx3.id(&ChainId::default()); - - let mut txpool = context.build(); - let tx1 = check_unwrap_tx(tx1, &txpool.config).await; - let tx2 = check_unwrap_tx(tx2, &txpool.config).await; - let tx3 = check_unwrap_tx(tx3, &txpool.config).await; - - txpool - .insert_single(tx1) - .expect("Tx0 should be Ok, got Err"); - txpool - .insert_single(tx2) - .expect("Tx1 should be Ok, got Err"); - let tx3_result = txpool - .insert_single(tx3) - .expect("Tx2 should be Ok, got Err"); - - let mut seen = HashMap::new(); - txpool - .dependency() - .find_dependent(tx3_result.inserted, &mut seen, txpool.txs()); - - let mut list: Vec<_> = seen.into_values().collect(); - // sort from high to low price - list.sort_by_key(|tx| Reverse(tx.tip())); - assert_eq!(list.len(), 3, "We should have three items"); - assert_eq!(list[0].id(), tx1_id, "Tx1 should be first."); - assert_eq!(list[1].id(), tx2_id, "Tx2 should be second."); - assert_eq!(list[2].id(), tx3_id, "Tx3 should be third."); -} - #[tokio::test] async fn tx_at_least_min_gas_price_is_insertable() { let gas_price = 10; @@ -1421,3 +1357,141 @@ async fn predicate_that_returns_false_is_invalid() { "unexpected error: {err}", ) } + +#[tokio::test] +async fn insert_single__blob_tx_works() { + let program = vec![123; 123]; + let tx = TransactionBuilder::blob(BlobBody { + id: BlobId::compute(program.as_slice()), + witness_index: 0, + }) + .add_witness(program.into()) + .add_random_fee_input() + .finalize_as_transaction(); + + let config = Config { + utxo_validation: false, + ..Default::default() + }; + let context = TextContext::default().config(config); + let mut txpool = context.build(); + + // Given + let tx = check_unwrap_tx(tx, &txpool.config).await; + let id = tx.id(); + + // When + let result = txpool.insert_single(tx); + + // Then + let _ = result.expect("Should insert blob"); + assert!(txpool.by_hash.contains_key(&id)); +} + +#[tokio::test] +async fn insert_single__blob_tx_fails_if_blob_already_inserted_and_lower_tip() { + let program = vec![123; 123]; + let blob_id = BlobId::compute(program.as_slice()); + let tx = TransactionBuilder::blob(BlobBody { + id: blob_id, + witness_index: 0, + }) + .add_witness(program.clone().into()) + .add_random_fee_input() + .finalize_as_transaction(); + + let config = Config { + utxo_validation: false, + ..Default::default() + }; + let context = TextContext::default().config(config); + let mut txpool = context.build(); + let tx = check_unwrap_tx(tx, &txpool.config).await; + + // Given + txpool.insert_single(tx).unwrap(); + let same_blob_tx = TransactionBuilder::blob(BlobBody { + id: blob_id, + witness_index: 1, + }) + .add_random_fee_input() + .add_witness(program.into()) + .finalize_as_transaction(); + let same_blob_tx = check_unwrap_tx(same_blob_tx, &txpool.config).await; + + // When + let result = txpool.insert_single(same_blob_tx); + + // Then + assert_eq!(result, Err(Error::NotInsertedCollisionBlobId(blob_id))); +} + +#[tokio::test] +async fn insert_single__blob_tx_succeeds_if_blob_already_inserted_but_higher_tip() { + let program = vec![123; 123]; + let blob_id = BlobId::compute(program.as_slice()); + let tx = TransactionBuilder::blob(BlobBody { + id: blob_id, + witness_index: 0, + }) + .add_witness(program.clone().into()) + .add_random_fee_input() + .finalize_as_transaction(); + + let config = Config { + utxo_validation: false, + ..Default::default() + }; + let context = TextContext::default().config(config); + let mut txpool = context.build(); + let tx = check_unwrap_tx(tx, &txpool.config).await; + + // Given + txpool.insert_single(tx).unwrap(); + let same_blob_tx = TransactionBuilder::blob(BlobBody { + id: blob_id, + witness_index: 1, + }) + .add_random_fee_input() + .add_witness(program.into()) + .tip(100) + .max_fee_limit(100) + .finalize_as_transaction(); + let same_blob_tx = check_unwrap_tx(same_blob_tx, &txpool.config).await; + + // When + let result = txpool.insert_single(same_blob_tx); + + // Then + let _ = result.expect("Should insert transaction with the same blob but higher tip"); +} + +#[tokio::test] +async fn insert_single__blob_tx_fails_if_blob_already_exists_in_database() { + let program = vec![123; 123]; + let blob_id = BlobId::compute(program.as_slice()); + let tx = TransactionBuilder::blob(BlobBody { + id: blob_id, + witness_index: 0, + }) + .add_witness(program.clone().into()) + .add_random_fee_input() + .finalize_as_transaction(); + + let config = Config { + utxo_validation: false, + ..Default::default() + }; + let context = TextContext::default().config(config); + let mut txpool = context.build(); + let tx = check_unwrap_tx(tx, &txpool.config).await; + + // Given + txpool.database.0.insert_dummy_blob(blob_id); + + // When + let result = txpool.insert_single(tx); + + // Then + assert_eq!(result, Err(Error::NotInsertedBlobIdAlreadyTaken(blob_id))); +} diff --git a/crates/types/src/services/txpool.rs b/crates/types/src/services/txpool.rs index 050d1591f0b..32b5a791b61 100644 --- a/crates/types/src/services/txpool.rs +++ b/crates/types/src/services/txpool.rs @@ -43,6 +43,7 @@ use fuel_vm_private::{ CheckError, CheckedTransaction, }, + fuel_tx::BlobId, fuel_types::BlockHeight, }; use std::{ @@ -207,7 +208,7 @@ impl From<&PoolTransaction> for CheckedTransaction { /// The `removed` field contains the list of removed transactions during the insertion /// of the `inserted` transaction. -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub struct InsertionResult { /// This was inserted pub inserted: ArcPoolTx, @@ -326,6 +327,10 @@ pub enum Error { "Transaction is not inserted. More priced tx has created contract with ContractId {0:#x}" )] NotInsertedCollisionContractId(ContractId), + #[error( + "Transaction is not inserted. More priced tx has uploaded the blob with BlobId {0:#x}" + )] + NotInsertedCollisionBlobId(BlobId), #[error( "Transaction is not inserted. A higher priced tx {0:#x} is already spending this message: {1:#x}" )] @@ -336,6 +341,8 @@ pub enum Error { NotInsertedInputContractDoesNotExist(ContractId), #[error("Transaction is not inserted. ContractId is already taken {0:#x}")] NotInsertedContractIdAlreadyTaken(ContractId), + #[error("Transaction is not inserted. BlobId is already taken {0:#x}")] + NotInsertedBlobIdAlreadyTaken(BlobId), #[error("Transaction is not inserted. UTXO does not exist: {0:#x}")] NotInsertedInputUtxoIdNotDoesNotExist(UtxoId), #[error("Transaction is not inserted. UTXO is spent: {0:#x}")] @@ -390,6 +397,13 @@ pub enum Error { Other(String), } +#[cfg(feature = "test-helpers")] +impl PartialEq for Error { + fn eq(&self, other: &Self) -> bool { + self.to_string().eq(&other.to_string()) + } +} + impl From for Error { fn from(e: CheckError) -> Self { Error::ConsensusValidity(e) diff --git a/tests/tests/blob.rs b/tests/tests/blob.rs index cdbc76296c1..526c191642f 100644 --- a/tests/tests/blob.rs +++ b/tests/tests/blob.rs @@ -27,6 +27,7 @@ use fuel_core_types::{ fuel_types::canonical::Serialize, fuel_vm::checked_transaction::IntoChecked, }; +use tokio::io; struct TestContext { _node: FuelService, @@ -48,7 +49,10 @@ impl TestContext { Self { _node, client } } - async fn new_blob(&mut self, blob_data: Vec) -> (TransactionStatus, BlobId) { + async fn new_blob( + &mut self, + blob_data: Vec, + ) -> io::Result<(TransactionStatus, BlobId)> { let blob_id = BlobId::compute(&blob_data); let tx = TransactionBuilder::blob(BlobBody { id: blob_id, @@ -63,9 +67,8 @@ impl TestContext { let status = self .client .submit_and_await_commit(tx.transaction()) - .await - .expect("Cannot submit blob"); - (status, blob_id) + .await?; + Ok((status, blob_id)) } } @@ -77,7 +80,8 @@ async fn blob__upload_works() { // When let (status, blob_id) = ctx .new_blob([op::ret(RegId::ONE)].into_iter().collect()) - .await; + .await + .unwrap(); assert!(matches!(status, TransactionStatus::Success { .. })); // Then @@ -107,14 +111,15 @@ async fn blob__cannot_post_already_existing_blob() { // Given let mut ctx = TestContext::new().await; let payload: Vec = [op::ret(RegId::ONE)].into_iter().collect(); - let (status, _blob_id) = ctx.new_blob(payload.clone()).await; + let (status, _blob_id) = ctx.new_blob(payload.clone()).await.unwrap(); assert!(matches!(status, TransactionStatus::Success { .. })); // When - let (status, _blob_id) = ctx.new_blob(payload).await; + let result = ctx.new_blob(payload).await; // Then - assert!(matches!(status, TransactionStatus::SqueezedOut { .. })); + let err = result.expect_err("Should fail because of the same blob id"); + assert!(err.to_string().contains("BlobId is already taken")); } #[tokio::test]