From ba0fa15cd05d87d639f071d7a6318f9eab29d97d Mon Sep 17 00:00:00 2001 From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com> Date: Thu, 14 Nov 2024 11:23:17 +0100 Subject: [PATCH] feat(gas_price_service_v1): define RunnableTask for GasPriceServiceV1 (#2416) > [!NOTE] > Some values for the tests need expert opinion from @MitchTurner. A follow up PR will be created to define the `UninitializedTask` that wraps over this task. ## Linked Issues/PRs part of https://github.com/FuelLabs/fuel-core/issues/2140, but doesn't close it yet. ## Description - We define a new `RunnableTask`, `GasPriceServiceV1`, which uses the da block cost source in tandem with the l2 block source - Tests for the same - Casts to and from the v1 algorithm updater - we take a direct dependency on v0's metadata so that we can migrate between the two. ## Checklist - [x] Breaking changes are clearly marked as such in the PR description and changelog - [x] New behavior is reflected in tests - [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [x] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else? --- CHANGELOG.md | 2 + .../client/assets/debugAdapterProtocol.json | 2 +- crates/fuel-gas-price-algorithm/src/v1.rs | 1 + .../v1/tests/update_da_record_data_tests.rs | 163 ++++++ .../src/common/fuel_core_storage_adapter.rs | 6 + .../gas_price_service/src/common/utils.rs | 4 + .../gas_price_service/src/v0/service.rs | 3 + .../gas_price_service/src/v0/tests.rs | 4 + crates/services/gas_price_service/src/v1.rs | 1 + .../gas_price_service/src/v1/algorithm.rs | 7 +- .../src/v1/da_source_service.rs | 2 +- .../block_committer_costs.rs | 23 +- .../src/v1/da_source_service/service.rs | 1 + .../gas_price_service/src/v1/metadata.rs | 74 ++- .../gas_price_service/src/v1/service.rs | 516 ++++++++++++++++++ 15 files changed, 778 insertions(+), 31 deletions(-) create mode 100644 crates/services/gas_price_service/src/v1/service.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 4385b4daae4..89d77a99f37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2386](https://github.com/FuelLabs/fuel-core/pull/2386): Add a flag to define the maximum number of file descriptors that RocksDB can use. By default it's half of the OS limit. - [2376](https://github.com/FuelLabs/fuel-core/pull/2376): Add a way to fetch transactions in P2P without specifying a peer. - [2327](https://github.com/FuelLabs/fuel-core/pull/2327): Add more services tests and more checks of the pool. Also add an high level documentation for users of the pool and contributors. +- [2416](https://github.com/FuelLabs/fuel-core/issues/2416): Define the `GasPriceServiceV1` task. + ### Fixed - [2366](https://github.com/FuelLabs/fuel-core/pull/2366): The `importer_gas_price_for_block` metric is properly collected. diff --git a/crates/client/assets/debugAdapterProtocol.json b/crates/client/assets/debugAdapterProtocol.json index 44a0c2eed9c..b435aef0f85 100644 --- a/crates/client/assets/debugAdapterProtocol.json +++ b/crates/client/assets/debugAdapterProtocol.json @@ -1440,7 +1440,7 @@ { "$ref": "#/definitions/Request" }, { "type": "object", - "description": "Replaces all existing instruction breakpoints. Typically, instruction breakpoints would be set from a diassembly window. \nTo clear all instruction breakpoints, specify an empty array.\nWhen an instruction breakpoint is hit, a 'stopped' event (with reason 'instruction breakpoint') is generated.\nClients should only call this request if the capability 'supportsInstructionBreakpoints' is true.", + "description": "Replaces all existing instruction breakpoints. Typically, instruction breakpoints would be set from a disassembly window. \nTo clear all instruction breakpoints, specify an empty array.\nWhen an instruction breakpoint is hit, a 'stopped' event (with reason 'instruction breakpoint') is generated.\nClients should only call this request if the capability 'supportsInstructionBreakpoints' is true.", "properties": { "command": { "type": "string", diff --git a/crates/fuel-gas-price-algorithm/src/v1.rs b/crates/fuel-gas-price-algorithm/src/v1.rs index 9a9da066fee..a3ace733423 100644 --- a/crates/fuel-gas-price-algorithm/src/v1.rs +++ b/crates/fuel-gas-price-algorithm/src/v1.rs @@ -313,6 +313,7 @@ impl AlgorithmUpdaterV1 { if !height_range.is_empty() { self.da_block_update(height_range, range_cost)?; self.recalculate_projected_cost(); + self.update_da_gas_price(); } Ok(()) } diff --git a/crates/fuel-gas-price-algorithm/src/v1/tests/update_da_record_data_tests.rs b/crates/fuel-gas-price-algorithm/src/v1/tests/update_da_record_data_tests.rs index 123520b4e99..efa0dc62d6f 100644 --- a/crates/fuel-gas-price-algorithm/src/v1/tests/update_da_record_data_tests.rs +++ b/crates/fuel-gas-price-algorithm/src/v1/tests/update_da_record_data_tests.rs @@ -270,3 +270,166 @@ fn update_da_record_data__da_block_updates_projected_total_cost_with_known_and_g let expected = new_known_total_cost + guessed_part; assert_eq!(actual, expected as u128); } + +#[test] +fn update_da_record_data__da_block_lowers_da_gas_price() { + // given + let da_cost_per_byte = 40; + let da_recorded_block_height = 10; + let l2_block_height = 11; + let original_known_total_cost = 150; + let unrecorded_blocks = vec![BlockBytes { + height: 11, + block_bytes: 3000, + }]; + let da_p_component = 2; + let guessed_cost: u64 = unrecorded_blocks + .iter() + .map(|block| block.block_bytes * da_cost_per_byte) + .sum(); + let projected_total_cost = original_known_total_cost + guessed_cost; + + let mut updater = UpdaterBuilder::new() + .with_da_cost_per_byte(da_cost_per_byte as u128) + .with_da_p_component(da_p_component) + .with_last_profit(10, 0) + .with_da_recorded_block_height(da_recorded_block_height) + .with_l2_block_height(l2_block_height) + .with_projected_total_cost(projected_total_cost as u128) + .with_known_total_cost(original_known_total_cost as u128) + .with_unrecorded_blocks(unrecorded_blocks.clone()) + .build(); + + let new_cost_per_byte = 100; + let (recorded_heights, recorded_cost) = + unrecorded_blocks + .iter() + .fold((vec![], 0), |(mut range, cost), block| { + range.push(block.height); + (range, cost + block.block_bytes * new_cost_per_byte) + }); + let min = recorded_heights.iter().min().unwrap(); + let max = recorded_heights.iter().max().unwrap(); + let recorded_range = *min..(max + 1); + + let old_da_gas_price = updater.new_scaled_da_gas_price; + + // when + updater + .update_da_record_data(recorded_range, recorded_cost as u128) + .unwrap(); + + // then + let new_da_gas_price = updater.new_scaled_da_gas_price; + // because the profit is 10 and the da_p_component is 2, the new da gas price should be lesser than the previous one. + assert_eq!(new_da_gas_price, 0); + assert_ne!(old_da_gas_price, new_da_gas_price); +} + +#[test] +fn update_da_record_data__da_block_increases_da_gas_price() { + // given + let da_cost_per_byte = 40; + let da_recorded_block_height = 10; + let l2_block_height = 11; + let original_known_total_cost = 150; + let unrecorded_blocks = vec![BlockBytes { + height: 11, + block_bytes: 3000, + }]; + let da_p_component = 2; + let guessed_cost: u64 = unrecorded_blocks + .iter() + .map(|block| block.block_bytes * da_cost_per_byte) + .sum(); + let projected_total_cost = original_known_total_cost + guessed_cost; + + let mut updater = UpdaterBuilder::new() + .with_da_cost_per_byte(da_cost_per_byte as u128) + .with_da_p_component(da_p_component) + .with_last_profit(-10, 0) + .with_da_recorded_block_height(da_recorded_block_height) + .with_l2_block_height(l2_block_height) + .with_projected_total_cost(projected_total_cost as u128) + .with_known_total_cost(original_known_total_cost as u128) + .with_unrecorded_blocks(unrecorded_blocks.clone()) + .build(); + + let new_cost_per_byte = 100; + let (recorded_heights, recorded_cost) = + unrecorded_blocks + .iter() + .fold((vec![], 0), |(mut range, cost), block| { + range.push(block.height); + (range, cost + block.block_bytes * new_cost_per_byte) + }); + let min = recorded_heights.iter().min().unwrap(); + let max = recorded_heights.iter().max().unwrap(); + let recorded_range = *min..(max + 1); + + let old_da_gas_price = updater.new_scaled_da_gas_price; + + // when + updater + .update_da_record_data(recorded_range, recorded_cost as u128) + .unwrap(); + + // then + let new_da_gas_price = updater.new_scaled_da_gas_price; + // because the profit is -10 and the da_p_component is 2, the new da gas price should be greater than the previous one. + assert_eq!(new_da_gas_price, 6); + assert_ne!(old_da_gas_price, new_da_gas_price); +} + +#[test] +fn update_da_record_data__da_block_will_not_change_da_gas_price() { + // given + let da_cost_per_byte = 40; + let da_recorded_block_height = 10; + let l2_block_height = 11; + let original_known_total_cost = 150; + let unrecorded_blocks = vec![BlockBytes { + height: 11, + block_bytes: 3000, + }]; + let da_p_component = 2; + let guessed_cost: u64 = unrecorded_blocks + .iter() + .map(|block| block.block_bytes * da_cost_per_byte) + .sum(); + let projected_total_cost = original_known_total_cost + guessed_cost; + + let mut updater = UpdaterBuilder::new() + .with_da_cost_per_byte(da_cost_per_byte as u128) + .with_da_p_component(da_p_component) + .with_last_profit(0, 0) + .with_da_recorded_block_height(da_recorded_block_height) + .with_l2_block_height(l2_block_height) + .with_projected_total_cost(projected_total_cost as u128) + .with_known_total_cost(original_known_total_cost as u128) + .with_unrecorded_blocks(unrecorded_blocks.clone()) + .build(); + + let new_cost_per_byte = 100; + let (recorded_heights, recorded_cost) = + unrecorded_blocks + .iter() + .fold((vec![], 0), |(mut range, cost), block| { + range.push(block.height); + (range, cost + block.block_bytes * new_cost_per_byte) + }); + let min = recorded_heights.iter().min().unwrap(); + let max = recorded_heights.iter().max().unwrap(); + let recorded_range = *min..(max + 1); + + let old_da_gas_price = updater.new_scaled_da_gas_price; + + // when + updater + .update_da_record_data(recorded_range, recorded_cost as u128) + .unwrap(); + + // then + let new_da_gas_price = updater.new_scaled_da_gas_price; + assert_eq!(old_da_gas_price, new_da_gas_price); +} diff --git a/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs b/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs index 4e76ee88e2f..e81ea58b2c3 100644 --- a/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs +++ b/crates/services/gas_price_service/src/common/fuel_core_storage_adapter.rs @@ -17,6 +17,10 @@ use crate::{ ports::MetadataStorage, }; use fuel_core_storage::{ + codec::{ + postcard::Postcard, + Encode, + }, kv_store::KeyValueInspect, structured_storage::StructuredStorage, transactional::{ @@ -101,6 +105,8 @@ pub fn get_block_info( height: (*block.header().height()).into(), gas_used: used_gas, block_gas_capacity: block_gas_limit, + block_bytes: Postcard::encode(block).len() as u64, + block_fees: fee, }; Ok(info) } diff --git a/crates/services/gas_price_service/src/common/utils.rs b/crates/services/gas_price_service/src/common/utils.rs index 712e288adca..a3813e53e4f 100644 --- a/crates/services/gas_price_service/src/common/utils.rs +++ b/crates/services/gas_price_service/src/common/utils.rs @@ -36,5 +36,9 @@ pub enum BlockInfo { gas_used: u64, // Total gas capacity of the block block_gas_capacity: u64, + // The size of block in bytes + block_bytes: u64, + // The fees the block has collected + block_fees: u64, }, } diff --git a/crates/services/gas_price_service/src/v0/service.rs b/crates/services/gas_price_service/src/v0/service.rs index ee062097dfd..7ef806fdd23 100644 --- a/crates/services/gas_price_service/src/v0/service.rs +++ b/crates/services/gas_price_service/src/v0/service.rs @@ -104,6 +104,7 @@ where height, gas_used, block_gas_capacity, + .. } => { self.handle_normal_block(height, gas_used, block_gas_capacity) .await?; @@ -225,6 +226,8 @@ mod tests { height: block_height, gas_used: 60, block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, }; let (l2_block_sender, l2_block_receiver) = mpsc::channel(1); diff --git a/crates/services/gas_price_service/src/v0/tests.rs b/crates/services/gas_price_service/src/v0/tests.rs index c395b39aeba..7526975e467 100644 --- a/crates/services/gas_price_service/src/v0/tests.rs +++ b/crates/services/gas_price_service/src/v0/tests.rs @@ -147,6 +147,8 @@ async fn next_gas_price__affected_by_new_l2_block() { height: 1, gas_used: 60, block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, }; let (l2_block_sender, l2_block_receiver) = tokio::sync::mpsc::channel(1); let l2_block_source = FakeL2BlockSource { @@ -186,6 +188,8 @@ async fn next__new_l2_block_saves_old_metadata() { height: 1, gas_used: 60, block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, }; let (l2_block_sender, l2_block_receiver) = tokio::sync::mpsc::channel(1); let l2_block_source = FakeL2BlockSource { diff --git a/crates/services/gas_price_service/src/v1.rs b/crates/services/gas_price_service/src/v1.rs index 4770112762a..fafb7245ef8 100644 --- a/crates/services/gas_price_service/src/v1.rs +++ b/crates/services/gas_price_service/src/v1.rs @@ -1,3 +1,4 @@ pub mod algorithm; pub mod da_source_service; pub mod metadata; +pub mod service; diff --git a/crates/services/gas_price_service/src/v1/algorithm.rs b/crates/services/gas_price_service/src/v1/algorithm.rs index 92333d8cce6..c7f87aed2ad 100644 --- a/crates/services/gas_price_service/src/v1/algorithm.rs +++ b/crates/services/gas_price_service/src/v1/algorithm.rs @@ -1,4 +1,7 @@ -use crate::common::gas_price_algorithm::GasPriceAlgorithm; +use crate::common::gas_price_algorithm::{ + GasPriceAlgorithm, + SharedGasPriceAlgo, +}; use fuel_core_types::fuel_types::BlockHeight; use fuel_gas_price_algorithm::v1::AlgorithmV1; @@ -11,3 +14,5 @@ impl GasPriceAlgorithm for AlgorithmV1 { self.worst_case(block_height.into()) } } + +pub type SharedV1Algorithm = SharedGasPriceAlgo; diff --git a/crates/services/gas_price_service/src/v1/da_source_service.rs b/crates/services/gas_price_service/src/v1/da_source_service.rs index eb699233455..840345bff98 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service.rs @@ -7,7 +7,7 @@ pub mod service; #[derive(Debug, Default, Clone, Eq, Hash, PartialEq)] pub struct DaBlockCosts { - pub l2_block_range: core::ops::Range, + pub l2_block_range: core::ops::Range, pub blob_size_bytes: u32, pub blob_cost_wei: u128, } diff --git a/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs b/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs index 7016fe0da08..e0629cc0edd 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/block_committer_costs.rs @@ -21,12 +21,12 @@ trait BlockCommitterApi: Send + Sync { /// Used to get the costs for a specific seqno async fn get_costs_by_seqno( &self, - number: u64, + number: u32, ) -> DaBlockCostsResult>; /// Used to get the costs for a range of blocks (inclusive) async fn get_cost_bundles_by_range( &self, - range: core::ops::Range, + range: core::ops::Range, ) -> DaBlockCostsResult>>; } @@ -40,9 +40,9 @@ pub struct BlockCommitterDaBlockCosts { #[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq)] pub struct RawDaBlockCosts { /// Sequence number (Monotonically increasing nonce) - pub sequence_number: u64, + pub sequence_number: u32, /// The range of blocks that the costs apply to - pub blocks_range: core::ops::Range, + pub blocks_range: core::ops::Range, /// The DA block height of the last transaction for the range of blocks pub da_block_height: DaBlockHeight, /// Rolling sum cost of posting blobs (wei) @@ -143,7 +143,7 @@ impl BlockCommitterApi for BlockCommitterHttpApi { async fn get_costs_by_seqno( &self, - number: u64, + number: u32, ) -> DaBlockCostsResult> { let response = self .client @@ -157,7 +157,7 @@ impl BlockCommitterApi for BlockCommitterHttpApi { async fn get_cost_bundles_by_range( &self, - range: core::ops::Range, + range: core::ops::Range, ) -> DaBlockCostsResult>> { let response = self .client @@ -192,7 +192,7 @@ mod tests { } async fn get_costs_by_seqno( &self, - seq_no: u64, + seq_no: u32, ) -> DaBlockCostsResult> { // arbitrary logic to generate a new value let mut value = self.value.clone(); @@ -200,7 +200,8 @@ mod tests { value.sequence_number = seq_no; value.blocks_range = value.blocks_range.end * seq_no..value.blocks_range.end * seq_no + 10; - value.da_block_height = value.da_block_height + (seq_no + 1).into(); + value.da_block_height = + value.da_block_height + ((seq_no + 1) as u64).into(); value.total_cost += 1; value.total_size_bytes += 1; } @@ -208,7 +209,7 @@ mod tests { } async fn get_cost_bundles_by_range( &self, - _: core::ops::Range, + _: core::ops::Range, ) -> DaBlockCostsResult>> { Ok(vec![self.value.clone()]) } @@ -286,7 +287,7 @@ mod tests { } async fn get_costs_by_seqno( &self, - seq_no: u64, + seq_no: u32, ) -> DaBlockCostsResult> { // arbitrary logic to generate a new value let mut value = self.value.clone(); @@ -301,7 +302,7 @@ mod tests { } async fn get_cost_bundles_by_range( &self, - _: core::ops::Range, + _: core::ops::Range, ) -> DaBlockCostsResult>> { Ok(vec![self.value.clone()]) } diff --git a/crates/services/gas_price_service/src/v1/da_source_service/service.rs b/crates/services/gas_price_service/src/v1/da_source_service/service.rs index 1f9cadf02dd..e20a506a097 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/service.rs @@ -15,6 +15,7 @@ use tokio::{ use crate::v1::da_source_service::DaBlockCosts; pub use anyhow::Result; +use fuel_core_services::stream::BoxFuture; #[derive(Clone)] pub struct SharedState(Sender); diff --git a/crates/services/gas_price_service/src/v1/metadata.rs b/crates/services/gas_price_service/src/v1/metadata.rs index 422cdf42507..c5a1e1c0447 100644 --- a/crates/services/gas_price_service/src/v1/metadata.rs +++ b/crates/services/gas_price_service/src/v1/metadata.rs @@ -1,7 +1,6 @@ use crate::v0::metadata::V0Metadata; use fuel_gas_price_algorithm::v1::{ AlgorithmUpdaterV1, - ClampedPercentage, L2ActivityTracker, }; use std::num::NonZeroU64; @@ -41,7 +40,7 @@ pub struct V1Metadata { impl V1Metadata { pub fn construct_from_v0_metadata( v0_metadata: V0Metadata, - config: V1AlgorithmConfig, + config: &V1AlgorithmConfig, ) -> anyhow::Result { let metadata = Self { new_scaled_exec_price: v0_metadata @@ -68,19 +67,56 @@ impl V1Metadata { } pub struct V1AlgorithmConfig { - new_exec_gas_price: u64, - min_exec_gas_price: u64, - exec_gas_price_change_percent: u16, - l2_block_fullness_threshold_percent: u8, - gas_price_factor: NonZeroU64, - min_da_gas_price: u64, - max_da_gas_price_change_percent: u16, - da_p_component: i64, - da_d_component: i64, - normal_range_size: u16, - capped_range_size: u16, - decrease_range_size: u16, - block_activity_threshold: u8, + pub new_exec_gas_price: u64, + pub min_exec_gas_price: u64, + pub exec_gas_price_change_percent: u16, + pub l2_block_fullness_threshold_percent: u8, + pub gas_price_factor: NonZeroU64, + pub min_da_gas_price: u64, + pub max_da_gas_price_change_percent: u16, + pub da_p_component: i64, + pub da_d_component: i64, + pub normal_range_size: u16, + pub capped_range_size: u16, + pub decrease_range_size: u16, + pub block_activity_threshold: u8, + pub unrecorded_blocks: Vec<(u32, u64)>, +} + +impl From<&V1AlgorithmConfig> for AlgorithmUpdaterV1 { + fn from(value: &V1AlgorithmConfig) -> Self { + let l2_activity = L2ActivityTracker::new_full( + value.normal_range_size, + value.capped_range_size, + value.decrease_range_size, + value.block_activity_threshold.into(), + ); + let unrecorded_blocks = value.unrecorded_blocks.clone().into_iter().collect(); + Self { + new_scaled_exec_price: value.new_exec_gas_price, + l2_block_height: 0, + new_scaled_da_gas_price: value.min_da_gas_price, + gas_price_factor: value.gas_price_factor, + total_da_rewards_excess: 0, + da_recorded_block_height: 0, + latest_known_total_da_cost_excess: 0, + projected_total_da_cost: 0, + last_profit: 0, + second_to_last_profit: 0, + latest_da_cost_per_byte: 0, + l2_activity, + min_exec_gas_price: value.min_exec_gas_price, + exec_gas_price_change_percent: value.exec_gas_price_change_percent, + l2_block_fullness_threshold_percent: value + .l2_block_fullness_threshold_percent + .into(), + min_da_gas_price: value.min_da_gas_price, + max_da_gas_price_change_percent: value.max_da_gas_price_change_percent, + da_p_component: value.da_p_component, + da_d_component: value.da_d_component, + unrecorded_blocks, + } + } } impl From for V1Metadata { @@ -104,7 +140,7 @@ impl From for V1Metadata { pub fn v1_algorithm_from_metadata( metadata: V1Metadata, - config: V1AlgorithmConfig, + config: &V1AlgorithmConfig, ) -> AlgorithmUpdaterV1 { let l2_activity = L2ActivityTracker::new_full( config.normal_range_size, @@ -112,7 +148,11 @@ pub fn v1_algorithm_from_metadata( config.decrease_range_size, config.block_activity_threshold.into(), ); - let unrecorded_blocks = metadata.unrecorded_blocks.into_iter().collect(); + let unrecorded_blocks = metadata + .unrecorded_blocks + .into_iter() + .chain(config.unrecorded_blocks.clone()) + .collect(); AlgorithmUpdaterV1 { new_scaled_exec_price: metadata.new_scaled_exec_price, l2_block_height: metadata.l2_block_height, diff --git a/crates/services/gas_price_service/src/v1/service.rs b/crates/services/gas_price_service/src/v1/service.rs new file mode 100644 index 00000000000..5ab56160334 --- /dev/null +++ b/crates/services/gas_price_service/src/v1/service.rs @@ -0,0 +1,516 @@ +use crate::{ + common::{ + gas_price_algorithm::SharedGasPriceAlgo, + l2_block_source::L2BlockSource, + updater_metadata::UpdaterMetadata, + utils::BlockInfo, + }, + ports::MetadataStorage, + v0::metadata::V0Metadata, + v1::{ + algorithm::SharedV1Algorithm, + da_source_service::{ + service::{ + DaBlockCostsSource, + DaSourceService, + SharedState as DaSharedState, + }, + DaBlockCosts, + }, + metadata::{ + v1_algorithm_from_metadata, + V1AlgorithmConfig, + V1Metadata, + }, + }, +}; +use anyhow::anyhow; +use async_trait::async_trait; +use fuel_core_services::{ + RunnableService, + RunnableTask, + StateWatcher, +}; +use fuel_gas_price_algorithm::{ + v0::AlgorithmUpdaterV0, + v1::{ + AlgorithmUpdaterV1, + AlgorithmV1, + }, +}; +use futures::FutureExt; +use std::num::NonZeroU64; +use tokio::sync::broadcast::Receiver; + +/// The service that updates the gas price algorithm. +pub struct GasPriceServiceV1 +where + DA: DaBlockCostsSource, +{ + /// The algorithm that can be used in the next block + shared_algo: SharedV1Algorithm, + /// The L2 block source + l2_block_source: L2, + /// The metadata storage + metadata_storage: Metadata, + /// The algorithm updater + algorithm_updater: AlgorithmUpdaterV1, + /// the da source adapter handle + da_source_adapter_handle: DaSourceService, + /// The da source channel + da_source_channel: Receiver, +} + +impl GasPriceServiceV1 +where + Metadata: MetadataStorage, + DA: DaBlockCostsSource, +{ + pub fn new( + l2_block_source: L2, + metadata_storage: Metadata, + shared_algo: SharedV1Algorithm, + algorithm_updater: AlgorithmUpdaterV1, + da_source_adapter_handle: DaSourceService, + ) -> Self { + let da_source_channel = + da_source_adapter_handle.shared_data().clone().subscribe(); + Self { + shared_algo, + l2_block_source, + metadata_storage, + algorithm_updater, + da_source_adapter_handle, + da_source_channel, + } + } + + pub fn algorithm_updater(&self) -> &AlgorithmUpdaterV1 { + &self.algorithm_updater + } + + pub fn next_block_algorithm(&self) -> SharedV1Algorithm { + self.shared_algo.clone() + } + + async fn update(&mut self, new_algorithm: AlgorithmV1) { + self.shared_algo.update(new_algorithm).await; + } + + fn validate_block_gas_capacity( + &self, + block_gas_capacity: u64, + ) -> anyhow::Result { + NonZeroU64::new(block_gas_capacity) + .ok_or_else(|| anyhow!("Block gas capacity must be non-zero")) + } + + async fn set_metadata(&mut self) -> anyhow::Result<()> { + let metadata: UpdaterMetadata = self.algorithm_updater.clone().into(); + self.metadata_storage + .set_metadata(&metadata) + .map_err(|err| anyhow!(err)) + } + + async fn handle_normal_block( + &mut self, + height: u32, + gas_used: u64, + block_gas_capacity: u64, + block_bytes: u64, + block_fees: u64, + ) -> anyhow::Result<()> { + let capacity = self.validate_block_gas_capacity(block_gas_capacity)?; + + self.algorithm_updater.update_l2_block_data( + height, + gas_used, + capacity, + block_bytes, + block_fees as u128, + )?; + + self.set_metadata().await?; + Ok(()) + } + + async fn handle_da_block_costs( + &mut self, + da_block_costs: DaBlockCosts, + ) -> anyhow::Result<()> { + self.algorithm_updater.update_da_record_data( + da_block_costs.l2_block_range, + da_block_costs.blob_cost_wei, + )?; + + self.set_metadata().await?; + Ok(()) + } + + async fn apply_block_info_to_gas_algorithm( + &mut self, + l2_block: BlockInfo, + ) -> anyhow::Result<()> { + match l2_block { + BlockInfo::GenesisBlock => { + self.set_metadata().await?; + } + BlockInfo::Block { + height, + gas_used, + block_gas_capacity, + block_bytes, + block_fees, + } => { + self.handle_normal_block( + height, + gas_used, + block_gas_capacity, + block_bytes, + block_fees, + ) + .await?; + } + } + + self.update(self.algorithm_updater.algorithm()).await; + Ok(()) + } + + async fn apply_da_block_costs_to_gas_algorithm( + &mut self, + da_block_costs: DaBlockCosts, + ) -> anyhow::Result<()> { + self.handle_da_block_costs(da_block_costs).await?; + self.update(self.algorithm_updater.algorithm()).await; + Ok(()) + } +} + +#[async_trait] +impl RunnableTask for GasPriceServiceV1 +where + L2: L2BlockSource, + Metadata: MetadataStorage, + DA: DaBlockCostsSource, +{ + async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { + let should_continue; + tokio::select! { + biased; + _ = watcher.while_started() => { + tracing::debug!("Stopping gas price service"); + should_continue = false; + } + l2_block_res = self.l2_block_source.get_l2_block() => { + tracing::info!("Received L2 block result: {:?}", l2_block_res); + let block = l2_block_res?; + + tracing::debug!("Updating gas price algorithm"); + self.apply_block_info_to_gas_algorithm(block).await?; + should_continue = true; + } + da_block_costs = self.da_source_channel.recv() => { + tracing::info!("Received DA block costs: {:?}", da_block_costs); + let da_block_costs = da_block_costs?; + + tracing::debug!("Updating DA block costs"); + self.apply_da_block_costs_to_gas_algorithm(da_block_costs).await?; + should_continue = true; + } + } + Ok(should_continue) + } + + async fn shutdown(mut self) -> anyhow::Result<()> { + // handle all the remaining l2 blocks + while let Some(Ok(block)) = self.l2_block_source.get_l2_block().now_or_never() { + tracing::debug!("Updating gas price algorithm"); + self.apply_block_info_to_gas_algorithm(block).await?; + } + + while let Ok(da_block_costs) = self.da_source_channel.try_recv() { + tracing::debug!("Updating DA block costs"); + self.apply_da_block_costs_to_gas_algorithm(da_block_costs) + .await?; + } + + // run shutdown hooks for internal services + self.da_source_adapter_handle.shutdown().await?; + + Ok(()) + } +} + +fn convert_to_v1_metadata( + updater_metadata: UpdaterMetadata, + config: &V1AlgorithmConfig, +) -> crate::common::utils::Result { + if let Ok(v1_metadata) = V1Metadata::try_from(updater_metadata.clone()) { + Ok(v1_metadata) + } else { + let v0_metadata = V0Metadata::try_from(updater_metadata).map_err(|_| { + crate::common::utils::Error::CouldNotInitUpdater(anyhow::anyhow!( + "Could not convert metadata to V0Metadata" + )) + })?; + V1Metadata::construct_from_v0_metadata(v0_metadata, config).map_err(|err| { + crate::common::utils::Error::CouldNotInitUpdater(anyhow::anyhow!(err)) + }) + } +} + +pub fn initialize_algorithm( + config: &V1AlgorithmConfig, + latest_block_height: u32, + metadata_storage: &Metadata, +) -> crate::common::utils::Result<(AlgorithmUpdaterV1, SharedV1Algorithm)> +where + Metadata: MetadataStorage, +{ + let algorithm_updater; + if let Some(updater_metadata) = metadata_storage + .get_metadata(&latest_block_height.into()) + .map_err(|err| { + crate::common::utils::Error::CouldNotInitUpdater(anyhow::anyhow!(err)) + })? + { + let v1_metadata = convert_to_v1_metadata(updater_metadata, config)?; + algorithm_updater = v1_algorithm_from_metadata(v1_metadata, config); + } else { + algorithm_updater = AlgorithmUpdaterV1::from(config); + } + + let shared_algo = + SharedGasPriceAlgo::new_with_algorithm(algorithm_updater.algorithm()); + + Ok((algorithm_updater, shared_algo)) +} + +#[allow(clippy::arithmetic_side_effects)] +#[allow(non_snake_case)] +#[cfg(test)] +mod tests { + use crate::{ + common::{ + l2_block_source::L2BlockSource, + updater_metadata::UpdaterMetadata, + utils::{ + BlockInfo, + Result as GasPriceResult, + }, + }, + ports::MetadataStorage, + v1::{ + da_source_service::{ + dummy_costs::DummyDaBlockCosts, + service::DaSourceService, + DaBlockCosts, + }, + metadata::V1AlgorithmConfig, + service::{ + initialize_algorithm, + GasPriceServiceV1, + }, + }, + }; + use fuel_core_services::{ + RunnableTask, + StateWatcher, + }; + use fuel_core_types::fuel_types::BlockHeight; + use std::{ + num::NonZeroU64, + sync::Arc, + time::Duration, + }; + use tokio::sync::mpsc; + + struct FakeL2BlockSource { + l2_block: mpsc::Receiver, + } + + #[async_trait::async_trait] + impl L2BlockSource for FakeL2BlockSource { + async fn get_l2_block(&mut self) -> GasPriceResult { + let block = self.l2_block.recv().await.unwrap(); + Ok(block) + } + } + + struct FakeMetadata { + inner: Arc>>, + } + + impl FakeMetadata { + fn empty() -> Self { + Self { + inner: Arc::new(std::sync::Mutex::new(None)), + } + } + } + + impl MetadataStorage for FakeMetadata { + fn get_metadata( + &self, + _: &BlockHeight, + ) -> GasPriceResult> { + let metadata = self.inner.lock().unwrap().clone(); + Ok(metadata) + } + + fn set_metadata(&mut self, metadata: &UpdaterMetadata) -> GasPriceResult<()> { + *self.inner.lock().unwrap() = Some(metadata.clone()); + Ok(()) + } + } + + #[tokio::test] + async fn run__updates_gas_price_with_l2_block_source() { + // given + let block_height = 1; + let l2_block = BlockInfo::Block { + height: block_height, + gas_used: 60, + block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, + }; + + let (l2_block_sender, l2_block_receiver) = mpsc::channel(1); + let l2_block_source = FakeL2BlockSource { + l2_block: l2_block_receiver, + }; + + let metadata_storage = FakeMetadata::empty(); + let l2_block_height = 0; + let config = V1AlgorithmConfig { + new_exec_gas_price: 100, + min_exec_gas_price: 50, + exec_gas_price_change_percent: 20, + l2_block_fullness_threshold_percent: 20, + gas_price_factor: NonZeroU64::new(10).unwrap(), + min_da_gas_price: 10, + max_da_gas_price_change_percent: 20, + da_p_component: 4, + da_d_component: 2, + normal_range_size: 10, + capped_range_size: 100, + decrease_range_size: 4, + block_activity_threshold: 20, + unrecorded_blocks: vec![], + }; + let (algo_updater, shared_algo) = + initialize_algorithm(&config, l2_block_height, &metadata_storage).unwrap(); + + let notifier = Arc::new(tokio::sync::Notify::new()); + let dummy_da_source = DaSourceService::new( + DummyDaBlockCosts::new( + Err(anyhow::anyhow!("unused at the moment")), + notifier.clone(), + ), + None, + ); + + let mut service = GasPriceServiceV1::new( + l2_block_source, + metadata_storage, + shared_algo, + algo_updater, + dummy_da_source, + ); + let read_algo = service.next_block_algorithm(); + let mut watcher = StateWatcher::default(); + let initial_price = read_algo.next_gas_price(); + + // when + service.run(&mut watcher).await.unwrap(); + l2_block_sender.send(l2_block).await.unwrap(); + service.shutdown().await.unwrap(); + + // then + let actual_price = read_algo.next_gas_price(); + assert_ne!(initial_price, actual_price); + } + + #[tokio::test] + async fn run__updates_gas_price_with_da_block_cost_source() { + // given + let block_height = 1; + let l2_block = BlockInfo::Block { + height: block_height, + gas_used: 60, + block_gas_capacity: 100, + block_bytes: 100, + block_fees: 100, + }; + + let (l2_block_sender, l2_block_receiver) = mpsc::channel(1); + let l2_block_source = FakeL2BlockSource { + l2_block: l2_block_receiver, + }; + + let metadata_storage = FakeMetadata::empty(); + let config = V1AlgorithmConfig { + new_exec_gas_price: 100, + min_exec_gas_price: 50, + exec_gas_price_change_percent: 20, + l2_block_fullness_threshold_percent: 20, + gas_price_factor: NonZeroU64::new(10).unwrap(), + min_da_gas_price: 100, + max_da_gas_price_change_percent: 50, + da_p_component: 4, + da_d_component: 2, + normal_range_size: 10, + capped_range_size: 100, + decrease_range_size: 4, + block_activity_threshold: 20, + unrecorded_blocks: vec![(1, 100)], + }; + let (algo_updater, shared_algo) = + initialize_algorithm(&config, block_height, &metadata_storage).unwrap(); + + let notifier = Arc::new(tokio::sync::Notify::new()); + let da_source = DaSourceService::new( + DummyDaBlockCosts::new( + Ok(DaBlockCosts { + l2_block_range: 1..2, + blob_cost_wei: 9000, + blob_size_bytes: 3000, + }), + notifier.clone(), + ), + Some(Duration::from_millis(1)), + ); + let mut watcher = StateWatcher::default(); + + let mut service = GasPriceServiceV1::new( + l2_block_source, + metadata_storage, + shared_algo, + algo_updater, + da_source, + ); + let read_algo = service.next_block_algorithm(); + let initial_price = read_algo.next_gas_price(); + + // the RunnableTask depends on the handle passed to it for the da block cost source to already be running, + // which is the responsibility of the UninitializedTask in the `into_task` method of the RunnableService + // here we mimic that behaviour by running the da block cost service. + let mut da_source_watcher = StateWatcher::started(); + service + .da_source_adapter_handle + .run(&mut da_source_watcher) + .await + .unwrap(); + + // when + service.run(&mut watcher).await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; + service.shutdown().await.unwrap(); + + // then + let actual_price = read_algo.next_gas_price(); + assert_ne!(initial_price, actual_price); + } +}