Skip to content

Commit

Permalink
chore(gas_price_service_v1): allow stopping the service during sync w…
Browse files Browse the repository at this point in the history
…ith sigint/etc (#2503)

## Linked Issues/PRs
<!-- List of related issues/PRs -->
- from the notion doc

## Description
<!-- List of detailed changes -->
allows stopping the node when the gas price db is syncing with onchain
db.

## Checklist
- [ ] Breaking changes are clearly marked as such in the PR description
and changelog
- [ ] New behavior is reflected in tests
- [ ] [The specification](https://github.com/FuelLabs/fuel-specs/)
matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [x] I have reviewed the code myself
- [ ] I have created follow-up issues caused by this PR and linked them
here

### After merging, notify other teams

[Add or remove entries as needed]

- [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/)
- [ ] [Sway compiler](https://github.com/FuelLabs/sway/)
- [ ] [Platform
documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+)
(for out-of-organization contributors, the person merging the PR will do
this)
- [ ] Someone else?
  • Loading branch information
rymnc authored Dec 17, 2024
1 parent 78f5b94 commit 084f1b7
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 58 deletions.
5 changes: 2 additions & 3 deletions crates/services/gas_price_service/src/v1/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ use fuel_core_services::{
StateWatcher,
};
use fuel_core_storage::{
iter::IteratorOverTable,
structured_storage::test::InMemoryStorage,
transactional::{
AtomicView,
Expand Down Expand Up @@ -728,7 +727,7 @@ async fn uninitialized_task__init__starts_da_service_with_bundle_id_in_storage()
.unwrap();

// when
service.init().await.unwrap();
service.init(&StateWatcher::started()).await.unwrap();

// then
let actual = bundle_id_handle.lock().unwrap();
Expand Down Expand Up @@ -785,7 +784,7 @@ async fn uninitialized_task__init__if_metadata_behind_l2_height_then_sync() {
.unwrap();

// when
service.init().await.unwrap();
service.init(&StateWatcher::started()).await.unwrap();

// then
// no panic
Expand Down
73 changes: 18 additions & 55 deletions crates/services/gas_price_service/src/v1/uninitialized_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use fuel_core_services::{
RunnableService,
Service,
ServiceRunner,
State,
StateWatcher,
};
use fuel_core_storage::{
Expand Down Expand Up @@ -139,6 +140,7 @@ where

pub async fn init(
mut self,
state_watcher: &StateWatcher,
) -> anyhow::Result<
GasPriceServiceV1<FuelL2BlockSource<SettingsProvider>, DA, AtomicStorage>,
> {
Expand All @@ -155,9 +157,8 @@ where
self.block_stream,
);

if let Some(bundle_id) = self
.gas_price_db
.get_bundle_id(&self.gas_metadata_height.into())?
if let Some(bundle_id) =
self.gas_price_db.get_bundle_id(&self.gas_metadata_height)?
{
self.da_source.set_last_value(bundle_id).await?;
}
Expand All @@ -166,9 +167,8 @@ where
.da_poll_interval
.map(|x| Duration::from_millis(x.into()));
// TODO: Dupe code
if let Some(bundle_id) = self
.gas_price_db
.get_bundle_id(&self.gas_metadata_height.into())?
if let Some(bundle_id) =
self.gas_price_db.get_bundle_id(&self.gas_metadata_height)?
{
self.da_source.set_last_value(bundle_id).await?;
}
Expand All @@ -187,13 +187,14 @@ where
Ok(service)
} else {
if latest_block_height > *self.gas_metadata_height {
sync_gas_price_db_with_on_chain_storage(
sync_v1_metadata(
&self.settings,
&mut self.algo_updater,
&self.on_chain_db,
*self.gas_metadata_height,
latest_block_height,
&mut self.algo_updater,
&mut self.gas_price_db,
state_watcher,
)?;
}

Expand Down Expand Up @@ -230,65 +231,21 @@ where

async fn into_task(
self,
_state_watcher: &StateWatcher,
state_watcher: &StateWatcher,
_params: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
UninitializedTask::init(self).await
UninitializedTask::init(self, state_watcher).await
}
}

fn sync_gas_price_db_with_on_chain_storage<
L2DataStore,
L2DataStoreView,
SettingsProvider,
AtomicStorage,
>(
settings: &SettingsProvider,
updater: &mut AlgorithmUpdaterV1,
on_chain_db: &L2DataStoreView,
metadata_height: u32,
latest_block_height: u32,
persisted_data: &mut AtomicStorage,
) -> anyhow::Result<()>
where
L2DataStore: L2Data,
L2DataStoreView: AtomicView<LatestView = L2DataStore>,
SettingsProvider: GasPriceSettingsProvider,
AtomicStorage: GasPriceServiceAtomicStorage,
{
let metadata = persisted_data
.get_metadata(&metadata_height.into())?
.ok_or(anyhow::anyhow!(
"Expected metadata to exist for height: {metadata_height}"
))?;

// let metadata = match metadata {
// UpdaterMetadata::V1(metadata) => metadata,
// UpdaterMetadata::V0(metadata) => {
// V1Metadata::construct_from_v0_metadata(metadata, config)?
// }
// };
// let mut algo_updater = v1_algorithm_from_metadata(metadata, config);

sync_v1_metadata(
settings,
on_chain_db,
metadata_height,
latest_block_height,
updater,
persisted_data,
)?;

Ok(())
}

fn sync_v1_metadata<L2DataStore, L2DataStoreView, SettingsProvider, AtomicStorage>(
settings: &SettingsProvider,
on_chain_db: &L2DataStoreView,
metadata_height: u32,
latest_block_height: u32,
updater: &mut AlgorithmUpdaterV1,
da_storage: &mut AtomicStorage,
state_watcher: &StateWatcher,
) -> anyhow::Result<()>
where
L2DataStore: L2Data,
Expand All @@ -304,6 +261,12 @@ where
latest_block_height
);
for height in first..=latest_block_height {
// allows early exit if the service is stopping
let state = state_watcher.borrow();
if state.stopping() || state.stopped() {
return Ok(());
}

tracing::info!("Syncing gas price metadata for block {}", height);
let mut tx = da_storage.begin_transaction()?;
let block = view
Expand Down

0 comments on commit 084f1b7

Please sign in to comment.