Skip to content

Commit

Permalink
Implemented cleaning up asset slot update idx && added tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kstepanovdev committed Nov 26, 2024
1 parent 74cd2ee commit 2f8e7ee
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 16 deletions.
43 changes: 35 additions & 8 deletions nft_ingester/src/index_syncronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,44 +172,71 @@ where
rx: &tokio::sync::broadcast::Receiver<()>,
run_full_sync_threshold: i64,
) -> Result<(), IngesterError> {
let asset_type = AssetType::NonFungible;

let state = self
.get_sync_state(run_full_sync_threshold, AssetType::NonFungible)
.get_sync_state(run_full_sync_threshold, asset_type)
.await?;
match state {
SyncStatus::FullSyncRequired(state) => {
tracing::info!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq);
self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key)
.await
.await?;
}
SyncStatus::RegularSyncRequired(state) => {
self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key)
.await
.await?;
}
SyncStatus::NoSyncRequired => Ok(()),
SyncStatus::NoSyncRequired => {}
}

if let Some(encoded_key) = self.index_storage.fetch_last_synced_id(asset_type).await? {
self.clean_syncronized_idxs(asset_type, encoded_key)?;
}

Ok(())
}

pub async fn synchronize_fungible_asset_indexes(
&self,
rx: &tokio::sync::broadcast::Receiver<()>,
run_full_sync_threshold: i64,
) -> Result<(), IngesterError> {
let asset_type = AssetType::Fungible;

let state = self
.get_sync_state(run_full_sync_threshold, AssetType::Fungible)
.get_sync_state(run_full_sync_threshold, asset_type)
.await?;

match state {
SyncStatus::FullSyncRequired(state) => {
tracing::info!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq);
self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key)
.await
.await?;
}
SyncStatus::RegularSyncRequired(state) => {
self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key)
.await
.await?;
}
SyncStatus::NoSyncRequired => Ok(()),
SyncStatus::NoSyncRequired => {}
}

if let Some(encoded_key) = self.index_storage.fetch_last_synced_id(asset_type).await? {
self.clean_syncronized_idxs(asset_type, encoded_key)?;
}

Ok(())
}

pub fn clean_syncronized_idxs(
&self,
asset_type: AssetType,
last_synced_key: Vec<u8>,
) -> Result<(), IngesterError> {
self.primary_storage
.clean_syncronized_idxs(asset_type, last_synced_key)?;

Ok(())
}

