Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(gas_price_service_v1): allow stopping the service during sync with sigint/etc #2503

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 18 additions & 55 deletions crates/services/gas_price_service/src/v1/uninitialized_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use fuel_core_services::{
RunnableService,
Service,
ServiceRunner,
State,
StateWatcher,
};
use fuel_core_storage::{
Expand Down Expand Up @@ -139,6 +140,7 @@ where

pub async fn init(
mut self,
state_watcher: &StateWatcher,
) -> anyhow::Result<
GasPriceServiceV1<FuelL2BlockSource<SettingsProvider>, DA, AtomicStorage>,
> {
Expand All @@ -155,9 +157,8 @@ where
self.block_stream,
);

if let Some(bundle_id) = self
.gas_price_db
.get_bundle_id(&self.gas_metadata_height.into())?
if let Some(bundle_id) =
self.gas_price_db.get_bundle_id(&self.gas_metadata_height)?
{
self.da_source.set_last_value(bundle_id).await?;
}
Expand All @@ -166,9 +167,8 @@ where
.da_poll_interval
.map(|x| Duration::from_millis(x.into()));
// TODO: Dupe code
if let Some(bundle_id) = self
.gas_price_db
.get_bundle_id(&self.gas_metadata_height.into())?
if let Some(bundle_id) =
self.gas_price_db.get_bundle_id(&self.gas_metadata_height)?
{
self.da_source.set_last_value(bundle_id).await?;
}
Expand All @@ -187,13 +187,14 @@ where
Ok(service)
} else {
if latest_block_height > *self.gas_metadata_height {
sync_gas_price_db_with_on_chain_storage(
sync_v1_metadata(
&self.settings,
&mut self.algo_updater,
&self.on_chain_db,
*self.gas_metadata_height,
latest_block_height,
&mut self.algo_updater,
&mut self.gas_price_db,
state_watcher,
)?;
}

Expand Down Expand Up @@ -230,65 +231,21 @@ where

async fn into_task(
self,
_state_watcher: &StateWatcher,
state_watcher: &StateWatcher,
_params: Self::TaskParams,
) -> anyhow::Result<Self::Task> {
UninitializedTask::init(self).await
UninitializedTask::init(self, state_watcher).await
}
}

fn sync_gas_price_db_with_on_chain_storage<
L2DataStore,
L2DataStoreView,
SettingsProvider,
AtomicStorage,
>(
settings: &SettingsProvider,
updater: &mut AlgorithmUpdaterV1,
on_chain_db: &L2DataStoreView,
metadata_height: u32,
latest_block_height: u32,
persisted_data: &mut AtomicStorage,
) -> anyhow::Result<()>
where
L2DataStore: L2Data,
L2DataStoreView: AtomicView<LatestView = L2DataStore>,
SettingsProvider: GasPriceSettingsProvider,
AtomicStorage: GasPriceServiceAtomicStorage,
{
let metadata = persisted_data
.get_metadata(&metadata_height.into())?
.ok_or(anyhow::anyhow!(
"Expected metadata to exist for height: {metadata_height}"
))?;

// let metadata = match metadata {
// UpdaterMetadata::V1(metadata) => metadata,
// UpdaterMetadata::V0(metadata) => {
// V1Metadata::construct_from_v0_metadata(metadata, config)?
// }
// };
// let mut algo_updater = v1_algorithm_from_metadata(metadata, config);

sync_v1_metadata(
settings,
on_chain_db,
metadata_height,
latest_block_height,
updater,
persisted_data,
)?;

Ok(())
}

fn sync_v1_metadata<L2DataStore, L2DataStoreView, SettingsProvider, AtomicStorage>(
settings: &SettingsProvider,
on_chain_db: &L2DataStoreView,
metadata_height: u32,
latest_block_height: u32,
updater: &mut AlgorithmUpdaterV1,
da_storage: &mut AtomicStorage,
state_watcher: &StateWatcher,
) -> anyhow::Result<()>
where
L2DataStore: L2Data,
Expand All @@ -304,6 +261,12 @@ where
latest_block_height
);
for height in first..=latest_block_height {
// allows early exit if the service is stopping
let state = state_watcher.borrow();
if state.stopping() || state.stopped() {
return Ok(());
}

tracing::info!("Syncing gas price metadata for block {}", height);
let mut tx = da_storage.begin_transaction()?;
let block = view
Expand Down
Loading