Skip to content

Commit

Permalink
feat(gas_price_service): update block committer da source with establ…
Browse files Browse the repository at this point in the history
…ished contract (#2265)

> [!NOTE]
> This is PR 7/7 for #2139

## Linked Issues/PRs
<!-- List of related issues/PRs -->
- #2139

## Description
<!-- List of detailed changes -->
Updates the `BlockCommitterApi` to implement the contract established
with us and the block committer team. There is some business logic that
is slightly unclear, need @MitchTurner for insights.

## Checklist
- [x] Breaking changes are clearly marked as such in the PR description
and changelog
- [x] New behavior is reflected in tests
- [x] [The specification](https://github.com/FuelLabs/fuel-specs/)
matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [x] I have reviewed the code myself
- [ ] I have created follow-up issues caused by this PR and linked them
here

### After merging, notify other teams

[Add or remove entries as needed]

- [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/)
- [ ] [Sway compiler](https://github.com/FuelLabs/sway/)
- [ ] [Platform
documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+)
(for out-of-organization contributors, the person merging the PR will do
this)
- [ ] Someone else?

---------

Co-authored-by: Green Baneling <XgreenX9999@gmail.com>
  • Loading branch information
rymnc and xgreenx authored Oct 2, 2024
1 parent 538b2a4 commit 4ba9da8
Show file tree
Hide file tree
Showing 3 changed files with 291 additions and 29 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2131](https://github.com/FuelLabs/fuel-core/pull/2131): Add flow in TxPool in order to ask to newly connected peers to share their transaction pool
- [2182](https://github.com/FuelLabs/fuel-core/pull/2151): Limit number of transactions that can be fetched via TxSource::next
- [2189](https://github.com/FuelLabs/fuel-core/pull/2151): Select next DA height to never include more than u16::MAX -1 transactions from L1.

- [2265](https://github.com/FuelLabs/fuel-core/pull/2265): Integrate Block Committer API for DA Block Costs.

### Changed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub const POLLING_INTERVAL_MS: u64 = 10_000;

#[derive(Debug, Default, Clone, Eq, Hash, PartialEq)]
pub struct DaBlockCosts {
pub l2_block_range: core::ops::Range<u32>,
pub l2_block_range: core::ops::Range<u64>,
pub blob_size_bytes: u32,
pub blob_cost_wei: u128,
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::arithmetic_side_effects)]

use crate::v1::da_source_adapter::{
service::{
DaBlockCostsSource,
Expand All @@ -6,57 +8,317 @@ use crate::v1::da_source_adapter::{
DaBlockCosts,
};
use anyhow::anyhow;
use reqwest::Url;
use fuel_core_types::blockchain::primitives::DaBlockHeight;
use serde::{
Deserialize,
Serialize,
};

/// This struct is used to denote the block committer da gas price source,,
#[async_trait::async_trait]
trait BlockCommitterApi: Send + Sync {
/// Used on first run to get the latest costs and seqno
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>>;
/// Used to get the costs for a specific seqno
async fn get_costs_by_seqno(
&self,
number: u64,
) -> DaBlockCostsResult<Option<RawDaBlockCosts>>;
/// Used to get the costs for a range of blocks (inclusive)
async fn get_cost_bundles_by_range(
&self,
range: core::ops::Range<u64>,
) -> DaBlockCostsResult<Vec<Option<RawDaBlockCosts>>>;
}

/// This struct is used to denote the block committer da block costs source
/// which receives data from the block committer (only http api for now)
pub struct BlockCommitterDaBlockCosts {
client: reqwest::Client,
url: Url,
pub struct BlockCommitterDaBlockCosts<BlockCommitter> {
client: BlockCommitter,
last_raw_da_block_costs: Option<RawDaBlockCosts>,
}

#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq)]
struct RawDaBlockCosts {
pub l2_block_range: core::ops::Range<u32>,
pub blob_size_bytes: u32,
pub blob_cost: u128,
pub struct RawDaBlockCosts {
/// Sequence number (Monotonically increasing nonce)
pub sequence_number: u64,
/// The range of blocks that the costs apply to
pub blocks_range: core::ops::Range<u64>,
/// The DA block height of the last transaction for the range of blocks
pub da_block_height: DaBlockHeight,
/// Rolling sum cost of posting blobs (wei)
pub total_cost: u128,
/// Rolling sum size of blobs (bytes)
pub total_size_bytes: u32,
}

impl From<RawDaBlockCosts> for DaBlockCosts {
fn from(raw: RawDaBlockCosts) -> Self {
impl From<&RawDaBlockCosts> for DaBlockCosts {
fn from(raw_da_block_costs: &RawDaBlockCosts) -> Self {
DaBlockCosts {
l2_block_range: raw.l2_block_range,
blob_size_bytes: raw.blob_size_bytes,
blob_cost_wei: raw.blob_cost,
l2_block_range: raw_da_block_costs.blocks_range.clone(),
blob_size_bytes: raw_da_block_costs.total_size_bytes,
blob_cost_wei: raw_da_block_costs.total_cost,
}
}
}

impl BlockCommitterDaBlockCosts {
/// Create a new instance of the block committer da gas price source
pub fn new(url: String) -> Self {
impl<BlockCommitter> BlockCommitterDaBlockCosts<BlockCommitter> {
/// Create a new instance of the block committer da block costs source
pub fn new(client: BlockCommitter, last_value: Option<RawDaBlockCosts>) -> Self {
Self {
client: reqwest::Client::new(),
url: Url::parse(&url).unwrap(),
client,
last_raw_da_block_costs: last_value,
}
}
}

#[async_trait::async_trait]
impl DaBlockCostsSource for BlockCommitterDaBlockCosts {
impl<BlockCommitter> DaBlockCostsSource for BlockCommitterDaBlockCosts<BlockCommitter>
where
BlockCommitter: BlockCommitterApi,
{
async fn request_da_block_cost(&mut self) -> DaBlockCostsResult<DaBlockCosts> {
let response = self.client.get(self.url.clone()).send().await?;
if !response.status().is_success() {
return Err(anyhow!("failed with response: {}", response.status()));
let raw_da_block_costs = match self.last_raw_da_block_costs {
Some(ref last_value) => self
.client
.get_costs_by_seqno(last_value.sequence_number + 1),
_ => self.client.get_latest_costs(),
}
let response = response
.await?;

let Some(ref raw_da_block_costs) = raw_da_block_costs else {
return Err(anyhow!("No response from block committer"))
};

let da_block_costs = self.last_raw_da_block_costs.iter().fold(
Ok(raw_da_block_costs.into()),
|costs: DaBlockCostsResult<DaBlockCosts>, last_value| {
let costs = costs.expect("Defined to be OK");
let blob_size_bytes = costs
.blob_size_bytes
.checked_sub(last_value.total_size_bytes)
.ok_or(anyhow!("Blob size bytes underflow"))?;
let blob_cost_wei = raw_da_block_costs
.total_cost
.checked_sub(last_value.total_cost)
.ok_or(anyhow!("Blob cost wei underflow"))?;
Ok(DaBlockCosts {
blob_size_bytes,
blob_cost_wei,
..costs
})
},
)?;

self.last_raw_da_block_costs = Some(raw_da_block_costs.clone());
Ok(da_block_costs)
}
}

pub struct BlockCommitterHttpApi {
client: reqwest::Client,
url: String,
}

impl BlockCommitterHttpApi {
pub fn new(url: String) -> Self {
Self {
client: reqwest::Client::new(),
url,
}
}
}

#[async_trait::async_trait]
impl BlockCommitterApi for BlockCommitterHttpApi {
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
let response = self
.client
.get(&self.url)
.send()
.await?
.json::<RawDaBlockCosts>()
.await?;
Ok(Some(response))
}

async fn get_costs_by_seqno(
&self,
number: u64,
) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
let response = self
.client
.get(&format!("{}/{}", self.url, number))
.send()
.await?
.json::<RawDaBlockCosts>()
.await
.map_err(|err| anyhow!(err))?;
Ok(response.into())
.await?;
Ok(Some(response))
}

async fn get_cost_bundles_by_range(
&self,
range: core::ops::Range<u64>,
) -> DaBlockCostsResult<Vec<Option<RawDaBlockCosts>>> {
let response = self
.client
.get(&format!("{}/{}-{}", self.url, range.start, range.end))
.send()
.await?
.json::<Vec<RawDaBlockCosts>>()
.await?;
Ok(response.into_iter().map(Some).collect())
}
}

#[cfg(test)]
#[allow(non_snake_case)]
mod tests {
use super::*;

struct MockBlockCommitterApi {
value: Option<RawDaBlockCosts>,
}

impl MockBlockCommitterApi {
fn new(value: Option<RawDaBlockCosts>) -> Self {
Self { value }
}
}

#[async_trait::async_trait]
impl BlockCommitterApi for MockBlockCommitterApi {
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
Ok(self.value.clone())
}
async fn get_costs_by_seqno(
&self,
seq_no: u64,
) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
// arbitrary logic to generate a new value
let mut value = self.value.clone();
if let Some(value) = &mut value {
value.sequence_number = seq_no;
value.blocks_range =
value.blocks_range.end * seq_no..value.blocks_range.end * seq_no + 10;
value.da_block_height = value.da_block_height + (seq_no + 1).into();
value.total_cost += 1;
value.total_size_bytes += 1;
}
Ok(value)
}
async fn get_cost_bundles_by_range(
&self,
_: core::ops::Range<u64>,
) -> DaBlockCostsResult<Vec<Option<RawDaBlockCosts>>> {
Ok(vec![self.value.clone()])
}
}

fn test_da_block_costs() -> RawDaBlockCosts {
RawDaBlockCosts {
sequence_number: 1,
blocks_range: 0..10,
da_block_height: 1u64.into(),
total_cost: 1,
total_size_bytes: 1,
}
}

#[tokio::test]
async fn request_da_block_cost__when_last_value_is_none__then_get_latest_costs_is_called(
) {
// given
let da_block_costs = test_da_block_costs();
let expected = (&da_block_costs).into();
let mock_api = MockBlockCommitterApi::new(Some(da_block_costs));
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api, None);

// when
let actual = block_committer.request_da_block_cost().await.unwrap();

// then
assert_eq!(actual, expected);
}

#[tokio::test]
async fn request_da_block_cost__when_last_value_is_some__then_get_costs_by_seqno_is_called(
) {
// given
let mut da_block_costs = test_da_block_costs();
let mock_api = MockBlockCommitterApi::new(Some(da_block_costs.clone()));
let mut block_committer =
BlockCommitterDaBlockCosts::new(mock_api, Some(da_block_costs.clone()));

// when
let actual = block_committer.request_da_block_cost().await.unwrap();

// then
assert_ne!(da_block_costs.blocks_range, actual.l2_block_range);
}

#[tokio::test]
async fn request_da_block_cost__when_response_is_none__then_error() {
// given
let mock_api = MockBlockCommitterApi::new(None);
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api, None);

// when
let result = block_committer.request_da_block_cost().await;

// then
assert!(result.is_err());
}

struct UnderflowingMockBlockCommitterApi {
value: Option<RawDaBlockCosts>,
}

impl UnderflowingMockBlockCommitterApi {
fn new(value: Option<RawDaBlockCosts>) -> Self {
Self { value }
}
}

#[async_trait::async_trait]
impl BlockCommitterApi for UnderflowingMockBlockCommitterApi {
async fn get_latest_costs(&self) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
Ok(self.value.clone())
}
async fn get_costs_by_seqno(
&self,
seq_no: u64,
) -> DaBlockCostsResult<Option<RawDaBlockCosts>> {
// arbitrary logic to generate a new value
let mut value = self.value.clone();
if let Some(value) = &mut value {
value.sequence_number = seq_no;
value.blocks_range = value.blocks_range.end..value.blocks_range.end + 10;
value.da_block_height = value.da_block_height + 1u64.into();
value.total_cost -= 1;
value.total_size_bytes -= 1;
}
Ok(value)
}
async fn get_cost_bundles_by_range(
&self,
_: core::ops::Range<u64>,
) -> DaBlockCostsResult<Vec<Option<RawDaBlockCosts>>> {
Ok(vec![self.value.clone()])
}
}

#[tokio::test]
async fn request_da_block_cost__when_underflow__then_error() {
// given
let da_block_costs = test_da_block_costs();
let mock_api = UnderflowingMockBlockCommitterApi::new(Some(da_block_costs));
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api, None);
let _ = block_committer.request_da_block_cost().await.unwrap();

// when
let result = block_committer.request_da_block_cost().await;

// then
assert!(result.is_err());
}
}

0 comments on commit 4ba9da8

Please sign in to comment.