diff --git a/CHANGELOG.md b/CHANGELOG.md index c39d22022c1..9db41dc31f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2131](https://github.com/FuelLabs/fuel-core/pull/2131): Add flow in TxPool in order to ask to newly connected peers to share their transaction pool - [2182](https://github.com/FuelLabs/fuel-core/pull/2151): Limit number of transactions that can be fetched via TxSource::next - [2189](https://github.com/FuelLabs/fuel-core/pull/2151): Select next DA height to never include more than u16::MAX -1 transactions from L1. - +- [2265](https://github.com/FuelLabs/fuel-core/pull/2265): Integrate Block Committer API for DA Block Costs. ### Changed diff --git a/crates/services/gas_price_service/src/v1/da_source_adapter.rs b/crates/services/gas_price_service/src/v1/da_source_adapter.rs index d984cdeed91..a1e26611920 100644 --- a/crates/services/gas_price_service/src/v1/da_source_adapter.rs +++ b/crates/services/gas_price_service/src/v1/da_source_adapter.rs @@ -24,7 +24,7 @@ pub const POLLING_INTERVAL_MS: u64 = 10_000; #[derive(Debug, Default, Clone, Eq, Hash, PartialEq)] pub struct DaBlockCosts { - pub l2_block_range: core::ops::Range, + pub l2_block_range: core::ops::Range, pub blob_size_bytes: u32, pub blob_cost_wei: u128, } diff --git a/crates/services/gas_price_service/src/v1/da_source_adapter/block_committer_costs.rs b/crates/services/gas_price_service/src/v1/da_source_adapter/block_committer_costs.rs index 7c0beaaffbf..cc9179e540f 100644 --- a/crates/services/gas_price_service/src/v1/da_source_adapter/block_committer_costs.rs +++ b/crates/services/gas_price_service/src/v1/da_source_adapter/block_committer_costs.rs @@ -1,3 +1,5 @@ +#![allow(clippy::arithmetic_side_effects)] + use crate::v1::da_source_adapter::{ service::{ DaBlockCostsSource, @@ -6,57 +8,317 @@ use crate::v1::da_source_adapter::{ DaBlockCosts, }; use anyhow::anyhow; -use reqwest::Url; +use fuel_core_types::blockchain::primitives::DaBlockHeight; use serde::{ Deserialize, Serialize, }; -/// This struct is used to denote the block committer da gas price source,, +#[async_trait::async_trait] +trait BlockCommitterApi: Send + Sync { + /// Used on first run to get the latest costs and seqno + async fn get_latest_costs(&self) -> DaBlockCostsResult>; + /// Used to get the costs for a specific seqno + async fn get_costs_by_seqno( + &self, + number: u64, + ) -> DaBlockCostsResult>; + /// Used to get the costs for a range of blocks (inclusive) + async fn get_cost_bundles_by_range( + &self, + range: core::ops::Range, + ) -> DaBlockCostsResult>>; +} + +/// This struct is used to denote the block committer da block costs source /// which receives data from the block committer (only http api for now) -pub struct BlockCommitterDaBlockCosts { - client: reqwest::Client, - url: Url, +pub struct BlockCommitterDaBlockCosts { + client: BlockCommitter, + last_raw_da_block_costs: Option, } #[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq)] -struct RawDaBlockCosts { - pub l2_block_range: core::ops::Range, - pub blob_size_bytes: u32, - pub blob_cost: u128, +pub struct RawDaBlockCosts { + /// Sequence number (Monotonically increasing nonce) + pub sequence_number: u64, + /// The range of blocks that the costs apply to + pub blocks_range: core::ops::Range, + /// The DA block height of the last transaction for the range of blocks + pub da_block_height: DaBlockHeight, + /// Rolling sum cost of posting blobs (wei) + pub total_cost: u128, + /// Rolling sum size of blobs (bytes) + pub total_size_bytes: u32, } -impl From for DaBlockCosts { - fn from(raw: RawDaBlockCosts) -> Self { +impl From<&RawDaBlockCosts> for DaBlockCosts { + fn from(raw_da_block_costs: &RawDaBlockCosts) -> Self { DaBlockCosts { - l2_block_range: raw.l2_block_range, - blob_size_bytes: raw.blob_size_bytes, - blob_cost_wei: raw.blob_cost, + l2_block_range: raw_da_block_costs.blocks_range.clone(), + blob_size_bytes: raw_da_block_costs.total_size_bytes, + blob_cost_wei: raw_da_block_costs.total_cost, } } } -impl BlockCommitterDaBlockCosts { - /// Create a new instance of the block committer da gas price source - pub fn new(url: String) -> Self { +impl BlockCommitterDaBlockCosts { + /// Create a new instance of the block committer da block costs source + pub fn new(client: BlockCommitter, last_value: Option) -> Self { Self { - client: reqwest::Client::new(), - url: Url::parse(&url).unwrap(), + client, + last_raw_da_block_costs: last_value, } } } #[async_trait::async_trait] -impl DaBlockCostsSource for BlockCommitterDaBlockCosts { +impl DaBlockCostsSource for BlockCommitterDaBlockCosts +where + BlockCommitter: BlockCommitterApi, +{ async fn request_da_block_cost(&mut self) -> DaBlockCostsResult { - let response = self.client.get(self.url.clone()).send().await?; - if !response.status().is_success() { - return Err(anyhow!("failed with response: {}", response.status())); + let raw_da_block_costs = match self.last_raw_da_block_costs { + Some(ref last_value) => self + .client + .get_costs_by_seqno(last_value.sequence_number + 1), + _ => self.client.get_latest_costs(), } - let response = response + .await?; + + let Some(ref raw_da_block_costs) = raw_da_block_costs else { + return Err(anyhow!("No response from block committer")) + }; + + let da_block_costs = self.last_raw_da_block_costs.iter().fold( + Ok(raw_da_block_costs.into()), + |costs: DaBlockCostsResult, last_value| { + let costs = costs.expect("Defined to be OK"); + let blob_size_bytes = costs + .blob_size_bytes + .checked_sub(last_value.total_size_bytes) + .ok_or(anyhow!("Blob size bytes underflow"))?; + let blob_cost_wei = raw_da_block_costs + .total_cost + .checked_sub(last_value.total_cost) + .ok_or(anyhow!("Blob cost wei underflow"))?; + Ok(DaBlockCosts { + blob_size_bytes, + blob_cost_wei, + ..costs + }) + }, + )?; + + self.last_raw_da_block_costs = Some(raw_da_block_costs.clone()); + Ok(da_block_costs) + } +} + +pub struct BlockCommitterHttpApi { + client: reqwest::Client, + url: String, +} + +impl BlockCommitterHttpApi { + pub fn new(url: String) -> Self { + Self { + client: reqwest::Client::new(), + url, + } + } +} + +#[async_trait::async_trait] +impl BlockCommitterApi for BlockCommitterHttpApi { + async fn get_latest_costs(&self) -> DaBlockCostsResult> { + let response = self + .client + .get(&self.url) + .send() + .await? + .json::() + .await?; + Ok(Some(response)) + } + + async fn get_costs_by_seqno( + &self, + number: u64, + ) -> DaBlockCostsResult> { + let response = self + .client + .get(&format!("{}/{}", self.url, number)) + .send() + .await? .json::() - .await - .map_err(|err| anyhow!(err))?; - Ok(response.into()) + .await?; + Ok(Some(response)) + } + + async fn get_cost_bundles_by_range( + &self, + range: core::ops::Range, + ) -> DaBlockCostsResult>> { + let response = self + .client + .get(&format!("{}/{}-{}", self.url, range.start, range.end)) + .send() + .await? + .json::>() + .await?; + Ok(response.into_iter().map(Some).collect()) + } +} + +#[cfg(test)] +#[allow(non_snake_case)] +mod tests { + use super::*; + + struct MockBlockCommitterApi { + value: Option, + } + + impl MockBlockCommitterApi { + fn new(value: Option) -> Self { + Self { value } + } + } + + #[async_trait::async_trait] + impl BlockCommitterApi for MockBlockCommitterApi { + async fn get_latest_costs(&self) -> DaBlockCostsResult> { + Ok(self.value.clone()) + } + async fn get_costs_by_seqno( + &self, + seq_no: u64, + ) -> DaBlockCostsResult> { + // arbitrary logic to generate a new value + let mut value = self.value.clone(); + if let Some(value) = &mut value { + value.sequence_number = seq_no; + value.blocks_range = + value.blocks_range.end * seq_no..value.blocks_range.end * seq_no + 10; + value.da_block_height = value.da_block_height + (seq_no + 1).into(); + value.total_cost += 1; + value.total_size_bytes += 1; + } + Ok(value) + } + async fn get_cost_bundles_by_range( + &self, + _: core::ops::Range, + ) -> DaBlockCostsResult>> { + Ok(vec![self.value.clone()]) + } + } + + fn test_da_block_costs() -> RawDaBlockCosts { + RawDaBlockCosts { + sequence_number: 1, + blocks_range: 0..10, + da_block_height: 1u64.into(), + total_cost: 1, + total_size_bytes: 1, + } + } + + #[tokio::test] + async fn request_da_block_cost__when_last_value_is_none__then_get_latest_costs_is_called( + ) { + // given + let da_block_costs = test_da_block_costs(); + let expected = (&da_block_costs).into(); + let mock_api = MockBlockCommitterApi::new(Some(da_block_costs)); + let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api, None); + + // when + let actual = block_committer.request_da_block_cost().await.unwrap(); + + // then + assert_eq!(actual, expected); + } + + #[tokio::test] + async fn request_da_block_cost__when_last_value_is_some__then_get_costs_by_seqno_is_called( + ) { + // given + let mut da_block_costs = test_da_block_costs(); + let mock_api = MockBlockCommitterApi::new(Some(da_block_costs.clone())); + let mut block_committer = + BlockCommitterDaBlockCosts::new(mock_api, Some(da_block_costs.clone())); + + // when + let actual = block_committer.request_da_block_cost().await.unwrap(); + + // then + assert_ne!(da_block_costs.blocks_range, actual.l2_block_range); + } + + #[tokio::test] + async fn request_da_block_cost__when_response_is_none__then_error() { + // given + let mock_api = MockBlockCommitterApi::new(None); + let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api, None); + + // when + let result = block_committer.request_da_block_cost().await; + + // then + assert!(result.is_err()); + } + + struct UnderflowingMockBlockCommitterApi { + value: Option, + } + + impl UnderflowingMockBlockCommitterApi { + fn new(value: Option) -> Self { + Self { value } + } + } + + #[async_trait::async_trait] + impl BlockCommitterApi for UnderflowingMockBlockCommitterApi { + async fn get_latest_costs(&self) -> DaBlockCostsResult> { + Ok(self.value.clone()) + } + async fn get_costs_by_seqno( + &self, + seq_no: u64, + ) -> DaBlockCostsResult> { + // arbitrary logic to generate a new value + let mut value = self.value.clone(); + if let Some(value) = &mut value { + value.sequence_number = seq_no; + value.blocks_range = value.blocks_range.end..value.blocks_range.end + 10; + value.da_block_height = value.da_block_height + 1u64.into(); + value.total_cost -= 1; + value.total_size_bytes -= 1; + } + Ok(value) + } + async fn get_cost_bundles_by_range( + &self, + _: core::ops::Range, + ) -> DaBlockCostsResult>> { + Ok(vec![self.value.clone()]) + } + } + + #[tokio::test] + async fn request_da_block_cost__when_underflow__then_error() { + // given + let da_block_costs = test_da_block_costs(); + let mock_api = UnderflowingMockBlockCommitterApi::new(Some(da_block_costs)); + let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api, None); + let _ = block_committer.request_da_block_cost().await.unwrap(); + + // when + let result = block_committer.request_da_block_cost().await; + + // then + assert!(result.is_err()); } }