Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for DA L2 sync race condition #2512

2 changes: 1 addition & 1 deletion crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ pub fn init_sub_services(

tracing::debug!("da_committer_url: {:?}", config.da_committer_url);
let committer_api = BlockCommitterHttpApi::new(config.da_committer_url.clone());
let da_source = BlockCommitterDaBlockCosts::new(committer_api, None);
let da_source = BlockCommitterDaBlockCosts::new(committer_api);
let v1_config = GasPriceServiceConfig::from(config.clone())
.v1()
.ok_or(anyhow!(
Expand Down
2 changes: 1 addition & 1 deletion crates/services/gas_price_service/src/common/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum Error {
pub type Result<T, E = Error> = core::result::Result<T, E>;

// Info required about the l2 block for the gas price algorithm
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum BlockInfo {
// The genesis block of the L2 chain
GenesisBlock,
Expand Down
144 changes: 139 additions & 5 deletions crates/services/gas_price_service/src/v1/da_source_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,30 @@ mod tests {
use super::*;
use crate::v1::da_source_service::{
dummy_costs::DummyDaBlockCosts,
service::new_service,
service::{
new_da_service,
DaSourceService,
DA_BLOCK_COSTS_CHANNEL_SIZE,
},
};
use fuel_core_services::Service;
use fuel_core_services::{
RunnableTask,
Service,
StateWatcher,
};
use fuel_core_types::fuel_types::BlockHeight;
use std::{
sync::Arc,
sync::{
Arc,
Mutex,
},
time::Duration,
};

fn latest_l2_height(height: u32) -> Arc<Mutex<BlockHeight>> {
Arc::new(Mutex::new(BlockHeight::new(height)))
}

#[tokio::test]
async fn run__when_da_block_cost_source_gives_value_shared_state_is_updated() {
// given
Expand All @@ -43,7 +59,12 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(expected_da_cost.clone()), notifier.clone());
let service = new_service(da_block_costs_source, Some(Duration::from_millis(1)));
let latest_l2_height = Arc::new(Mutex::new(BlockHeight::new(10u32)));
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
);
let mut shared_state = &mut service.shared.subscribe();

// when
Expand All @@ -62,7 +83,12 @@ mod tests {
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Err(anyhow::anyhow!("boo!")), notifier.clone());
let service = new_service(da_block_costs_source, Some(Duration::from_millis(1)));
let latest_l2_height = latest_l2_height(0);
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
);
let mut shared_state = &mut service.shared.subscribe();

// when
Expand All @@ -74,4 +100,112 @@ mod tests {
assert!(da_block_costs_res.is_err());
service.stop_and_await().await.unwrap();
}

#[tokio::test]
async fn run__will_not_return_cost_bundles_for_bundles_that_are_greater_than_l2_height(
) {
// given
let l2_height = 4;
let unexpected_costs = DaBlockCosts {
bundle_id: 1,
l2_blocks: 0..=9,
bundle_size_bytes: 1024 * 128,
blob_cost_wei: 2,
};
assert!(unexpected_costs.l2_blocks.end() > &l2_height);
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let service = new_da_service(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
);
let mut shared_state = &mut service.shared.subscribe();

// when
service.start_and_await().await.unwrap();
notifier.notified().await;

// then
let err = shared_state.try_recv();
tracing::info!("err: {:?}", err);
assert!(err.is_err());
}

#[tokio::test]
async fn run__filtered_da_block_costs_do_not_update_latest_recorded_block() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.try_init();

// given
let l2_height = 4;
let unexpected_costs = DaBlockCosts {
bundle_id: 1,
l2_blocks: 2..=9,
bundle_size_bytes: 1024 * 128,
blob_cost_wei: 2,
};
assert!(unexpected_costs.l2_blocks.end() > &l2_height);
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let mut service = DaSourceService::new(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
None,
);
let mut watcher = StateWatcher::started();

// when
let _ = service.run(&mut watcher).await;

// then
let recorded_height = service.recorded_height();
let expected = 1;
assert!(recorded_height.is_none())
}

#[tokio::test]
async fn run__recorded_height_updated_by_da_costs() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.try_init();

// given
let l2_height = 10;
let recorded_height = 9;
let unexpected_costs = DaBlockCosts {
bundle_id: 1,
l2_blocks: 2..=recorded_height,
bundle_size_bytes: 1024 * 128,
blob_cost_wei: 2,
};
let notifier = Arc::new(tokio::sync::Notify::new());
let da_block_costs_source =
DummyDaBlockCosts::new(Ok(unexpected_costs.clone()), notifier.clone());
let latest_l2_height = latest_l2_height(l2_height);
let (sender, mut receiver) =
tokio::sync::broadcast::channel(DA_BLOCK_COSTS_CHANNEL_SIZE);
let mut service = DaSourceService::new_with_sender(
da_block_costs_source,
Some(Duration::from_millis(1)),
latest_l2_height,
None,
sender,
);
let mut watcher = StateWatcher::started();

// when
let next = service.run(&mut watcher).await;

// then
let actual = service.recorded_height().unwrap();
let expected = BlockHeight::from(recorded_height);
assert_eq!(expected, actual);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ pub trait BlockCommitterApi: Send + Sync {
/// which receives data from the block committer (only http api for now)
pub struct BlockCommitterDaBlockCosts<BlockCommitter> {
client: BlockCommitter,
last_recorded_height: Option<BlockHeight>,
}

#[derive(Debug, Deserialize, Serialize, Clone, Default, PartialEq)]
Expand Down Expand Up @@ -79,14 +78,8 @@ impl From<RawDaBlockCosts> for DaBlockCosts {

impl<BlockCommitter> BlockCommitterDaBlockCosts<BlockCommitter> {
/// Create a new instance of the block committer da block costs source
pub fn new(
client: BlockCommitter,
last_recorded_height: Option<BlockHeight>,
) -> Self {
Self {
client,
last_recorded_height,
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MitchTurner was this change intended?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because in practice we only set the last_recorded_height after we've constructed the BlockCommitterDaBlockCosts.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okidoki

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are some merge conflicts around this

pub fn new(client: BlockCommitter) -> Self {
Self { client }
}
}

Expand All @@ -95,32 +88,27 @@ impl<BlockCommitter> DaBlockCostsSource for BlockCommitterDaBlockCosts<BlockComm
where
BlockCommitter: BlockCommitterApi,
{
async fn request_da_block_costs(&mut self) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
let raw_da_block_costs: Vec<_> =
match self.last_recorded_height.and_then(|x| x.succ()) {
Some(ref next_height) => {
self.client
.get_costs_by_l2_block_number(*next_height.deref())
.await?
}
None => self.client.get_latest_costs().await?.into_iter().collect(),
};
async fn request_da_block_costs(
&mut self,
last_recorded_height: &Option<BlockHeight>,
) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
let raw_da_block_costs: Vec<_> = match last_recorded_height.and_then(|x| x.succ())
{
Some(ref next_height) => {
self.client
.get_costs_by_l2_block_number(*next_height.deref())
.await?
}
None => self.client.get_latest_costs().await?.into_iter().collect(),
};

tracing::info!("raw_da_block_costs: {:?}", raw_da_block_costs);
let da_block_costs: Vec<_> =
raw_da_block_costs.iter().map(DaBlockCosts::from).collect();
tracing::info!("da_block_costs: {:?}", da_block_costs);
if let Some(cost) = raw_da_block_costs.last() {
self.last_recorded_height = Some(BlockHeight::from(cost.end_height));
}
Comment on lines -109 to -111
Copy link
Member

@rymnc rymnc Jan 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we set it here then? do we move the last_recorded_height elsewhere?
ignore pls

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's held by the DA service instead now:

pub struct DaSourceService<Source> {
    poll_interval: Interval,
    source: Source,
    shared_state: SharedState,
    latest_l2_height: Arc<Mutex<BlockHeight>>,
    recorded_height: Option<BlockHeight>,
}


Ok(da_block_costs)
}

async fn set_last_value(&mut self, height: BlockHeight) -> DaBlockCostsResult<()> {
self.last_recorded_height = Some(height);
Ok(())
}
}

pub struct BlockCommitterHttpApi {
Expand Down Expand Up @@ -510,10 +498,10 @@ mod tests {
let da_block_costs = test_da_block_costs();
let expected = vec![(&da_block_costs).into()];
let mock_api = MockBlockCommitterApi::new(Some(da_block_costs));
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api, None);
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api);

// when
let actual = block_committer.request_da_block_costs().await.unwrap();
let actual = block_committer.request_da_block_costs(&None).await.unwrap();

// then
assert_eq!(actual, expected);
Expand All @@ -527,11 +515,13 @@ mod tests {
let da_block_costs_len = da_block_costs.end_height - da_block_costs.start_height;
let mock_api = MockBlockCommitterApi::new(Some(da_block_costs.clone()));
let latest_height = BlockHeight::new(da_block_costs.end_height);
let mut block_committer =
BlockCommitterDaBlockCosts::new(mock_api, Some(latest_height));
let mut block_committer = BlockCommitterDaBlockCosts::new(mock_api);

// when
let actual = block_committer.request_da_block_costs().await.unwrap();
let actual = block_committer
.request_da_block_costs(&Some(latest_height))
.await
.unwrap();

// then
let l2_blocks = actual.first().unwrap().l2_blocks.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ impl DummyDaBlockCosts {

#[async_trait::async_trait]
impl DaBlockCostsSource for DummyDaBlockCosts {
async fn request_da_block_costs(&mut self) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
async fn request_da_block_costs(
&mut self,
_latest_recorded_height: &Option<BlockHeight>,
) -> DaBlockCostsResult<Vec<DaBlockCosts>> {
match &self.value {
Ok(da_block_costs) => {
self.notifier.notify_waiters();
Expand All @@ -35,8 +38,4 @@ impl DaBlockCostsSource for DummyDaBlockCosts {
}
}
}

async fn set_last_value(&mut self, _height: BlockHeight) -> DaBlockCostsResult<()> {
unimplemented!("This is a dummy implementation");
}
}
Loading
Loading