-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[mtg-778] Clean asset slot update idx #325
Changes from 1 commit
2f8e7ee
49f0096
3e3f044
eadeb0d
089528a
676f605
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3714,4 +3714,118 @@ mod tests { | |
fungible_token_account1.to_string() | ||
); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_idx_cleaner() { | ||
let cnt = 100; | ||
let cli = Cli::default(); | ||
|
||
let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await; | ||
let synchronizer = nft_ingester::index_syncronizer::Synchronizer::new( | ||
env.rocks_env.storage.clone(), | ||
env.pg_env.client.clone(), | ||
env.pg_env.client.clone(), | ||
200_000, | ||
"".to_string(), | ||
Arc::new(SynchronizerMetricsConfig::new()), | ||
1, | ||
false, | ||
); | ||
let non_fungible_token_mint = generated_assets.pubkeys[1]; | ||
let mint = Mint { | ||
pubkey: non_fungible_token_mint, | ||
supply: 100000, | ||
decimals: 0, | ||
mint_authority: None, | ||
freeze_authority: None, | ||
token_program: Default::default(), | ||
slot_updated: 10, | ||
write_version: 10, | ||
extensions: None, | ||
}; | ||
|
||
let owner: Pubkey = generated_assets.owners[1].owner.value.unwrap(); | ||
let token_account_addr = Pubkey::new_unique(); | ||
let token_account = TokenAccount { | ||
pubkey: token_account_addr, | ||
mint: non_fungible_token_mint, | ||
delegate: None, | ||
owner, | ||
extensions: None, | ||
frozen: false, | ||
delegated_amount: 0, | ||
slot_updated: 10, | ||
amount: 100, | ||
write_version: 10, | ||
}; | ||
let mut batch_storage = BatchSaveStorage::new( | ||
env.rocks_env.storage.clone(), | ||
10, | ||
Arc::new(IngesterMetricsConfig::new()), | ||
); | ||
let token_accounts_processor = | ||
TokenAccountsProcessor::new(Arc::new(IngesterMetricsConfig::new())); | ||
token_accounts_processor | ||
.transform_and_save_token_account( | ||
&mut batch_storage, | ||
token_account_addr, | ||
&token_account, | ||
) | ||
.unwrap(); | ||
token_accounts_processor | ||
.transform_and_save_mint_account(&mut batch_storage, &mint) | ||
.unwrap(); | ||
batch_storage.flush().unwrap(); | ||
|
||
// one more idx shoul've been added | ||
let mut number_of_fungible_idxs = 0; | ||
let mut number_of_non_fungible_idxs = 0; | ||
let mut idx_fungible_asset_iter = env | ||
.rocks_env | ||
.storage | ||
.fungible_assets_update_idx | ||
.iter_start(); | ||
let mut idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start(); | ||
|
||
while let Some(_) = idx_fungible_asset_iter.next() { | ||
number_of_fungible_idxs += 1; | ||
} | ||
assert_eq!(number_of_fungible_idxs, 1); | ||
|
||
while let Some(_) = idx_non_fungible_asset_iter.next() { | ||
number_of_non_fungible_idxs += 1; | ||
} | ||
assert_eq!(number_of_non_fungible_idxs, 3); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: you can use iter_start().count() |
||
|
||
let (_, rx) = tokio::sync::broadcast::channel::<()>(1); | ||
|
||
synchronizer | ||
.synchronize_nft_asset_indexes(&rx, 0) | ||
.await | ||
.unwrap(); | ||
synchronizer | ||
.synchronize_fungible_asset_indexes(&rx, 0) | ||
.await | ||
.unwrap(); | ||
|
||
// after sync idxs should be cleaned again | ||
let mut number_of_fungible_idxs = 0; | ||
let mut number_of_non_fungible_idxs = 0; | ||
let mut idx_fungible_asset_iter = env | ||
.rocks_env | ||
.storage | ||
.fungible_assets_update_idx | ||
.iter_start(); | ||
let mut idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start(); | ||
|
||
while let Some(_) = idx_fungible_asset_iter.next() { | ||
number_of_fungible_idxs += 1; | ||
} | ||
assert_eq!(number_of_fungible_idxs, 1); | ||
|
||
while let Some(_) = idx_non_fungible_asset_iter.next() { | ||
number_of_non_fungible_idxs += 1; | ||
} | ||
assert_eq!(number_of_non_fungible_idxs, 1); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,14 +10,14 @@ use crate::errors::StorageError; | |
use crate::key_encoders::encode_u64x2_pubkey; | ||
use crate::{Result, Storage}; | ||
use entities::api_req_params::Options; | ||
use entities::enums::TokenMetadataEdition; | ||
use entities::enums::{AssetType, TokenMetadataEdition}; | ||
use entities::models::{EditionData, PubkeyWithSlot}; | ||
use futures_util::FutureExt; | ||
use std::collections::HashMap; | ||
|
||
impl Storage { | ||
fn get_next_fungible_asset_update_seq(&self) -> Result<u64> { | ||
if self.fungible_assets_update_last_seq.load(Ordering::SeqCst) == 0 { | ||
if self.fungible_assets_update_last_seq.load(Ordering::Relaxed) == 0 { | ||
// If fungible_assets_update_next_seq is zero, fetch the last key from fungible_assets_update_idx | ||
let mut iter = self.fungible_assets_update_idx.iter_end(); // Assuming iter_end method fetches the last item | ||
|
||
|
@@ -27,13 +27,13 @@ impl Storage { | |
|
||
let seq = u64::from_be_bytes(last_key[..std::mem::size_of::<u64>()].try_into()?); | ||
self.fungible_assets_update_last_seq | ||
.store(seq, Ordering::SeqCst); | ||
.store(seq, Ordering::Relaxed); | ||
} | ||
} | ||
// Increment and return the sequence number | ||
let seq = self | ||
.fungible_assets_update_last_seq | ||
.fetch_add(1, Ordering::SeqCst) | ||
.fetch_add(1, Ordering::Relaxed) | ||
+ 1; | ||
Ok(seq) | ||
} | ||
|
@@ -113,6 +113,45 @@ impl Storage { | |
); | ||
Ok(()) | ||
} | ||
|
||
pub fn clean_syncronized_idxs_with_batch( | ||
&self, | ||
asset_type: AssetType, | ||
last_synced_key: Vec<u8>, | ||
) -> Result<()> { | ||
let (from, cf) = match asset_type { | ||
AssetType::Fungible => { | ||
let cf = self.fungible_assets_update_idx.handle(); | ||
let key_type_pairs = self.fungible_assets_update_idx.get_from_start(1); | ||
|
||
if key_type_pairs.is_empty() { | ||
return Ok(()); | ||
} | ||
let from = key_type_pairs[0].0.clone(); | ||
(from, cf) | ||
} | ||
AssetType::NonFungible => { | ||
let cf = self.assets_update_idx.handle(); | ||
let key_type_pairs = self.assets_update_idx.get_from_start(1); | ||
|
||
if key_type_pairs.is_empty() { | ||
return Ok(()); | ||
} | ||
let from = key_type_pairs[0].0.clone(); | ||
(from, cf) | ||
} | ||
}; | ||
|
||
if from == last_synced_key { | ||
return Ok(()); | ||
} | ||
|
||
let mut batch = rocksdb::WriteBatchWithTransaction::<false>::default(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please explain the reason for using batch here? It seems like you perform a single operation on this batch, so does it make sense to use it at all? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not needed anymore, indeed. Previously it was multiple deletions via iterator |
||
batch.delete_range_cf(&cf, from, last_synced_key); | ||
self.db.write(batch)?; | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
#[macro_export] | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,7 +2,7 @@ use crate::asset::{AssetCollection, MetadataMintMap}; | |
use crate::token_accounts::{TokenAccountMintOwnerIdx, TokenAccountOwnerIdx}; | ||
use crate::Result; | ||
use crate::{AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, Storage}; | ||
use entities::enums::TokenMetadataEdition; | ||
use entities::enums::{AssetType, TokenMetadataEdition}; | ||
use entities::models::{ | ||
InscriptionDataInfo, InscriptionInfo, Mint, TokenAccount, TokenAccountMintOwnerIdxKey, | ||
TokenAccountOwnerIdxKey, | ||
|
@@ -270,4 +270,13 @@ impl BatchSaveStorage { | |
} | ||
Ok(()) | ||
} | ||
|
||
pub fn clean_syncronized_idxs( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need such wrapper functions? |
||
&self, | ||
asset_type: AssetType, | ||
last_synced_key: Vec<u8>, | ||
) -> Result<()> { | ||
self.storage | ||
.clean_syncronized_idxs_with_batch(asset_type, last_synced_key) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We always expert fetch_last_synced_id() return here Some(_) because if we receive None it will mean that some jobs are incomplete or we have some bug in code. So, maybe use ok_or()? Or at least add an error log if fetch_last_synced_id returned None