Skip to content

Commit

Permalink
Pass last updated key to the cleaup function instead of using naive i…
Browse files Browse the repository at this point in the history
…teration
  • Loading branch information
kstepanovdev committed Nov 20, 2024
1 parent dbe9e3f commit 037fea8
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 17 deletions.
20 changes: 16 additions & 4 deletions nft_ingester/src/index_syncronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,11 @@ where
SyncStatus::NoSyncRequired => {}
}

self.clean_syncronized_idxs(asset_type)
if let Some(encoded_key) = self.index_storage.fetch_last_synced_id(asset_type).await? {
self.clean_syncronized_idxs(asset_type, encoded_key)?;
}

Ok(())
}

pub async fn synchronize_fungible_asset_indexes(
Expand All @@ -226,12 +230,20 @@ where
}
SyncStatus::NoSyncRequired => {}
}
if let Some(encoded_key) = self.index_storage.fetch_last_synced_id(asset_type).await? {
self.clean_syncronized_idxs(asset_type, encoded_key)?;
}

self.clean_syncronized_idxs(asset_type)
Ok(())
}

pub fn clean_syncronized_idxs(&self, asset_type: AssetType) -> Result<(), IngesterError> {
self.primary_storage.clean_syncronized_idxs(asset_type)?;
pub fn clean_syncronized_idxs(
&self,
asset_type: AssetType,
last_synced_key: Vec<u8>,
) -> Result<(), IngesterError> {
self.primary_storage
.clean_syncronized_idxs(asset_type, last_synced_key)?;

Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,13 @@ impl TokenAccountsProcessor {
&self,
storage: &mut BatchSaveStorage,
asset_type: AssetType,
last_synced_key: Vec<u8>,
) -> Result<(), StorageError> {
self.finalize_processing(
storage,
|storage: &mut BatchSaveStorage| storage.clean_syncronized_idxs(asset_type),
|storage: &mut BatchSaveStorage| {
storage.clean_syncronized_idxs(asset_type, last_synced_key.clone())
},
"clean_syncronized_idxs",
)
}
Expand Down
14 changes: 9 additions & 5 deletions rocks-db/src/asset_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,23 @@ impl Storage {
Ok(())
}

pub fn clean_syncronized_idxs_with_batch(&self, asset_type: AssetType) -> Result<()> {
pub fn clean_syncronized_idxs_with_batch(
&self,
asset_type: AssetType,
last_synced_key: Vec<u8>,
) -> Result<()> {
let (mut iter, cf) = match asset_type {
AssetType::Fungible => {
let cf = self.fungible_assets_update_idx.handle();
// TODO: use regular iter to avoid data inconcistency
let iter = self.fungible_assets_update_idx.iter_end();
let iter = self
.fungible_assets_update_idx
.iter_reverse(last_synced_key);

(iter, cf)
}
AssetType::NonFungible => {
let cf = self.assets_update_idx.handle();
// TODO: use regular iter to avoid data inconcistency
let iter = self.assets_update_idx.iter_end();
let iter = self.assets_update_idx.iter_reverse(last_synced_key);

(iter, cf)
}
Expand Down
8 changes: 6 additions & 2 deletions rocks-db/src/batch_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,12 @@ impl AssetUpdateIndexStorage for Storage {
Ok((unique_pubkeys, last_key))
}

fn clean_syncronized_idxs(&self, asset_type: AssetType) -> Result<()> {
self.clean_syncronized_idxs_with_batch(asset_type)
fn clean_syncronized_idxs(
&self,
asset_type: AssetType,
last_synced_key: Vec<u8>,
) -> Result<()> {
self.clean_syncronized_idxs_with_batch(asset_type, last_synced_key)
}
}

Expand Down
9 changes: 7 additions & 2 deletions rocks-db/src/batch_savers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,13 @@ impl BatchSaveStorage {
)
}

pub fn clean_syncronized_idxs(&self, asset_type: AssetType) -> Result<()> {
self.storage.clean_syncronized_idxs_with_batch(asset_type)
pub fn clean_syncronized_idxs(
&self,
asset_type: AssetType,
last_synced_key: Vec<u8>,
) -> Result<()> {
self.storage
.clean_syncronized_idxs_with_batch(asset_type, last_synced_key)
}

pub fn get_authority(&self, address: Pubkey) -> Pubkey {
Expand Down
11 changes: 8 additions & 3 deletions rocks-db/src/storage_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ pub trait AssetUpdateIndexStorage {
skip_keys: Option<HashSet<Pubkey>>,
) -> Result<(HashSet<Pubkey>, Option<AssetUpdatedKey>)>;

fn clean_syncronized_idxs(&self, asset_type: AssetType) -> Result<()>;
fn clean_syncronized_idxs(&self, asset_type: AssetType, last_synced_key: Vec<u8>)
-> Result<()>;
}

#[automock]
Expand Down Expand Up @@ -128,9 +129,13 @@ impl AssetUpdateIndexStorage for MockAssetIndexStorage {
.fetch_fungible_asset_updated_keys(from, up_to, limit, skip_keys)
}

fn clean_syncronized_idxs(&self, asset_type: AssetType) -> Result<()> {
fn clean_syncronized_idxs(
&self,
asset_type: AssetType,
last_synced_key: Vec<u8>,
) -> Result<()> {
self.mock_update_index_storage
.clean_syncronized_idxs(asset_type)
.clean_syncronized_idxs(asset_type, last_synced_key)
}
}

Expand Down

0 comments on commit 037fea8

Please sign in to comment.