[mtg-778] Clean asset slot update idx #325
@@ -2,6 +2,7 @@ use arweave_rs::consts::ARWEAVE_BASE_URL;
 use arweave_rs::Arweave;
 use entities::enums::{AssetType, ASSET_TYPES};
 use nft_ingester::batch_mint::batch_mint_persister::{BatchMintDownloaderForPersister, BatchMintPersister};
+use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs;
 use nft_ingester::scheduler::Scheduler;
 use postgre_client::PG_MIGRATIONS_PATH;
 use std::panic;
@@ -40,10 +41,10 @@ use nft_ingester::backfiller::{
 };
 use nft_ingester::batch_mint::batch_mint_processor::{process_batch_mints, BatchMintProcessor, NoopBatchMintTxSender};
 use nft_ingester::buffer::{debug_buffer, Buffer};
+use nft_ingester::cleaners::fork_cleaner::{run_fork_cleaner, ForkCleaner};
 use nft_ingester::config::{
     setup_config, ApiConfig, BackfillerConfig, BackfillerMode, IngesterConfig, MessageSource, INGESTER_CONFIG_PREFIX,
 };
-use nft_ingester::fork_cleaner::{run_fork_cleaner, ForkCleaner};
 use nft_ingester::gapfiller::{process_asset_details_stream_wrapper, run_sequence_consistent_gapfiller};
 use nft_ingester::index_syncronizer::Synchronizer;
 use nft_ingester::init::{graceful_stop, init_index_storage_with_migration, init_primary_storage};
@@ -75,6 +76,7 @@ pub const DEFAULT_ROCKSDB_PATH: &str = "./my_rocksdb";
 pub const ARWEAVE_WALLET_PATH: &str = "./arweave_wallet.json";
 pub const DEFAULT_MIN_POSTGRES_CONNECTIONS: u32 = 100;
 pub const DEFAULT_MAX_POSTGRES_CONNECTIONS: u32 = 100;
+pub const SECONDS_TO_RETRY_IDXS_CLEANUP: u64 = 15;

 #[derive(Parser, Debug)]
 struct Args {
@@ -605,8 +607,6 @@ pub async fn main() -> Result<(), IngesterError> {
 }

 if !config.disable_synchronizer {
     let synchronizer = Arc::new(synchronizer);

     for asset_type in ASSET_TYPES {
         let rx = shutdown_rx.resubscribe();
         let synchronizer = synchronizer.clone();
@@ -800,6 +800,31 @@ pub async fn main() -> Result<(), IngesterError> {
     batch_mint_persister.persist_batch_mints(rx).await
 });

+// clean indexes
+for asset_type in ASSET_TYPES {
+    let primary_rocks_storage = primary_rocks_storage.clone();
+    let mut rx = shutdown_rx.resubscribe();
+    mutexed_tasks.lock().await.spawn(async move {
+        loop {
+            if rx.try_recv().is_ok() {
+                break;
+            }
+
+            match clean_syncronized_idxs(primary_rocks_storage.clone(), asset_type) {
+                Ok(_) => {
+                    info!("Cleaned synchronized indexes for {:?}", asset_type);
+                }
+                Err(e) => {
+                    error!("Failed to clean synchronized indexes for {:?} with error {}", asset_type, e);
+                }
+            }
+            tokio::time::sleep(Duration::from_secs(SECONDS_TO_RETRY_IDXS_CLEANUP)).await;
Review comment: how does it work with termination (ctrl+c) of the app? we usually switch on it and the shutdown_rx
+        }
+
+        Ok(())
+    });
+}

 start_metrics(metrics_state.registry, config.metrics_port).await;

 // --stop
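In response to the review comment above, a shutdown-aware variant of this loop could use tokio::select! so that Ctrl+C interrupts the sleep immediately instead of being noticed only on the next try_recv() poll. This is a sketch, not the PR's code; it assumes shutdown_rx is a tokio::sync::broadcast::Receiver (suggested by the resubscribe() calls) and reuses the PR's clean_syncronized_idxs and interval constant:

```rust
use std::{sync::Arc, time::Duration};

use entities::enums::AssetType;
use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs;
use rocks_db::Storage;
use tokio::sync::broadcast;

// Interval constant from the PR; see also the reviewer's later suggestion
// to run this only once every 15 minutes.
const SECONDS_TO_RETRY_IDXS_CLEANUP: u64 = 15;

async fn idx_cleanup_loop(
    storage: Arc<Storage>,
    asset_type: AssetType,
    mut shutdown_rx: broadcast::Receiver<()>, // assumed signal type
) {
    loop {
        if let Err(e) = clean_syncronized_idxs(storage.clone(), asset_type) {
            tracing::error!("Failed to clean synchronized indexes for {:?}: {}", asset_type, e);
        }
        // select! races the shutdown signal against the sleep, so the task
        // exits as soon as the signal arrives rather than after the interval.
        tokio::select! {
            _ = shutdown_rx.recv() => break,
            _ = tokio::time::sleep(Duration::from_secs(SECONDS_TO_RETRY_IDXS_CLEANUP)) => {}
        }
    }
}
```

With this shape the task reacts to termination at most one cleanup call after the signal, instead of sleeping out the remaining interval.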
@@ -0,0 +1,29 @@
+use std::sync::Arc;
+
+use entities::enums::AssetType;
+use rocks_db::{
+    key_encoders::encode_u64x2_pubkey, storage_traits::AssetUpdateIndexStorage, Storage,
+};
+
+use crate::error::IngesterError;
+
+pub fn clean_syncronized_idxs(
+    primary_rocks_storage: Arc<Storage>,
+    asset_type: AssetType,
+) -> Result<(), IngesterError> {
+    let optional_last_synced_key = match asset_type {
+        AssetType::NonFungible => primary_rocks_storage.last_known_nft_asset_updated_key(),
+        AssetType::Fungible => primary_rocks_storage.last_known_fungible_asset_updated_key(),
+    };
+
+    if let Ok(Some(last_synced_key)) = optional_last_synced_key {
+        let last_synced_key = encode_u64x2_pubkey(
+            last_synced_key.seq,
+            last_synced_key.slot,
+            last_synced_key.pubkey,
+        );
+        primary_rocks_storage.clean_syncronized_idxs(asset_type, last_synced_key)?;
+    };
+
+    Ok(())
+}
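For orientation, the key layout that makes the range delete below safe appears to put the sequence number first in big-endian form: the storage code in this PR reads it back with u64::from_be_bytes over the first eight key bytes. A plausible sketch of encode_u64x2_pubkey under that assumption follows; the real encoder lives in rocks_db::key_encoders and may differ, and Pubkey is assumed to be solana_sdk's:

```rust
use solana_sdk::pubkey::Pubkey;

// Illustrative layout only: seq and slot as big-endian u64s, then the pubkey.
// Big-endian keeps RocksDB's lexicographic key order equal to numeric seq
// order, so deleting the range [empty, last_synced_key) drops exactly the
// index entries at or below the last synchronized sequence.
fn encode_u64x2_pubkey(seq: u64, slot: u64, pubkey: Pubkey) -> Vec<u8> {
    let mut key = Vec::with_capacity(8 + 8 + 32);
    key.extend_from_slice(&seq.to_be_bytes());
    key.extend_from_slice(&slot.to_be_bytes());
    key.extend_from_slice(&pubkey.to_bytes());
    key
}
```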
@@ -0,0 +1,2 @@
+pub mod fork_cleaner;
+pub mod indexer_cleaner;
@@ -7,6 +7,8 @@ mod tests {
     ShadowInterestBearingConfig, ShadowTransferFee, ShadowTransferFeeConfig, UnixTimestamp,
 };
 use blockbuster::programs::token_extensions::MintAccountExtensions;
+use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs;
+
 use std::str::FromStr;
 use std::{collections::HashMap, sync::Arc};
@@ -3714,4 +3716,91 @@ mod tests {
         fungible_token_account1.to_string()
     );
 }

+#[tokio::test(flavor = "multi_thread")]
+async fn test_idx_cleaner() {
+    let cnt = 100;
+    let cli = Cli::default();
+
+    let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await;
+    let _synchronizer = nft_ingester::index_syncronizer::Synchronizer::new(
+        env.rocks_env.storage.clone(),
+        env.pg_env.client.clone(),
+        env.pg_env.client.clone(),
+        200_000,
+        "".to_string(),
+        Arc::new(SynchronizerMetricsConfig::new()),
+        1,
+        false,
+    );
+    let non_fungible_token_mint = generated_assets.pubkeys[1];
+    let mint = Mint {
+        pubkey: non_fungible_token_mint,
+        supply: 100000,
+        decimals: 0,
+        mint_authority: None,
+        freeze_authority: None,
+        token_program: Default::default(),
+        slot_updated: 10,
+        write_version: 10,
+        extensions: None,
+    };
+
+    let owner: Pubkey = generated_assets.owners[1].owner.value.unwrap();
+    let token_account_addr = Pubkey::new_unique();
+    let token_account = TokenAccount {
+        pubkey: token_account_addr,
+        mint: non_fungible_token_mint,
+        delegate: None,
+        owner,
+        extensions: None,
+        frozen: false,
+        delegated_amount: 0,
+        slot_updated: 10,
+        amount: 100,
+        write_version: 10,
+    };
+    let mut batch_storage = BatchSaveStorage::new(
+        env.rocks_env.storage.clone(),
+        10,
+        Arc::new(IngesterMetricsConfig::new()),
+    );
+    let token_accounts_processor =
+        TokenAccountsProcessor::new(Arc::new(IngesterMetricsConfig::new()));
+    token_accounts_processor
+        .transform_and_save_token_account(
+            &mut batch_storage,
+            token_account_addr,
+            &token_account,
+        )
+        .unwrap();
+    token_accounts_processor
+        .transform_and_save_mint_account(&mut batch_storage, &mint)
+        .unwrap();
+    batch_storage.flush().unwrap();
+
+    // one more idx should've been added
+    let idx_fungible_asset_iter = env
+        .rocks_env
+        .storage
+        .fungible_assets_update_idx
+        .iter_start();
+    let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();
+    assert_eq!(idx_fungible_asset_iter.count(), 1);
+    assert_eq!(idx_non_fungible_asset_iter.count(), cnt + 2);
+    for asset_type in ASSET_TYPES {
Review comment (NIT): wdyt about wrapping part of this stuff with a function? then we should not copy-paste it from
+        clean_syncronized_idxs(env.rocks_env.storage.clone(), asset_type).unwrap();
+    }
+
+    // after cleanup, only the last synced idx should remain for each asset type
+    let idx_fungible_asset_iter = env
+        .rocks_env
+        .storage
+        .fungible_assets_update_idx
+        .iter_start();
+    let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();
+    assert_eq!(idx_fungible_asset_iter.count(), 1);
+    assert_eq!(idx_non_fungible_asset_iter.count(), 1);
+}
 }
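On the NIT above, one possible extraction of the shared mint/token-account setup into a helper is sketched here. Every type and call comes from the test body in this PR; the helper's name and signature are hypothetical, not part of the change:

```rust
use std::sync::Arc;

// Hypothetical helper: saves a mint plus one token account for it, returning
// the token account address. Values mirror the test above so behavior matches.
fn save_token_account_with_mint(
    storage: Arc<Storage>,
    mint_pubkey: Pubkey,
    owner: Pubkey,
) -> Pubkey {
    let token_account_addr = Pubkey::new_unique();
    let mint = Mint {
        pubkey: mint_pubkey,
        supply: 100000,
        decimals: 0,
        mint_authority: None,
        freeze_authority: None,
        token_program: Default::default(),
        slot_updated: 10,
        write_version: 10,
        extensions: None,
    };
    let token_account = TokenAccount {
        pubkey: token_account_addr,
        mint: mint_pubkey,
        delegate: None,
        owner,
        extensions: None,
        frozen: false,
        delegated_amount: 0,
        slot_updated: 10,
        amount: 100,
        write_version: 10,
    };
    let mut batch_storage =
        BatchSaveStorage::new(storage, 10, Arc::new(IngesterMetricsConfig::new()));
    let processor = TokenAccountsProcessor::new(Arc::new(IngesterMetricsConfig::new()));
    processor
        .transform_and_save_token_account(&mut batch_storage, token_account_addr, &token_account)
        .unwrap();
    processor
        .transform_and_save_mint_account(&mut batch_storage, &mint)
        .unwrap();
    batch_storage.flush().unwrap();
    token_account_addr
}
```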
@@ -10,14 +10,14 @@ use crate::errors::StorageError;
 use crate::key_encoders::encode_u64x2_pubkey;
 use crate::{Result, Storage};
 use entities::api_req_params::Options;
-use entities::enums::TokenMetadataEdition;
+use entities::enums::{AssetType, TokenMetadataEdition};
 use entities::models::{EditionData, PubkeyWithSlot};
 use futures_util::FutureExt;
 use std::collections::HashMap;

 impl Storage {
     fn get_next_fungible_asset_update_seq(&self) -> Result<u64> {
-        if self.fungible_assets_update_last_seq.load(Ordering::SeqCst) == 0 {
+        if self.fungible_assets_update_last_seq.load(Ordering::Relaxed) == 0 {
             // If fungible_assets_update_next_seq is zero, fetch the last key from fungible_assets_update_idx
             let mut iter = self.fungible_assets_update_idx.iter_end(); // Assuming iter_end method fetches the last item
@@ -27,13 +27,13 @@ impl Storage {

             let seq = u64::from_be_bytes(last_key[..std::mem::size_of::<u64>()].try_into()?);
             self.fungible_assets_update_last_seq
-                .store(seq, Ordering::SeqCst);
+                .store(seq, Ordering::Relaxed);
         }
     }
     // Increment and return the sequence number
     let seq = self
         .fungible_assets_update_last_seq
-        .fetch_add(1, Ordering::SeqCst)
+        .fetch_add(1, Ordering::Relaxed)
         + 1;
     Ok(seq)
 }
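The SeqCst-to-Relaxed switch in this hunk is safe when the atomic is a pure counter whose value does not order other memory accesses. A standalone model of the pattern follows (not the project's code); it keeps the same check-then-seed shape, including the benign race where two threads may both seed the counter with the same value:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Minimal model of the lazily-seeded sequence counter above.
struct SeqCounter {
    last_seq: AtomicU64,
}

impl SeqCounter {
    // `seed` stands in for reading the last persisted key via iter_end().
    fn next(&self, seed: impl Fn() -> Option<u64>) -> u64 {
        if self.last_seq.load(Ordering::Relaxed) == 0 {
            if let Some(s) = seed() {
                // Racing threads would store the same seed, so Relaxed suffices.
                self.last_seq.store(s, Ordering::Relaxed);
            }
        }
        // fetch_add is atomic under any ordering; Relaxed only drops the
        // (unneeded) synchronization of surrounding reads and writes.
        self.last_seq.fetch_add(1, Ordering::Relaxed) + 1
    }
}
```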
@@ -113,6 +113,22 @@ impl Storage {
         );
         Ok(())
     }

+    pub fn clean_syncronized_idxs(
+        &self,
+        asset_type: AssetType,
+        last_synced_key: Vec<u8>,
+    ) -> Result<()> {
+        let cf = match asset_type {
+            AssetType::Fungible => self.fungible_assets_update_idx.handle(),
+            AssetType::NonFungible => self.assets_update_idx.handle(),
+        };
+
+        let from = vec![];
+        self.db.delete_range_cf(&cf, from, last_synced_key)?;
Review comment: Maybe add some additional metrics to track the execution time of this operation?
+
+        Ok(())
+    }
 }

 #[macro_export]
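For the metrics question above, one way to time the range delete is sketched below. The log line is a stand-in; the natural home would be a histogram in the project's own metrics registry (e.g. alongside IngesterMetricsConfig), which this sketch does not assume to exist:

```rust
// Sketch only: the same method body as the PR, plus a timing measurement.
// Shown as it would sit inside `impl Storage`; the metrics sink is hypothetical.
pub fn clean_syncronized_idxs(
    &self,
    asset_type: AssetType,
    last_synced_key: Vec<u8>,
) -> Result<()> {
    let start = std::time::Instant::now();
    let cf = match asset_type {
        AssetType::Fungible => self.fungible_assets_update_idx.handle(),
        AssetType::NonFungible => self.assets_update_idx.handle(),
    };
    self.db.delete_range_cf(&cf, vec![], last_synced_key)?;
    // Replace with a histogram observation once a metric exists for it.
    tracing::debug!(
        "clean_syncronized_idxs({:?}) took {}ms",
        asset_type,
        start.elapsed().as_millis()
    );
    Ok(())
}
```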
Review comment: make it less often, like once in 15 mins
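If that suggestion is taken literally, the change could be as small as redefining the PR's interval constant (the name is the PR's; the value is the reviewer's suggestion):

```rust
// 15 minutes between cleanup passes instead of 15 seconds.
pub const SECONDS_TO_RETRY_IDXS_CLEANUP: u64 = 15 * 60;
```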