pub async fn full_syncronize(
Expand Down
114 changes: 114 additions & 0 deletions nft_ingester/tests/api_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3714,4 +3714,118 @@ mod tests {
fungible_token_account1.to_string()
);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_idx_cleaner() {
let cnt = 100;
let cli = Cli::default();

let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await;
let synchronizer = nft_ingester::index_syncronizer::Synchronizer::new(
env.rocks_env.storage.clone(),
env.pg_env.client.clone(),
env.pg_env.client.clone(),
200_000,
"".to_string(),
Arc::new(SynchronizerMetricsConfig::new()),
1,
false,
);
let non_fungible_token_mint = generated_assets.pubkeys[1];
let mint = Mint {
pubkey: non_fungible_token_mint,
supply: 100000,
decimals: 0,
mint_authority: None,
freeze_authority: None,
token_program: Default::default(),
slot_updated: 10,
write_version: 10,
extensions: None,
};

let owner: Pubkey = generated_assets.owners[1].owner.value.unwrap();
let token_account_addr = Pubkey::new_unique();
let token_account = TokenAccount {
pubkey: token_account_addr,
mint: non_fungible_token_mint,
delegate: None,
owner,
extensions: None,
frozen: false,
delegated_amount: 0,
slot_updated: 10,
amount: 100,
write_version: 10,
};
let mut batch_storage = BatchSaveStorage::new(
env.rocks_env.storage.clone(),
10,
Arc::new(IngesterMetricsConfig::new()),
);
let token_accounts_processor =
TokenAccountsProcessor::new(Arc::new(IngesterMetricsConfig::new()));
token_accounts_processor
.transform_and_save_token_account(
&mut batch_storage,
token_account_addr,
&token_account,
)
.unwrap();
token_accounts_processor
.transform_and_save_mint_account(&mut batch_storage, &mint)
.unwrap();
batch_storage.flush().unwrap();

// one more idx shoul've been added
let mut number_of_fungible_idxs = 0;
let mut number_of_non_fungible_idxs = 0;
let mut idx_fungible_asset_iter = env
.rocks_env
.storage
.fungible_assets_update_idx
.iter_start();
let mut idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();

while let Some(_) = idx_fungible_asset_iter.next() {
number_of_fungible_idxs += 1;
}
assert_eq!(number_of_fungible_idxs, 1);

while let Some(_) = idx_non_fungible_asset_iter.next() {
number_of_non_fungible_idxs += 1;
}
assert_eq!(number_of_non_fungible_idxs, 3);

let (_, rx) = tokio::sync::broadcast::channel::<()>(1);

synchronizer
.synchronize_nft_asset_indexes(&rx, 0)
.await
.unwrap();
synchronizer
.synchronize_fungible_asset_indexes(&rx, 0)
.await
.unwrap();

// after sync idxs should be cleaned again
let mut number_of_fungible_idxs = 0;
let mut number_of_non_fungible_idxs = 0;
let mut idx_fungible_asset_iter = env
.rocks_env
.storage
.fungible_assets_update_idx
.iter_start();
let mut idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();

while let Some(_) = idx_fungible_asset_iter.next() {
number_of_fungible_idxs += 1;
}
assert_eq!(number_of_fungible_idxs, 1);

while let Some(_) = idx_non_fungible_asset_iter.next() {
number_of_non_fungible_idxs += 1;
}
assert_eq!(number_of_non_fungible_idxs, 1);
}
}
47 changes: 43 additions & 4 deletions rocks-db/src/asset_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use crate::errors::StorageError;
use crate::key_encoders::encode_u64x2_pubkey;
use crate::{Result, Storage};
use entities::api_req_params::Options;
use entities::enums::TokenMetadataEdition;
use entities::enums::{AssetType, TokenMetadataEdition};
use entities::models::{EditionData, PubkeyWithSlot};
use futures_util::FutureExt;
use std::collections::HashMap;

impl Storage {
fn get_next_fungible_asset_update_seq(&self) -> Result<u64> {
if self.fungible_assets_update_last_seq.load(Ordering::SeqCst) == 0 {
if self.fungible_assets_update_last_seq.load(Ordering::Relaxed) == 0 {
// If fungible_assets_update_next_seq is zero, fetch the last key from fungible_assets_update_idx
let mut iter = self.fungible_assets_update_idx.iter_end(); // Assuming iter_end method fetches the last item

Expand All @@ -27,13 +27,13 @@ impl Storage {

let seq = u64::from_be_bytes(last_key[..std::mem::size_of::<u64>()].try_into()?);
self.fungible_assets_update_last_seq
.store(seq, Ordering::SeqCst);
.store(seq, Ordering::Relaxed);
}
}
// Increment and return the sequence number
let seq = self
.fungible_assets_update_last_seq
.fetch_add(1, Ordering::SeqCst)
.fetch_add(1, Ordering::Relaxed)
+ 1;
Ok(seq)
}
Expand Down Expand Up @@ -113,6 +113,45 @@ impl Storage {
);
Ok(())
}

pub fn clean_syncronized_idxs_with_batch(
&self,
asset_type: AssetType,
last_synced_key: Vec<u8>,
) -> Result<()> {
let (from, cf) = match asset_type {
AssetType::Fungible => {
let cf = self.fungible_assets_update_idx.handle();
let key_type_pairs = self.fungible_assets_update_idx.get_from_start(1);

if key_type_pairs.is_empty() {
return Ok(());
}
let from = key_type_pairs[0].0.clone();
(from, cf)
}
AssetType::NonFungible => {
let cf = self.assets_update_idx.handle();
let key_type_pairs = self.assets_update_idx.get_from_start(1);

if key_type_pairs.is_empty() {
return Ok(());
}
let from = key_type_pairs[0].0.clone();
(from, cf)
}
};

if from == last_synced_key {
return Ok(());
}

let mut batch = rocksdb::WriteBatchWithTransaction::<false>::default();
batch.delete_range_cf(&cf, from, last_synced_key);
self.db.write(batch)?;

Ok(())
}
}

