-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[mtg-778] Clean asset slot update idx #325
Changes from all commits
2f8e7ee
49f0096
3e3f044
eadeb0d
089528a
676f605
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
use std::sync::Arc; | ||
|
||
use entities::enums::AssetType; | ||
use rocks_db::{ | ||
key_encoders::encode_u64x2_pubkey, storage_traits::AssetUpdateIndexStorage, Storage, | ||
}; | ||
|
||
use crate::error::IngesterError; | ||
|
||
pub fn clean_syncronized_idxs( | ||
primary_rocks_storage: Arc<Storage>, | ||
asset_type: AssetType, | ||
) -> Result<(), IngesterError> { | ||
let optional_last_synced_key = match asset_type { | ||
AssetType::NonFungible => primary_rocks_storage.last_known_nft_asset_updated_key(), | ||
AssetType::Fungible => primary_rocks_storage.last_known_fungible_asset_updated_key(), | ||
}; | ||
|
||
if let Ok(Some(last_synced_key)) = optional_last_synced_key { | ||
let last_synced_key = encode_u64x2_pubkey( | ||
last_synced_key.seq, | ||
last_synced_key.slot, | ||
last_synced_key.pubkey, | ||
); | ||
primary_rocks_storage.clean_syncronized_idxs(asset_type, last_synced_key)?; | ||
}; | ||
|
||
Ok(()) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
//! Cleaner components; see the submodules for the individual cleaners.
pub mod fork_cleaner;
pub mod indexer_cleaner;
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,8 @@ mod tests { | |
ShadowInterestBearingConfig, ShadowTransferFee, ShadowTransferFeeConfig, UnixTimestamp, | ||
}; | ||
use blockbuster::programs::token_extensions::MintAccountExtensions; | ||
use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs; | ||
|
||
use std::str::FromStr; | ||
use std::{collections::HashMap, sync::Arc}; | ||
|
||
|
@@ -3714,4 +3716,91 @@ mod tests { | |
fungible_token_account1.to_string() | ||
); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_idx_cleaner() { | ||
let cnt = 100; | ||
let cli = Cli::default(); | ||
|
||
let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await; | ||
let _synchronizer = nft_ingester::index_syncronizer::Synchronizer::new( | ||
env.rocks_env.storage.clone(), | ||
env.pg_env.client.clone(), | ||
env.pg_env.client.clone(), | ||
200_000, | ||
"".to_string(), | ||
Arc::new(SynchronizerMetricsConfig::new()), | ||
1, | ||
false, | ||
); | ||
let non_fungible_token_mint = generated_assets.pubkeys[1]; | ||
let mint = Mint { | ||
pubkey: non_fungible_token_mint, | ||
supply: 100000, | ||
decimals: 0, | ||
mint_authority: None, | ||
freeze_authority: None, | ||
token_program: Default::default(), | ||
slot_updated: 10, | ||
write_version: 10, | ||
extensions: None, | ||
}; | ||
|
||
let owner: Pubkey = generated_assets.owners[1].owner.value.unwrap(); | ||
let token_account_addr = Pubkey::new_unique(); | ||
let token_account = TokenAccount { | ||
pubkey: token_account_addr, | ||
mint: non_fungible_token_mint, | ||
delegate: None, | ||
owner, | ||
extensions: None, | ||
frozen: false, | ||
delegated_amount: 0, | ||
slot_updated: 10, | ||
amount: 100, | ||
write_version: 10, | ||
}; | ||
let mut batch_storage = BatchSaveStorage::new( | ||
env.rocks_env.storage.clone(), | ||
10, | ||
Arc::new(IngesterMetricsConfig::new()), | ||
); | ||
let token_accounts_processor = | ||
TokenAccountsProcessor::new(Arc::new(IngesterMetricsConfig::new())); | ||
token_accounts_processor | ||
.transform_and_save_token_account( | ||
&mut batch_storage, | ||
token_account_addr, | ||
&token_account, | ||
) | ||
.unwrap(); | ||
token_accounts_processor | ||
.transform_and_save_mint_account(&mut batch_storage, &mint) | ||
.unwrap(); | ||
batch_storage.flush().unwrap(); | ||
|
||
// one more idx shoul've been added | ||
let idx_fungible_asset_iter = env | ||
.rocks_env | ||
.storage | ||
.fungible_assets_update_idx | ||
.iter_start(); | ||
let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start(); | ||
assert_eq!(idx_fungible_asset_iter.count(), 1); | ||
assert_eq!(idx_non_fungible_asset_iter.count(), cnt + 2); | ||
|
||
for asset_type in ASSET_TYPES { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: wdyt about wrapping part of this stuff with function? then we should not copy past it from |
||
clean_syncronized_idxs(env.rocks_env.storage.clone(), asset_type).unwrap(); | ||
} | ||
|
||
// after sync idxs should be cleaned again | ||
let idx_fungible_asset_iter = env | ||
.rocks_env | ||
.storage | ||
.fungible_assets_update_idx | ||
.iter_start(); | ||
let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start(); | ||
assert_eq!(idx_fungible_asset_iter.count(), 1); | ||
assert_eq!(idx_non_fungible_asset_iter.count(), 1); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,14 +10,14 @@ use crate::errors::StorageError; | |
use crate::key_encoders::encode_u64x2_pubkey; | ||
use crate::{Result, Storage}; | ||
use entities::api_req_params::Options; | ||
use entities::enums::TokenMetadataEdition; | ||
use entities::enums::{AssetType, TokenMetadataEdition}; | ||
use entities::models::{EditionData, PubkeyWithSlot}; | ||
use futures_util::FutureExt; | ||
use std::collections::HashMap; | ||
|
||
impl Storage { | ||
fn get_next_fungible_asset_update_seq(&self) -> Result<u64> { | ||
if self.fungible_assets_update_last_seq.load(Ordering::SeqCst) == 0 { | ||
if self.fungible_assets_update_last_seq.load(Ordering::Relaxed) == 0 { | ||
// If fungible_assets_update_next_seq is zero, fetch the last key from fungible_assets_update_idx | ||
let mut iter = self.fungible_assets_update_idx.iter_end(); // Assuming iter_end method fetches the last item | ||
|
||
|
@@ -27,13 +27,13 @@ impl Storage { | |
|
||
let seq = u64::from_be_bytes(last_key[..std::mem::size_of::<u64>()].try_into()?); | ||
self.fungible_assets_update_last_seq | ||
.store(seq, Ordering::SeqCst); | ||
.store(seq, Ordering::Relaxed); | ||
} | ||
} | ||
// Increment and return the sequence number | ||
let seq = self | ||
.fungible_assets_update_last_seq | ||
.fetch_add(1, Ordering::SeqCst) | ||
.fetch_add(1, Ordering::Relaxed) | ||
+ 1; | ||
Ok(seq) | ||
} | ||
|
@@ -113,6 +113,22 @@ impl Storage { | |
); | ||
Ok(()) | ||
} | ||
|
||
pub fn clean_syncronized_idxs( | ||
&self, | ||
asset_type: AssetType, | ||
last_synced_key: Vec<u8>, | ||
) -> Result<()> { | ||
let cf = match asset_type { | ||
AssetType::Fungible => self.fungible_assets_update_idx.handle(), | ||
AssetType::NonFungible => self.assets_update_idx.handle(), | ||
}; | ||
|
||
let from = vec![]; | ||
self.db.delete_range_cf(&cf, from, last_synced_key)?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe add some additional metrics to track the execution time of this operation? |
||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
#[macro_export] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`tokio::select` should be inside the loop; otherwise this loop won't exit when a termination signal arrives, will it?