From 2f8e7ee48b7bcd077d2bac6457ab3287cd301612 Mon Sep 17 00:00:00 2001 From: Kyrylo Stepanov Date: Tue, 26 Nov 2024 12:31:54 +0200 Subject: [PATCH 1/6] Implemented cleaning up asset slot update idx && added tests --- nft_ingester/src/index_syncronizer.rs | 43 ++++++++-- nft_ingester/tests/api_tests.rs | 114 ++++++++++++++++++++++++++ rocks-db/src/asset_client.rs | 47 ++++++++++- rocks-db/src/batch_client.rs | 12 ++- rocks-db/src/batch_savers.rs | 11 ++- rocks-db/src/storage_traits.rs | 17 +++- 6 files changed, 228 insertions(+), 16 deletions(-) diff --git a/nft_ingester/src/index_syncronizer.rs b/nft_ingester/src/index_syncronizer.rs index 5f6886652..dd317fe4a 100644 --- a/nft_ingester/src/index_syncronizer.rs +++ b/nft_ingester/src/index_syncronizer.rs @@ -172,21 +172,29 @@ where rx: &tokio::sync::broadcast::Receiver<()>, run_full_sync_threshold: i64, ) -> Result<(), IngesterError> { + let asset_type = AssetType::NonFungible; + let state = self - .get_sync_state(run_full_sync_threshold, AssetType::NonFungible) + .get_sync_state(run_full_sync_threshold, asset_type) .await?; match state { SyncStatus::FullSyncRequired(state) => { tracing::info!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq); self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key) - .await + .await?; } SyncStatus::RegularSyncRequired(state) => { self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key) - .await + .await?; } - SyncStatus::NoSyncRequired => Ok(()), + SyncStatus::NoSyncRequired => {} + } + + if let Some(encoded_key) = self.index_storage.fetch_last_synced_id(asset_type).await? { + self.clean_syncronized_idxs(asset_type, encoded_key)?; } + + Ok(()) } pub async fn synchronize_fungible_asset_indexes( @@ -194,22 +202,41 @@ where rx: &tokio::sync::broadcast::Receiver<()>, run_full_sync_threshold: i64, ) -> Result<(), IngesterError> { + let asset_type = AssetType::Fungible; + let state = self - .get_sync_state(run_full_sync_threshold, AssetType::Fungible) + .get_sync_state(run_full_sync_threshold, asset_type) .await?; match state { SyncStatus::FullSyncRequired(state) => { tracing::info!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq); self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key) - .await + .await?; } SyncStatus::RegularSyncRequired(state) => { self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key) - .await + .await?; } - SyncStatus::NoSyncRequired => Ok(()), + SyncStatus::NoSyncRequired => {} + } + + if let Some(encoded_key) = self.index_storage.fetch_last_synced_id(asset_type).await? 
{
+            self.clean_syncronized_idxs(asset_type, encoded_key)?;
+        }
+
+        Ok(())
+    }
+
+    pub fn clean_syncronized_idxs(
+        &self,
+        asset_type: AssetType,
+        last_synced_key: Vec<u8>,
+    ) -> Result<(), IngesterError> {
+        self.primary_storage
+            .clean_syncronized_idxs(asset_type, last_synced_key)?;
+
+        Ok(())
     }
 
     pub async fn full_syncronize(
diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs
index 4a91bd79a..4cf046e34 100644
--- a/nft_ingester/tests/api_tests.rs
+++ b/nft_ingester/tests/api_tests.rs
@@ -3714,4 +3714,118 @@ mod tests {
             fungible_token_account1.to_string()
         );
     }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_idx_cleaner() {
+        let cnt = 100;
+        let cli = Cli::default();
+
+        let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await;
+        let synchronizer = nft_ingester::index_syncronizer::Synchronizer::new(
+            env.rocks_env.storage.clone(),
+            env.pg_env.client.clone(),
+            env.pg_env.client.clone(),
+            200_000,
+            "".to_string(),
+            Arc::new(SynchronizerMetricsConfig::new()),
+            1,
+            false,
+        );
+        let non_fungible_token_mint = generated_assets.pubkeys[1];
+        let mint = Mint {
+            pubkey: non_fungible_token_mint,
+            supply: 100000,
+            decimals: 0,
+            mint_authority: None,
+            freeze_authority: None,
+            token_program: Default::default(),
+            slot_updated: 10,
+            write_version: 10,
+            extensions: None,
+        };
+
+        let owner: Pubkey = generated_assets.owners[1].owner.value.unwrap();
+        let token_account_addr = Pubkey::new_unique();
+        let token_account = TokenAccount {
+            pubkey: token_account_addr,
+            mint: non_fungible_token_mint,
+            delegate: None,
+            owner,
+            extensions: None,
+            frozen: false,
+            delegated_amount: 0,
+            slot_updated: 10,
+            amount: 100,
+            write_version: 10,
+        };
+        let mut batch_storage = BatchSaveStorage::new(
+            env.rocks_env.storage.clone(),
+            10,
+            Arc::new(IngesterMetricsConfig::new()),
+        );
+        let token_accounts_processor =
+            TokenAccountsProcessor::new(Arc::new(IngesterMetricsConfig::new()));
+        token_accounts_processor
+            .transform_and_save_token_account(
+                &mut batch_storage,
+                token_account_addr,
+                &token_account,
+            )
+            .unwrap();
+        token_accounts_processor
+            .transform_and_save_mint_account(&mut batch_storage, &mint)
+            .unwrap();
+        batch_storage.flush().unwrap();
+
+        // one more idx should've been added
+        let mut number_of_fungible_idxs = 0;
+        let mut number_of_non_fungible_idxs = 0;
+        let mut idx_fungible_asset_iter = env
+            .rocks_env
+            .storage
+            .fungible_assets_update_idx
+            .iter_start();
+        let mut idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();
+
+        while let Some(_) = idx_fungible_asset_iter.next() {
+            number_of_fungible_idxs += 1;
+        }
+        assert_eq!(number_of_fungible_idxs, 1);
+
+        while let Some(_) = idx_non_fungible_asset_iter.next() {
+            number_of_non_fungible_idxs += 1;
+        }
+        assert_eq!(number_of_non_fungible_idxs, 3);
+
+        let (_, rx) = tokio::sync::broadcast::channel::<()>(1);
+
+        synchronizer
+            .synchronize_nft_asset_indexes(&rx, 0)
+            .await
+            .unwrap();
+        synchronizer
+            .synchronize_fungible_asset_indexes(&rx, 0)
+            .await
+            .unwrap();
+
+        // after sync idxs should be cleaned again
+        let mut number_of_fungible_idxs = 0;
+        let mut number_of_non_fungible_idxs = 0;
+        let mut idx_fungible_asset_iter = env
+            .rocks_env
+            .storage
+            .fungible_assets_update_idx
+            .iter_start();
+        let mut idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();
+
+        while let Some(_) = idx_fungible_asset_iter.next() {
+            number_of_fungible_idxs += 1;
+        }
+        assert_eq!(number_of_fungible_idxs, 1);
+
+        while let Some(_) = idx_non_fungible_asset_iter.next() {
+            number_of_non_fungible_idxs += 1;
+        }
+        assert_eq!(number_of_non_fungible_idxs, 1);
+    }
 }
diff --git a/rocks-db/src/asset_client.rs b/rocks-db/src/asset_client.rs
index b69b1f4ef..f9d361813 100644
--- a/rocks-db/src/asset_client.rs
+++ b/rocks-db/src/asset_client.rs
@@ -10,14 +10,14 @@ use crate::errors::StorageError;
 use crate::key_encoders::encode_u64x2_pubkey;
 use crate::{Result, Storage};
 use entities::api_req_params::Options;
-use entities::enums::TokenMetadataEdition;
+use entities::enums::{AssetType, TokenMetadataEdition};
 use entities::models::{EditionData, PubkeyWithSlot};
 use futures_util::FutureExt;
 use std::collections::HashMap;
 
 impl Storage {
     fn get_next_fungible_asset_update_seq(&self) -> Result<u64> {
-        if self.fungible_assets_update_last_seq.load(Ordering::SeqCst) == 0 {
+        if self.fungible_assets_update_last_seq.load(Ordering::Relaxed) == 0 {
             // If fungible_assets_update_next_seq is zero, fetch the last key from fungible_assets_update_idx
             let mut iter = self.fungible_assets_update_idx.iter_end(); // Assuming iter_end method fetches the last item
@@ -27,13 +27,13 @@ impl Storage {
                 let seq = u64::from_be_bytes(last_key[..std::mem::size_of::<u64>()].try_into()?);
                 self.fungible_assets_update_last_seq
-                    .store(seq, Ordering::SeqCst);
+                    .store(seq, Ordering::Relaxed);
             }
         }
         // Increment and return the sequence number
         let seq = self
             .fungible_assets_update_last_seq
-            .fetch_add(1, Ordering::SeqCst)
+            .fetch_add(1, Ordering::Relaxed)
             + 1;
         Ok(seq)
     }
@@ -113,6 +113,45 @@ impl Storage {
         );
         Ok(())
     }
+
+    pub fn clean_syncronized_idxs_with_batch(
+        &self,
+        asset_type: AssetType,
+        last_synced_key: Vec<u8>,
+    ) -> Result<()> {
+        let (from, cf) = match asset_type {
+            AssetType::Fungible => {
+                let cf = self.fungible_assets_update_idx.handle();
+                let key_type_pairs = self.fungible_assets_update_idx.get_from_start(1);
+
+                if key_type_pairs.is_empty() {
+                    return Ok(());
+                }
+                let from = key_type_pairs[0].0.clone();
+                (from, cf)
+            }
+            AssetType::NonFungible => {
+                let cf = self.assets_update_idx.handle();
+                let key_type_pairs = self.assets_update_idx.get_from_start(1);
+
+                if key_type_pairs.is_empty() {
+                    return Ok(());
+                }
+                let from = key_type_pairs[0].0.clone();
+                (from, cf)
+            }
+        };
+
+        if from == last_synced_key {
+            return Ok(());
+        }
+
+        let mut batch = rocksdb::WriteBatchWithTransaction::<false>::default();
+        batch.delete_range_cf(&cf, from, last_synced_key);
+        self.db.write(batch)?;
+
+        Ok(())
+    }
 }
 
 #[macro_export]
diff --git a/rocks-db/src/batch_client.rs b/rocks-db/src/batch_client.rs
index 305f16e40..c3184cf97 100644
--- a/rocks-db/src/batch_client.rs
+++ b/rocks-db/src/batch_client.rs
@@ -1,7 +1,7 @@
 use std::collections::{HashMap, HashSet};
 
 use async_trait::async_trait;
-use entities::enums::{SpecificationVersions, TokenMetadataEdition};
+use entities::enums::{AssetType, SpecificationVersions, TokenMetadataEdition};
 use serde_json::json;
 use solana_sdk::pubkey::Pubkey;
@@ -183,6 +183,14 @@ impl AssetUpdateIndexStorage for Storage {
         );
         Ok((unique_pubkeys, last_key))
     }
+
+    fn clean_syncronized_idxs(
+        &self,
+        asset_type: AssetType,
+        last_synced_key: Vec<u8>,
+    ) -> Result<()> {
+        self.clean_syncronized_idxs_with_batch(asset_type, last_synced_key)
+    }
 }
 
 #[async_trait]
@@ -200,9 +208,9 @@ impl AssetIndexReader for Storage {
                 let fungible_asset_index = FungibleAssetIndex {
                     pubkey: token_acc.pubkey,
                     owner: Some(token_acc.owner),
+                    slot_updated: token_acc.slot_updated,
                     fungible_asset_mint: Some(token_acc.mint),
                     fungible_asset_balance: Some(token_acc.amount as u64),
-                    slot_updated: token_acc.slot_updated,
                 };
 
                 fungible_assets_indexes.insert(token_acc.pubkey, fungible_asset_index);
diff --git a/rocks-db/src/batch_savers.rs b/rocks-db/src/batch_savers.rs
index ec3d7d39e..5d31a38aa 100644
--- a/rocks-db/src/batch_savers.rs
+++ b/rocks-db/src/batch_savers.rs
@@ -2,7 +2,7 @@ use crate::asset::{AssetCollection, MetadataMintMap};
 use crate::token_accounts::{TokenAccountMintOwnerIdx, TokenAccountOwnerIdx};
 use crate::Result;
 use crate::{AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, Storage};
-use entities::enums::TokenMetadataEdition;
+use entities::enums::{AssetType, TokenMetadataEdition};
 use entities::models::{
     InscriptionDataInfo, InscriptionInfo, Mint, TokenAccount, TokenAccountMintOwnerIdxKey,
     TokenAccountOwnerIdxKey,
@@ -270,4 +270,13 @@ impl BatchSaveStorage {
         }
         Ok(())
     }
+
+    pub fn clean_syncronized_idxs(
+        &self,
+        asset_type: AssetType,
+        last_synced_key: Vec<u8>,
+    ) -> Result<()> {
+        self.storage
+            .clean_syncronized_idxs_with_batch(asset_type, last_synced_key)
+    }
 }
diff --git a/rocks-db/src/storage_traits.rs b/rocks-db/src/storage_traits.rs
index dadb5b212..80be54346 100644
--- a/rocks-db/src/storage_traits.rs
+++ b/rocks-db/src/storage_traits.rs
@@ -6,7 +6,10 @@ use solana_sdk::pubkey::Pubkey;
 
 pub use crate::Result;
 use crate::Storage;
-use entities::models::{AssetIndex, FungibleAssetIndex};
+use entities::{
+    enums::AssetType,
+    models::{AssetIndex, FungibleAssetIndex},
+};
 
 #[derive(Clone, Debug, PartialEq)]
 pub struct AssetUpdatedKey {
@@ -42,6 +45,9 @@ pub trait AssetUpdateIndexStorage {
         limit: usize,
         skip_keys: Option<HashSet<Pubkey>>,
     ) -> Result<(HashSet<Pubkey>, Option<AssetUpdatedKey>)>;
+
+    fn clean_syncronized_idxs(&self, asset_type: AssetType, last_synced_key: Vec<u8>)
+        -> Result<()>;
 }
 
 #[automock]
@@ -130,6 +136,15 @@ impl AssetUpdateIndexStorage for MockAssetIndexStorage {
         self.mock_update_index_storage
             .fetch_fungible_asset_updated_keys(from, up_to, limit, skip_keys)
     }
+
+    fn clean_syncronized_idxs(
+        &self,
+        asset_type: AssetType,
+        last_synced_key: Vec<u8>,
+    ) -> Result<()> {
+        self.mock_update_index_storage
+            .clean_syncronized_idxs(asset_type, last_synced_key)
+    }
 }
 
 #[async_trait]

From 49f0096301816ce36194d9c0e1cce51db9e63bc5 Mon Sep 17 00:00:00 2001
From: Kyrylo Stepanov
Date: Tue, 26 Nov 2024 14:29:54 +0200
Subject: [PATCH 2/6] Simplify cleaning up asset update index

---
 rocks-db/src/asset_client.rs | 29 ++++-------------------------
 1 file changed, 4 insertions(+), 25 deletions(-)

diff --git a/rocks-db/src/asset_client.rs b/rocks-db/src/asset_client.rs
index f9d361813..f4348a412 100644
--- a/rocks-db/src/asset_client.rs
+++ b/rocks-db/src/asset_client.rs
@@ -119,34 +119,13 @@ impl Storage {
         asset_type: AssetType,
         last_synced_key: Vec<u8>,
     ) -> Result<()> {
-        let (from, cf) = match asset_type {
-            AssetType::Fungible => {
-                let cf = self.fungible_assets_update_idx.handle();
-                let key_type_pairs = self.fungible_assets_update_idx.get_from_start(1);
-
-                if key_type_pairs.is_empty() {
-                    return Ok(());
-                }
-                let from = key_type_pairs[0].0.clone();
-                (from, cf)
-            }
-            AssetType::NonFungible => {
-                let cf = self.assets_update_idx.handle();
-                let key_type_pairs = self.assets_update_idx.get_from_start(1);
-
-                if key_type_pairs.is_empty() {
-                    return Ok(());
-                }
-                let from = key_type_pairs[0].0.clone();
-                (from, cf)
-            }
+        let cf = match asset_type {
+            AssetType::Fungible => self.fungible_assets_update_idx.handle(),
+            AssetType::NonFungible => self.assets_update_idx.handle(),
         };
 
-        if from == last_synced_key {
-            return Ok(());
-        }
-
-        let mut 
batch = rocksdb::WriteBatchWithTransaction::::default(); + let from = vec![]; batch.delete_range_cf(&cf, from, last_synced_key); self.db.write(batch)?; From 3e3f044dd0ea0a43d3f9d85912b9626dd0650225 Mon Sep 17 00:00:00 2001 From: Kyrylo Stepanov Date: Tue, 26 Nov 2024 16:15:06 +0200 Subject: [PATCH 3/6] simplified interfaces and their number for assets idx cleaning up && called cleaning up from the ingester instead of the synchronizer --- nft_ingester/src/bin/ingester/main.rs | 34 ++++++++++++-- nft_ingester/src/index_syncronizer.rs | 35 +++----------- nft_ingester/tests/api_tests.rs | 67 ++++++++++++--------------- rocks-db/src/asset_client.rs | 6 +-- rocks-db/src/batch_client.rs | 10 +--- rocks-db/src/batch_savers.rs | 11 +---- rocks-db/src/storage_traits.rs | 17 +------ 7 files changed, 72 insertions(+), 108 deletions(-) diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index 389977403..ed7c7f3fb 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -4,6 +4,7 @@ use entities::enums::{AssetType, ASSET_TYPES}; use nft_ingester::batch_mint::batch_mint_persister::{BatchMintDownloaderForPersister, BatchMintPersister}; use nft_ingester::scheduler::Scheduler; use postgre_client::PG_MIGRATIONS_PATH; +use rocks_db::key_encoders::encode_u64x2_pubkey; use std::panic; use std::path::PathBuf; use std::str::FromStr; @@ -59,7 +60,7 @@ use nft_ingester::transaction_ingester::BackfillTransactionIngester; use nft_ingester::{config::init_logger, error::IngesterError}; use rocks_db::backup_service; use rocks_db::backup_service::BackupService; -use rocks_db::storage_traits::AssetSlotStorage; +use rocks_db::storage_traits::{AssetSlotStorage, AssetUpdateIndexStorage}; use tonic::transport::Server; use usecase::asset_streamer::AssetStreamer; use usecase::proofs::MaybeProofChecker; @@ -75,6 +76,7 @@ pub const DEFAULT_ROCKSDB_PATH: &str = "./my_rocksdb"; pub const ARWEAVE_WALLET_PATH: &str = "./arweave_wallet.json"; pub const DEFAULT_MIN_POSTGRES_CONNECTIONS: u32 = 100; pub const DEFAULT_MAX_POSTGRES_CONNECTIONS: u32 = 100; +pub const SECONDS_TO_RETRY_IDXS_CLEANUP: u64 = 15; #[derive(Parser, Debug)] struct Args { @@ -605,8 +607,6 @@ pub async fn main() -> Result<(), IngesterError> { } if !config.disable_synchronizer { - let synchronizer = Arc::new(synchronizer); - for asset_type in ASSET_TYPES { let rx = shutdown_rx.resubscribe(); let synchronizer = synchronizer.clone(); @@ -800,6 +800,34 @@ pub async fn main() -> Result<(), IngesterError> { batch_mint_persister.persist_batch_mints(rx).await }); + // clean indexes + for asset_type in ASSET_TYPES { + let primary_rocks_storage = primary_rocks_storage.clone(); + let mut rx = shutdown_rx.resubscribe(); + mutexed_tasks.lock().await.spawn(async move { + loop { + if rx.try_recv().is_ok() { + break; + } + let optional_last_synced_key = match asset_type { + AssetType::NonFungible => primary_rocks_storage.last_known_nft_asset_updated_key(), + AssetType::Fungible => primary_rocks_storage.last_known_fungible_asset_updated_key(), + }; + + if let Ok(Some(last_synced_key)) = optional_last_synced_key { + let last_synced_key = + encode_u64x2_pubkey(last_synced_key.seq, last_synced_key.slot, last_synced_key.pubkey); + primary_rocks_storage + .clean_syncronized_idxs(asset_type, last_synced_key) + .unwrap(); + }; + tokio::time::sleep(Duration::from_secs(SECONDS_TO_RETRY_IDXS_CLEANUP)).await; + } + + Ok(()) + }); + } + start_metrics(metrics_state.registry, config.metrics_port).await; // --stop diff 
--git a/nft_ingester/src/index_syncronizer.rs b/nft_ingester/src/index_syncronizer.rs index dd317fe4a..a4196974a 100644 --- a/nft_ingester/src/index_syncronizer.rs +++ b/nft_ingester/src/index_syncronizer.rs @@ -181,20 +181,14 @@ where SyncStatus::FullSyncRequired(state) => { tracing::info!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq); self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key) - .await?; + .await } SyncStatus::RegularSyncRequired(state) => { self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key) - .await?; + .await } - SyncStatus::NoSyncRequired => {} - } - - if let Some(encoded_key) = self.index_storage.fetch_last_synced_id(asset_type).await? { - self.clean_syncronized_idxs(asset_type, encoded_key)?; + SyncStatus::NoSyncRequired => Ok(()), } - - Ok(()) } pub async fn synchronize_fungible_asset_indexes( @@ -212,31 +206,14 @@ where SyncStatus::FullSyncRequired(state) => { tracing::info!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq); self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key) - .await?; + .await } SyncStatus::RegularSyncRequired(state) => { self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key) - .await?; + .await } - SyncStatus::NoSyncRequired => {} + SyncStatus::NoSyncRequired => Ok(()), } - - if let Some(encoded_key) = self.index_storage.fetch_last_synced_id(asset_type).await? 
{ - self.clean_syncronized_idxs(asset_type, encoded_key)?; - } - - Ok(()) - } - - pub fn clean_syncronized_idxs( - &self, - asset_type: AssetType, - last_synced_key: Vec, - ) -> Result<(), IngesterError> { - self.primary_storage - .clean_syncronized_idxs(asset_type, last_synced_key)?; - - Ok(()) } pub async fn full_syncronize( diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs index 4cf046e34..3090e4005 100644 --- a/nft_ingester/tests/api_tests.rs +++ b/nft_ingester/tests/api_tests.rs @@ -7,6 +7,8 @@ mod tests { ShadowInterestBearingConfig, ShadowTransferFee, ShadowTransferFeeConfig, UnixTimestamp, }; use blockbuster::programs::token_extensions::MintAccountExtensions; + use rocks_db::key_encoders::encode_u64x2_pubkey; + use rocks_db::storage_traits::AssetUpdateIndexStorage; use std::str::FromStr; use std::{collections::HashMap, sync::Arc}; @@ -3721,7 +3723,7 @@ mod tests { let cli = Cli::default(); let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await; - let synchronizer = nft_ingester::index_syncronizer::Synchronizer::new( + let _synchronizer = nft_ingester::index_syncronizer::Synchronizer::new( env.rocks_env.storage.clone(), env.pg_env.client.clone(), env.pg_env.client.clone(), @@ -3778,54 +3780,45 @@ mod tests { batch_storage.flush().unwrap(); // one more idx shoul've been added - let mut number_of_fungible_idxs = 0; - let mut number_of_non_fungible_idxs = 0; - let mut idx_fungible_asset_iter = env + let idx_fungible_asset_iter = env .rocks_env .storage .fungible_assets_update_idx .iter_start(); - let mut idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start(); - - while let Some(_) = idx_fungible_asset_iter.next() { - number_of_fungible_idxs += 1; - } - assert_eq!(number_of_fungible_idxs, 1); + let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start(); + assert_eq!(idx_fungible_asset_iter.count(), 1); + assert_eq!(idx_non_fungible_asset_iter.count(), cnt + 2); - while let Some(_) = idx_non_fungible_asset_iter.next() { - number_of_non_fungible_idxs += 1; - } - assert_eq!(number_of_non_fungible_idxs, 3); + for asset_type in ASSET_TYPES { + let optional_last_synced_key = match asset_type { + AssetType::NonFungible => env.rocks_env.storage.last_known_nft_asset_updated_key(), + AssetType::Fungible => env + .rocks_env + .storage + .last_known_fungible_asset_updated_key(), + }; - let (_, rx) = tokio::sync::broadcast::channel::<()>(1); + let last_synced_key = optional_last_synced_key.unwrap().unwrap(); + let last_synced_key = encode_u64x2_pubkey( + last_synced_key.seq, + last_synced_key.slot, + last_synced_key.pubkey, + ); - synchronizer - .synchronize_nft_asset_indexes(&rx, 0) - .await - .unwrap(); - synchronizer - .synchronize_fungible_asset_indexes(&rx, 0) - .await - .unwrap(); + env.rocks_env + .storage + .clean_syncronized_idxs(asset_type, last_synced_key) + .unwrap(); + } // after sync idxs should be cleaned again - let mut number_of_fungible_idxs = 0; - let mut number_of_non_fungible_idxs = 0; - let mut idx_fungible_asset_iter = env + let idx_fungible_asset_iter = env .rocks_env .storage .fungible_assets_update_idx .iter_start(); - let mut idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start(); - - while let Some(_) = idx_fungible_asset_iter.next() { - number_of_fungible_idxs += 1; - } - assert_eq!(number_of_fungible_idxs, 1); - - while let Some(_) = idx_non_fungible_asset_iter.next() { - number_of_non_fungible_idxs += 1; - } - 
assert_eq!(number_of_non_fungible_idxs, 1); + let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start(); + assert_eq!(idx_fungible_asset_iter.count(), 1); + assert_eq!(idx_non_fungible_asset_iter.count(), 1); } } diff --git a/rocks-db/src/asset_client.rs b/rocks-db/src/asset_client.rs index f4348a412..0411a916c 100644 --- a/rocks-db/src/asset_client.rs +++ b/rocks-db/src/asset_client.rs @@ -114,7 +114,7 @@ impl Storage { Ok(()) } - pub fn clean_syncronized_idxs_with_batch( + pub fn clean_syncronized_idxs( &self, asset_type: AssetType, last_synced_key: Vec, @@ -124,10 +124,8 @@ impl Storage { AssetType::NonFungible => self.assets_update_idx.handle(), }; - let mut batch = rocksdb::WriteBatchWithTransaction::::default(); let from = vec![]; - batch.delete_range_cf(&cf, from, last_synced_key); - self.db.write(batch)?; + self.db.delete_range_cf(&cf, from, last_synced_key)?; Ok(()) } diff --git a/rocks-db/src/batch_client.rs b/rocks-db/src/batch_client.rs index c3184cf97..e0c8c64c8 100644 --- a/rocks-db/src/batch_client.rs +++ b/rocks-db/src/batch_client.rs @@ -1,7 +1,7 @@ use std::collections::{HashMap, HashSet}; use async_trait::async_trait; -use entities::enums::{AssetType, SpecificationVersions, TokenMetadataEdition}; +use entities::enums::{SpecificationVersions, TokenMetadataEdition}; use serde_json::json; use solana_sdk::pubkey::Pubkey; @@ -183,14 +183,6 @@ impl AssetUpdateIndexStorage for Storage { ); Ok((unique_pubkeys, last_key)) } - - fn clean_syncronized_idxs( - &self, - asset_type: AssetType, - last_synced_key: Vec, - ) -> Result<()> { - self.clean_syncronized_idxs_with_batch(asset_type, last_synced_key) - } } #[async_trait] diff --git a/rocks-db/src/batch_savers.rs b/rocks-db/src/batch_savers.rs index 5d31a38aa..ec3d7d39e 100644 --- a/rocks-db/src/batch_savers.rs +++ b/rocks-db/src/batch_savers.rs @@ -2,7 +2,7 @@ use crate::asset::{AssetCollection, MetadataMintMap}; use crate::token_accounts::{TokenAccountMintOwnerIdx, TokenAccountOwnerIdx}; use crate::Result; use crate::{AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, Storage}; -use entities::enums::{AssetType, TokenMetadataEdition}; +use entities::enums::TokenMetadataEdition; use entities::models::{ InscriptionDataInfo, InscriptionInfo, Mint, TokenAccount, TokenAccountMintOwnerIdxKey, TokenAccountOwnerIdxKey, @@ -270,13 +270,4 @@ impl BatchSaveStorage { } Ok(()) } - - pub fn clean_syncronized_idxs( - &self, - asset_type: AssetType, - last_synced_key: Vec, - ) -> Result<()> { - self.storage - .clean_syncronized_idxs_with_batch(asset_type, last_synced_key) - } } diff --git a/rocks-db/src/storage_traits.rs b/rocks-db/src/storage_traits.rs index 80be54346..dadb5b212 100644 --- a/rocks-db/src/storage_traits.rs +++ b/rocks-db/src/storage_traits.rs @@ -6,10 +6,7 @@ use solana_sdk::pubkey::Pubkey; pub use crate::Result; use crate::Storage; -use entities::{ - enums::AssetType, - models::{AssetIndex, FungibleAssetIndex}, -}; +use entities::models::{AssetIndex, FungibleAssetIndex}; #[derive(Clone, Debug, PartialEq)] pub struct AssetUpdatedKey { @@ -45,9 +42,6 @@ pub trait AssetUpdateIndexStorage { limit: usize, skip_keys: Option>, ) -> Result<(HashSet, Option)>; - - fn clean_syncronized_idxs(&self, asset_type: AssetType, last_synced_key: Vec) - -> Result<()>; } #[automock] @@ -136,15 +130,6 @@ impl AssetUpdateIndexStorage for MockAssetIndexStorage { self.mock_update_index_storage .fetch_fungible_asset_updated_keys(from, up_to, limit, skip_keys) } - - fn clean_syncronized_idxs( - &self, - 
asset_type: AssetType, - last_synced_key: Vec, - ) -> Result<()> { - self.mock_update_index_storage - .clean_syncronized_idxs(asset_type, last_synced_key) - } } #[async_trait] From eadeb0d6bc6bc2c71788a927021e9c395f309847 Mon Sep 17 00:00:00 2001 From: Kyrylo Stepanov Date: Tue, 26 Nov 2024 17:46:24 +0200 Subject: [PATCH 4/6] reorganize cleaners into a module --- nft_ingester/src/bin/ingester/main.rs | 27 ++++++++--------- .../src/{ => cleaners}/fork_cleaner.rs | 0 nft_ingester/src/cleaners/indexer_cleaner.rs | 29 +++++++++++++++++++ nft_ingester/src/cleaners/mod.rs | 2 ++ nft_ingester/src/lib.rs | 2 +- nft_ingester/tests/api_tests.rs | 24 ++------------- nft_ingester/tests/clean_forks_test.rs | 2 +- 7 files changed, 48 insertions(+), 38 deletions(-) rename nft_ingester/src/{ => cleaners}/fork_cleaner.rs (100%) create mode 100644 nft_ingester/src/cleaners/indexer_cleaner.rs create mode 100644 nft_ingester/src/cleaners/mod.rs diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index ed7c7f3fb..4371cb0b3 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -2,9 +2,9 @@ use arweave_rs::consts::ARWEAVE_BASE_URL; use arweave_rs::Arweave; use entities::enums::{AssetType, ASSET_TYPES}; use nft_ingester::batch_mint::batch_mint_persister::{BatchMintDownloaderForPersister, BatchMintPersister}; +use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs; use nft_ingester::scheduler::Scheduler; use postgre_client::PG_MIGRATIONS_PATH; -use rocks_db::key_encoders::encode_u64x2_pubkey; use std::panic; use std::path::PathBuf; use std::str::FromStr; @@ -41,10 +41,10 @@ use nft_ingester::backfiller::{ }; use nft_ingester::batch_mint::batch_mint_processor::{process_batch_mints, BatchMintProcessor, NoopBatchMintTxSender}; use nft_ingester::buffer::{debug_buffer, Buffer}; +use nft_ingester::cleaners::fork_cleaner::{run_fork_cleaner, ForkCleaner}; use nft_ingester::config::{ setup_config, ApiConfig, BackfillerConfig, BackfillerMode, IngesterConfig, MessageSource, INGESTER_CONFIG_PREFIX, }; -use nft_ingester::fork_cleaner::{run_fork_cleaner, ForkCleaner}; use nft_ingester::gapfiller::{process_asset_details_stream_wrapper, run_sequence_consistent_gapfiller}; use nft_ingester::index_syncronizer::Synchronizer; use nft_ingester::init::{graceful_stop, init_index_storage_with_migration, init_primary_storage}; @@ -60,7 +60,7 @@ use nft_ingester::transaction_ingester::BackfillTransactionIngester; use nft_ingester::{config::init_logger, error::IngesterError}; use rocks_db::backup_service; use rocks_db::backup_service::BackupService; -use rocks_db::storage_traits::{AssetSlotStorage, AssetUpdateIndexStorage}; +use rocks_db::storage_traits::AssetSlotStorage; use tonic::transport::Server; use usecase::asset_streamer::AssetStreamer; use usecase::proofs::MaybeProofChecker; @@ -809,18 +809,15 @@ pub async fn main() -> Result<(), IngesterError> { if rx.try_recv().is_ok() { break; } - let optional_last_synced_key = match asset_type { - AssetType::NonFungible => primary_rocks_storage.last_known_nft_asset_updated_key(), - AssetType::Fungible => primary_rocks_storage.last_known_fungible_asset_updated_key(), - }; - - if let Ok(Some(last_synced_key)) = optional_last_synced_key { - let last_synced_key = - encode_u64x2_pubkey(last_synced_key.seq, last_synced_key.slot, last_synced_key.pubkey); - primary_rocks_storage - .clean_syncronized_idxs(asset_type, last_synced_key) - .unwrap(); - }; + + match 
clean_syncronized_idxs(primary_rocks_storage.clone(), asset_type) {
+                    Ok(_) => {
+                        info!("Cleaned synchronized indexes for {:?}", asset_type);
+                    }
+                    Err(e) => {
+                        error!("Failed to clean synchronized indexes for {:?} with error {}", asset_type, e);
+                    }
+                }
                 tokio::time::sleep(Duration::from_secs(SECONDS_TO_RETRY_IDXS_CLEANUP)).await;
             }
 
             Ok(())
         });
     }
diff --git a/nft_ingester/src/fork_cleaner.rs b/nft_ingester/src/cleaners/fork_cleaner.rs
similarity index 100%
rename from nft_ingester/src/fork_cleaner.rs
rename to nft_ingester/src/cleaners/fork_cleaner.rs
diff --git a/nft_ingester/src/cleaners/indexer_cleaner.rs b/nft_ingester/src/cleaners/indexer_cleaner.rs
new file mode 100644
index 000000000..5d5b9a503
--- /dev/null
+++ b/nft_ingester/src/cleaners/indexer_cleaner.rs
@@ -0,0 +1,29 @@
+use std::sync::Arc;
+
+use entities::enums::AssetType;
+use rocks_db::{
+    key_encoders::encode_u64x2_pubkey, storage_traits::AssetUpdateIndexStorage, Storage,
+};
+
+use crate::error::IngesterError;
+
+pub fn clean_syncronized_idxs(
+    primary_rocks_storage: Arc<Storage>,
+    asset_type: AssetType,
+) -> Result<(), IngesterError> {
+    let optional_last_synced_key = match asset_type {
+        AssetType::NonFungible => primary_rocks_storage.last_known_nft_asset_updated_key(),
+        AssetType::Fungible => primary_rocks_storage.last_known_fungible_asset_updated_key(),
+    };
+
+    if let Ok(Some(last_synced_key)) = optional_last_synced_key {
+        let last_synced_key = encode_u64x2_pubkey(
+            last_synced_key.seq,
+            last_synced_key.slot,
+            last_synced_key.pubkey,
+        );
+
+        primary_rocks_storage.clean_syncronized_idxs(asset_type, last_synced_key)?;
+    };
+
+    Ok(())
+}
diff --git a/nft_ingester/src/cleaners/mod.rs b/nft_ingester/src/cleaners/mod.rs
new file mode 100644
index 000000000..9e37fcb07
--- /dev/null
+++ b/nft_ingester/src/cleaners/mod.rs
@@ -0,0 +1,2 @@
+pub mod fork_cleaner;
+pub mod indexer_cleaner;
diff --git a/nft_ingester/src/lib.rs b/nft_ingester/src/lib.rs
index fe5517fc4..dedaf78b9 100644
--- a/nft_ingester/src/lib.rs
+++ b/nft_ingester/src/lib.rs
@@ -3,10 +3,10 @@ pub mod api;
 pub mod backfiller;
 pub mod batch_mint;
 pub mod buffer;
+pub mod cleaners;
 pub mod config;
 pub mod error;
 pub mod flatbuffer_mapper;
-pub mod fork_cleaner;
 pub mod gapfiller;
 pub mod index_syncronizer;
 pub mod init;
diff --git a/nft_ingester/tests/api_tests.rs b/nft_ingester/tests/api_tests.rs
index 3090e4005..fccea61c5 100644
--- a/nft_ingester/tests/api_tests.rs
+++ b/nft_ingester/tests/api_tests.rs
@@ -7,6 +7,8 @@ mod tests {
         ShadowInterestBearingConfig, ShadowTransferFee, ShadowTransferFeeConfig, UnixTimestamp,
     };
     use blockbuster::programs::token_extensions::MintAccountExtensions;
-    use rocks_db::key_encoders::encode_u64x2_pubkey;
-    use rocks_db::storage_traits::AssetUpdateIndexStorage;
+    use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs;
+
     use std::str::FromStr;
     use std::{collections::HashMap, sync::Arc};
@@ -3790,25 +3790,7 @@ mod tests {
         assert_eq!(idx_non_fungible_asset_iter.count(), cnt + 2);
 
         for asset_type in ASSET_TYPES {
-            let optional_last_synced_key = match asset_type {
-                AssetType::NonFungible => env.rocks_env.storage.last_known_nft_asset_updated_key(),
-                AssetType::Fungible => env
-                    .rocks_env
-                    .storage
-                    .last_known_fungible_asset_updated_key(),
-            };
-
-            let last_synced_key = optional_last_synced_key.unwrap().unwrap();
-            let last_synced_key = encode_u64x2_pubkey(
-                last_synced_key.seq,
-                last_synced_key.slot,
-                last_synced_key.pubkey,
-            );
-
-            env.rocks_env
-                .storage
-                .clean_syncronized_idxs(asset_type, last_synced_key)
-                .unwrap();
+            
clean_syncronized_idxs(env.rocks_env.storage.clone(), asset_type).unwrap(); } // after sync idxs should be cleaned again diff --git a/nft_ingester/tests/clean_forks_test.rs b/nft_ingester/tests/clean_forks_test.rs index 56141d48f..c6929c0bf 100644 --- a/nft_ingester/tests/clean_forks_test.rs +++ b/nft_ingester/tests/clean_forks_test.rs @@ -6,7 +6,7 @@ use metrics_utils::utils::start_metrics; use metrics_utils::{MetricState, MetricsTrait}; use mpl_bubblegum::types::{BubblegumEventType, LeafSchema, Version}; use mpl_bubblegum::{InstructionName, LeafSchemaEvent}; -use nft_ingester::fork_cleaner::ForkCleaner; +use nft_ingester::cleaners::fork_cleaner::ForkCleaner; use nft_ingester::processors::transaction_based::bubblegum_updates_processor::BubblegumTxProcessor; use rocks_db::cl_items::ClItem; use rocks_db::column::TypedColumn; From 089528a6a7dfa59b0016dc60322cf8ee6a9b30cd Mon Sep 17 00:00:00 2001 From: Kyrylo Stepanov Date: Wed, 27 Nov 2024 13:18:59 +0200 Subject: [PATCH 5/6] Fix receiving shutdown signal in ingester and cleaning up service --- nft_ingester/src/bin/ingester/main.rs | 30 ++++++++++++++------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index 4371cb0b3..29b829074 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -76,7 +76,7 @@ pub const DEFAULT_ROCKSDB_PATH: &str = "./my_rocksdb"; pub const ARWEAVE_WALLET_PATH: &str = "./arweave_wallet.json"; pub const DEFAULT_MIN_POSTGRES_CONNECTIONS: u32 = 100; pub const DEFAULT_MAX_POSTGRES_CONNECTIONS: u32 = 100; -pub const SECONDS_TO_RETRY_IDXS_CLEANUP: u64 = 15; +pub const SECONDS_TO_RETRY_IDXS_CLEANUP: u64 = 15 * 60; // 15 minutes #[derive(Parser, Debug)] struct Args { @@ -805,20 +805,22 @@ pub async fn main() -> Result<(), IngesterError> { let primary_rocks_storage = primary_rocks_storage.clone(); let mut rx = shutdown_rx.resubscribe(); mutexed_tasks.lock().await.spawn(async move { - loop { - if rx.try_recv().is_ok() { - break; - } - - match clean_syncronized_idxs(primary_rocks_storage.clone(), asset_type) { - Ok(_) => { - info!("Cleaned synchronized indexes for {:?}", asset_type); + tokio::select! 
{
+                _ = rx.recv() => {}
+                _ = async move {
+                    loop {
+                        match clean_syncronized_idxs(primary_rocks_storage.clone(), asset_type) {
+                            Ok(_) => {
+                                info!("Cleaned synchronized indexes for {:?}", asset_type);
+                            }
+                            Err(e) => {
+                                error!("Failed to clean synchronized indexes for {:?} with error {}", asset_type, e);
+                                break;
+                            }
+                        }
+                        tokio::time::sleep(Duration::from_secs(SECONDS_TO_RETRY_IDXS_CLEANUP)).await;
+                    }
+                } => {}
+            }
 
             Ok(())

From 676f6057a4af182e5b21244ce64646ba9f3284d4 Mon Sep 17 00:00:00 2001
From: Kyrylo Stepanov
Date: Wed, 27 Nov 2024 19:59:10 +0200
Subject: [PATCH 6/6] continue working after the indexer cleaner gets an
 error

---
 nft_ingester/src/bin/ingester/main.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs
index 29b829074..bd8d844e4 100644
--- a/nft_ingester/src/bin/ingester/main.rs
+++ b/nft_ingester/src/bin/ingester/main.rs
@@ -815,7 +815,6 @@ pub async fn main() -> Result<(), IngesterError> {
                     }
                     Err(e) => {
                         error!("Failed to clean synchronized indexes for {:?} with error {}", asset_type, e);
-                        break;
                     }
                 }
                 tokio::time::sleep(Duration::from_secs(SECONDS_TO_RETRY_IDXS_CLEANUP)).await;
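
A note on the key layout that makes these range deletes safe: encode_u64x2_pubkey places the big-endian sequence number in the first eight bytes of the key — the same bytes get_next_fungible_asset_update_seq reads back with u64::from_be_bytes — so RocksDB's lexicographic byte order coincides with numeric sequence order. The sketch below illustrates that property; it is a standalone approximation of the assumed layout, not the rocks-db crate's actual implementation:

    // Hedged sketch: assumed (seq, slot, pubkey) layout with big-endian integers.
    // Big-endian means lexicographic byte order == numeric order, so
    // "every key below the last synced key" is one contiguous range.
    fn encode_u64x2_pubkey_sketch(seq: u64, slot: u64, pubkey: [u8; 32]) -> Vec<u8> {
        let mut key = Vec::with_capacity(8 + 8 + 32);
        key.extend_from_slice(&seq.to_be_bytes()); // most significant bytes first
        key.extend_from_slice(&slot.to_be_bytes());
        key.extend_from_slice(&pubkey);
        key
    }

    fn main() {
        let older = encode_u64x2_pubkey_sketch(41, 999, [255u8; 32]);
        let newer = encode_u64x2_pubkey_sketch(42, 0, [0u8; 32]);
        assert!(older < newer); // seq dominates slot and pubkey in the comparison
    }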
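The cleanup primitive itself is RocksDB's range deletion, which drops every key in the half-open range [from, to) of a column family. That is why [PATCH 2/6] can pass an empty vector as the lower bound, and why the last synced key itself survives the delete — matching test_idx_cleaner's expectation that exactly one index entry remains. A minimal, self-contained sketch against the rocksdb crate (the database path and column-family name here are invented for the demo):

    use rocksdb::{ColumnFamilyDescriptor, IteratorMode, Options, DB};

    fn main() -> Result<(), rocksdb::Error> {
        let mut opts = Options::default();
        opts.create_if_missing(true);
        opts.create_missing_column_families(true);

        // Demo-only path and CF name mirroring the index cleaned in this series.
        let cfs = vec![ColumnFamilyDescriptor::new("assets_update_idx", Options::default())];
        let db = DB::open_cf_descriptors(&opts, "/tmp/idx_cleanup_demo", cfs)?;
        let cf = db.cf_handle("assets_update_idx").expect("cf was just created");

        // Five entries keyed by big-endian sequence number.
        for seq in 1u64..=5 {
            db.put_cf(cf, seq.to_be_bytes(), b"")?;
        }

        // Same shape as the PR's cleanup: delete [empty key, last_synced_key).
        // The upper bound is exclusive, so the last synced entry is kept.
        let last_synced_key = 5u64.to_be_bytes();
        db.delete_range_cf(cf, &[][..], &last_synced_key[..])?;

        assert_eq!(db.iterator_cf(cf, IteratorMode::Start).count(), 1);
        Ok(())
    }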
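Finally, the shutdown fix in [PATCH 5/6] is the usual tokio pattern for a periodic job: race the broadcast receiver against the work loop inside tokio::select!, so the task stops immediately even while sleeping between passes — something the earlier try_recv polling only noticed at the top of each iteration. A reduced sketch of that pattern, with a print standing in for clean_syncronized_idxs and a short interval standing in for SECONDS_TO_RETRY_IDXS_CLEANUP:

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);

        let worker = tokio::spawn(async move {
            tokio::select! {
                // Resolves as soon as shutdown is broadcast, even mid-sleep.
                _ = shutdown_rx.recv() => {}
                // The periodic job: one pass, then sleep, repeated forever.
                _ = async {
                    loop {
                        println!("cleanup pass");
                        tokio::time::sleep(Duration::from_millis(100)).await;
                    }
                } => {}
            }
        });

        tokio::time::sleep(Duration::from_millis(250)).await;
        shutdown_tx.send(()).expect("worker still listening");
        worker.await.expect("worker shut down cleanly");
    }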