#[macro_export]
Expand Down
12 changes: 10 additions & 2 deletions rocks-db/src/batch_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::{HashMap, HashSet};

use async_trait::async_trait;
use entities::enums::{SpecificationVersions, TokenMetadataEdition};
use entities::enums::{AssetType, SpecificationVersions, TokenMetadataEdition};
use serde_json::json;
use solana_sdk::pubkey::Pubkey;

Expand Down Expand Up @@ -183,6 +183,14 @@ impl AssetUpdateIndexStorage for Storage {
);
Ok((unique_pubkeys, last_key))
}

fn clean_syncronized_idxs(
&self,
asset_type: AssetType,
last_synced_key: Vec<u8>,
) -> Result<()> {
self.clean_syncronized_idxs_with_batch(asset_type, last_synced_key)
}
}

#[async_trait]
Expand All @@ -200,9 +208,9 @@ impl AssetIndexReader for Storage {
let fungible_asset_index = FungibleAssetIndex {
pubkey: token_acc.pubkey,
owner: Some(token_acc.owner),
slot_updated: token_acc.slot_updated,
fungible_asset_mint: Some(token_acc.mint),
fungible_asset_balance: Some(token_acc.amount as u64),
slot_updated: token_acc.slot_updated,
};

fungible_assets_indexes.insert(token_acc.pubkey, fungible_asset_index);
Expand Down
11 changes: 10 additions & 1 deletion rocks-db/src/batch_savers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::asset::{AssetCollection, MetadataMintMap};
use crate::token_accounts::{TokenAccountMintOwnerIdx, TokenAccountOwnerIdx};
use crate::Result;
use crate::{AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, Storage};
use entities::enums::TokenMetadataEdition;
use entities::enums::{AssetType, TokenMetadataEdition};
use entities::models::{
InscriptionDataInfo, InscriptionInfo, Mint, TokenAccount, TokenAccountMintOwnerIdxKey,
TokenAccountOwnerIdxKey,
Expand Down Expand Up @@ -270,4 +270,13 @@ impl BatchSaveStorage {
}
Ok(())
}

pub fn clean_syncronized_idxs(
&self,
asset_type: AssetType,
last_synced_key: Vec<u8>,
) -> Result<()> {
self.storage
.clean_syncronized_idxs_with_batch(asset_type, last_synced_key)
}
}
17 changes: 16 additions & 1 deletion rocks-db/src/storage_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use solana_sdk::pubkey::Pubkey;

pub use crate::Result;
use crate::Storage;
use entities::models::{AssetIndex, FungibleAssetIndex};
use entities::{
enums::AssetType,
models::{AssetIndex, FungibleAssetIndex},
};

#[derive(Clone, Debug, PartialEq)]
pub struct AssetUpdatedKey {
Expand Down Expand Up @@ -42,6 +45,9 @@ pub trait AssetUpdateIndexStorage {
limit: usize,
skip_keys: Option<HashSet<Pubkey>>,
) -> Result<(HashSet<Pubkey>, Option<AssetUpdatedKey>)>;

fn clean_syncronized_idxs(&self, asset_type: AssetType, last_synced_key: Vec<u8>)
-> Result<()>;
}

#[automock]
Expand Down Expand Up @@ -130,6 +136,15 @@ impl AssetUpdateIndexStorage for MockAssetIndexStorage {
self.mock_update_index_storage
.fetch_fungible_asset_updated_keys(from, up_to, limit, skip_keys)
}

fn clean_syncronized_idxs(
&self,
asset_type: AssetType,
last_synced_key: Vec<u8>,
) -> Result<()> {
self.mock_update_index_storage
.clean_syncronized_idxs(asset_type, last_synced_key)
}
}

#[async_trait]
Expand Down

0 comments on commit 2f8e7ee

Please sign in to comment.