[mtg-778] Clean asset slot update idx #325

Merged: 6 commits, Nov 27, 2024
32 changes: 29 additions & 3 deletions nft_ingester/src/bin/ingester/main.rs
@@ -2,6 +2,7 @@ use arweave_rs::consts::ARWEAVE_BASE_URL;
use arweave_rs::Arweave;
use entities::enums::{AssetType, ASSET_TYPES};
use nft_ingester::batch_mint::batch_mint_persister::{BatchMintDownloaderForPersister, BatchMintPersister};
use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs;
use nft_ingester::scheduler::Scheduler;
use postgre_client::PG_MIGRATIONS_PATH;
use std::panic;
@@ -40,10 +41,10 @@ use nft_ingester::backfiller::{
};
use nft_ingester::batch_mint::batch_mint_processor::{process_batch_mints, BatchMintProcessor, NoopBatchMintTxSender};
use nft_ingester::buffer::{debug_buffer, Buffer};
use nft_ingester::cleaners::fork_cleaner::{run_fork_cleaner, ForkCleaner};
use nft_ingester::config::{
setup_config, ApiConfig, BackfillerConfig, BackfillerMode, IngesterConfig, MessageSource, INGESTER_CONFIG_PREFIX,
};
use nft_ingester::fork_cleaner::{run_fork_cleaner, ForkCleaner};
use nft_ingester::gapfiller::{process_asset_details_stream_wrapper, run_sequence_consistent_gapfiller};
use nft_ingester::index_syncronizer::Synchronizer;
use nft_ingester::init::{graceful_stop, init_index_storage_with_migration, init_primary_storage};
@@ -75,6 +76,7 @@ pub const DEFAULT_ROCKSDB_PATH: &str = "./my_rocksdb";
pub const ARWEAVE_WALLET_PATH: &str = "./arweave_wallet.json";
pub const DEFAULT_MIN_POSTGRES_CONNECTIONS: u32 = 100;
pub const DEFAULT_MAX_POSTGRES_CONNECTIONS: u32 = 100;
pub const SECONDS_TO_RETRY_IDXS_CLEANUP: u64 = 15 * 60; // 15 minutes

#[derive(Parser, Debug)]
struct Args {
@@ -605,8 +607,6 @@ pub async fn main() -> Result<(), IngesterError> {
}

if !config.disable_synchronizer {
let synchronizer = Arc::new(synchronizer);

for asset_type in ASSET_TYPES {
let rx = shutdown_rx.resubscribe();
let synchronizer = synchronizer.clone();
@@ -800,6 +800,32 @@ pub async fn main() -> Result<(), IngesterError> {
batch_mint_persister.persist_batch_mints(rx).await
});

// clean indexes
for asset_type in ASSET_TYPES {
let primary_rocks_storage = primary_rocks_storage.clone();
let mut rx = shutdown_rx.resubscribe();
mutexed_tasks.lock().await.spawn(async move {
tokio::select! {
_ = rx.recv() => {}
_ = async move {
loop {
Review comment (Contributor): tokio::select should be inside the loop; otherwise we will not exit from inside this loop on a termination signal, will we?
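A minimal sketch of the restructuring the reviewer is suggesting, reusing the bindings from the diff above (rx, primary_rocks_storage, asset_type, SECONDS_TO_RETRY_IDXS_CLEANUP); the shutdown check moves into the loop body so every iteration can observe the termination signal:

```rust
// Sketch only: shutdown is polled inside the loop instead of wrapping the whole loop
// in one outer select. The surrounding setup is assumed to match the diff above.
mutexed_tasks.lock().await.spawn(async move {
    loop {
        match clean_syncronized_idxs(primary_rocks_storage.clone(), asset_type) {
            Ok(_) => info!("Cleaned synchronized indexes for {:?}", asset_type),
            Err(e) => error!("Failed to clean synchronized indexes for {:?} with error {}", asset_type, e),
        }
        // Either sleep out the retry interval or break immediately on shutdown.
        tokio::select! {
            _ = rx.recv() => break,
            _ = tokio::time::sleep(Duration::from_secs(SECONDS_TO_RETRY_IDXS_CLEANUP)) => {}
        }
    }
    Ok(())
});
```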

match clean_syncronized_idxs(primary_rocks_storage.clone(), asset_type) {
Ok(_) => {
info!("Cleaned synchronized indexes for {:?}", asset_type);
}
Err(e) => {
error!("Failed to clean synchronized indexes for {:?} with error {}", asset_type, e);
}
}
tokio::time::sleep(Duration::from_secs(SECONDS_TO_RETRY_IDXS_CLEANUP)).await;
}
} => {}
}

Ok(())
});
}

start_metrics(metrics_state.registry, config.metrics_port).await;

// --stop
29 changes: 29 additions & 0 deletions nft_ingester/src/cleaners/indexer_cleaner.rs
@@ -0,0 +1,29 @@
use std::sync::Arc;

use entities::enums::AssetType;
use rocks_db::{
key_encoders::encode_u64x2_pubkey, storage_traits::AssetUpdateIndexStorage, Storage,
};

use crate::error::IngesterError;

pub fn clean_syncronized_idxs(
primary_rocks_storage: Arc<Storage>,
asset_type: AssetType,
) -> Result<(), IngesterError> {
let optional_last_synced_key = match asset_type {
AssetType::NonFungible => primary_rocks_storage.last_known_nft_asset_updated_key(),
AssetType::Fungible => primary_rocks_storage.last_known_fungible_asset_updated_key(),
};

if let Ok(Some(last_synced_key)) = optional_last_synced_key {
let last_synced_key = encode_u64x2_pubkey(
last_synced_key.seq,
last_synced_key.slot,
last_synced_key.pubkey,
);
primary_rocks_storage.clean_syncronized_idxs(asset_type, last_synced_key)?;
};

Ok(())
}
2 changes: 2 additions & 0 deletions nft_ingester/src/cleaners/mod.rs
@@ -0,0 +1,2 @@
pub mod fork_cleaner;
pub mod indexer_cleaner;
8 changes: 6 additions & 2 deletions nft_ingester/src/index_syncronizer.rs
@@ -172,8 +172,10 @@ where
rx: &tokio::sync::broadcast::Receiver<()>,
run_full_sync_threshold: i64,
) -> Result<(), IngesterError> {
let asset_type = AssetType::NonFungible;

let state = self
.get_sync_state(run_full_sync_threshold, AssetType::NonFungible)
.get_sync_state(run_full_sync_threshold, asset_type)
.await?;
match state {
SyncStatus::FullSyncRequired(state) => {
@@ -194,8 +196,10 @@ where
rx: &tokio::sync::broadcast::Receiver<()>,
run_full_sync_threshold: i64,
) -> Result<(), IngesterError> {
let asset_type = AssetType::Fungible;

let state = self
.get_sync_state(run_full_sync_threshold, AssetType::Fungible)
.get_sync_state(run_full_sync_threshold, asset_type)
.await?;

match state {
2 changes: 1 addition & 1 deletion nft_ingester/src/lib.rs
@@ -3,10 +3,10 @@ pub mod api;
pub mod backfiller;
pub mod batch_mint;
pub mod buffer;
pub mod cleaners;
pub mod config;
pub mod error;
pub mod flatbuffer_mapper;
pub mod fork_cleaner;
pub mod gapfiller;
pub mod index_syncronizer;
pub mod init;
89 changes: 89 additions & 0 deletions nft_ingester/tests/api_tests.rs
@@ -7,6 +7,8 @@ mod tests {
ShadowInterestBearingConfig, ShadowTransferFee, ShadowTransferFeeConfig, UnixTimestamp,
};
use blockbuster::programs::token_extensions::MintAccountExtensions;
use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs;

use std::str::FromStr;
use std::{collections::HashMap, sync::Arc};

@@ -3714,4 +3716,91 @@
fungible_token_account1.to_string()
);
}

#[tokio::test(flavor = "multi_thread")]
async fn test_idx_cleaner() {
let cnt = 100;
let cli = Cli::default();

let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await;
let _synchronizer = nft_ingester::index_syncronizer::Synchronizer::new(
env.rocks_env.storage.clone(),
env.pg_env.client.clone(),
env.pg_env.client.clone(),
200_000,
"".to_string(),
Arc::new(SynchronizerMetricsConfig::new()),
1,
false,
);
let non_fungible_token_mint = generated_assets.pubkeys[1];
let mint = Mint {
pubkey: non_fungible_token_mint,
supply: 100000,
decimals: 0,
mint_authority: None,
freeze_authority: None,
token_program: Default::default(),
slot_updated: 10,
write_version: 10,
extensions: None,
};

let owner: Pubkey = generated_assets.owners[1].owner.value.unwrap();
let token_account_addr = Pubkey::new_unique();
let token_account = TokenAccount {
pubkey: token_account_addr,
mint: non_fungible_token_mint,
delegate: None,
owner,
extensions: None,
frozen: false,
delegated_amount: 0,
slot_updated: 10,
amount: 100,
write_version: 10,
};
let mut batch_storage = BatchSaveStorage::new(
env.rocks_env.storage.clone(),
10,
Arc::new(IngesterMetricsConfig::new()),
);
let token_accounts_processor =
TokenAccountsProcessor::new(Arc::new(IngesterMetricsConfig::new()));
token_accounts_processor
.transform_and_save_token_account(
&mut batch_storage,
token_account_addr,
&token_account,
)
.unwrap();
token_accounts_processor
.transform_and_save_mint_account(&mut batch_storage, &mint)
.unwrap();
batch_storage.flush().unwrap();

// one more idx should've been added
let idx_fungible_asset_iter = env
.rocks_env
.storage
.fungible_assets_update_idx
.iter_start();
let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();
assert_eq!(idx_fungible_asset_iter.count(), 1);
assert_eq!(idx_non_fungible_asset_iter.count(), cnt + 2);

for asset_type in ASSET_TYPES {
Review comment (Contributor): NIT: what do you think about wrapping part of this in a function? Then we would not have to copy-paste it from ingester/main.rs.
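A sketch of the kind of helper the reviewer has in mind, so the per-asset-type cleanup loop is not copy-pasted between ingester/main.rs and this test. The function name is hypothetical and it is assumed to live in the same module as clean_syncronized_idxs (cleaners/indexer_cleaner.rs); this PR does not add it.

```rust
use std::sync::Arc;

use entities::enums::ASSET_TYPES;
use rocks_db::Storage;

use crate::error::IngesterError;

/// Hypothetical wrapper: run the index cleanup once for every known asset type.
pub fn clean_syncronized_idxs_for_all_types(storage: Arc<Storage>) -> Result<(), IngesterError> {
    for asset_type in ASSET_TYPES {
        clean_syncronized_idxs(storage.clone(), asset_type)?;
    }
    Ok(())
}
```

Both main.rs and this test could then call the single helper instead of repeating the loop.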

clean_syncronized_idxs(env.rocks_env.storage.clone(), asset_type).unwrap();
}

// after sync idxs should be cleaned again
let idx_fungible_asset_iter = env
.rocks_env
.storage
.fungible_assets_update_idx
.iter_start();
let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();
assert_eq!(idx_fungible_asset_iter.count(), 1);
assert_eq!(idx_non_fungible_asset_iter.count(), 1);
}
}
2 changes: 1 addition & 1 deletion nft_ingester/tests/clean_forks_test.rs
@@ -6,7 +6,7 @@ use metrics_utils::utils::start_metrics;
use metrics_utils::{MetricState, MetricsTrait};
use mpl_bubblegum::types::{BubblegumEventType, LeafSchema, Version};
use mpl_bubblegum::{InstructionName, LeafSchemaEvent};
use nft_ingester::fork_cleaner::ForkCleaner;
use nft_ingester::cleaners::fork_cleaner::ForkCleaner;
use nft_ingester::processors::transaction_based::bubblegum_updates_processor::BubblegumTxProcessor;
use rocks_db::cl_items::ClItem;
use rocks_db::column::TypedColumn;
24 changes: 20 additions & 4 deletions rocks-db/src/asset_client.rs
@@ -10,14 +10,14 @@ use crate::errors::StorageError;
use crate::key_encoders::encode_u64x2_pubkey;
use crate::{Result, Storage};
use entities::api_req_params::Options;
use entities::enums::TokenMetadataEdition;
use entities::enums::{AssetType, TokenMetadataEdition};
use entities::models::{EditionData, PubkeyWithSlot};
use futures_util::FutureExt;
use std::collections::HashMap;

impl Storage {
fn get_next_fungible_asset_update_seq(&self) -> Result<u64> {
if self.fungible_assets_update_last_seq.load(Ordering::SeqCst) == 0 {
if self.fungible_assets_update_last_seq.load(Ordering::Relaxed) == 0 {
// If fungible_assets_update_next_seq is zero, fetch the last key from fungible_assets_update_idx
let mut iter = self.fungible_assets_update_idx.iter_end(); // Assuming iter_end method fetches the last item

@@ -27,13 +27,13 @@

let seq = u64::from_be_bytes(last_key[..std::mem::size_of::<u64>()].try_into()?);
self.fungible_assets_update_last_seq
.store(seq, Ordering::SeqCst);
.store(seq, Ordering::Relaxed);
}
}
// Increment and return the sequence number
let seq = self
.fungible_assets_update_last_seq
.fetch_add(1, Ordering::SeqCst)
.fetch_add(1, Ordering::Relaxed)
+ 1;
Ok(seq)
}
@@ -113,6 +113,22 @@
);
Ok(())
}

pub fn clean_syncronized_idxs(
&self,
asset_type: AssetType,
last_synced_key: Vec<u8>,
) -> Result<()> {
let cf = match asset_type {
AssetType::Fungible => self.fungible_assets_update_idx.handle(),
AssetType::NonFungible => self.assets_update_idx.handle(),
};

let from = vec![];
self.db.delete_range_cf(&cf, from, last_synced_key)?;
Review comment (Contributor): Maybe add some additional metrics to track the execution time of this operation?
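A sketch of one way to surface the execution time the reviewer asks about. The log call mirrors the info! macro style used in main.rs; wiring the duration into an actual metrics counter or histogram is an assumption, since this PR does not add any of it.

```rust
// Sketch only: time the range delete and report it; not part of this PR.
let start = std::time::Instant::now();
self.db.delete_range_cf(&cf, from, last_synced_key)?;
info!(
    "clean_syncronized_idxs({:?}): delete_range_cf took {:?}",
    asset_type,
    start.elapsed()
);
```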


Ok(())
}
}

#[macro_export]
2 changes: 1 addition & 1 deletion rocks-db/src/batch_client.rs
@@ -200,9 +200,9 @@ impl AssetIndexReader for Storage {
let fungible_asset_index = FungibleAssetIndex {
pubkey: token_acc.pubkey,
owner: Some(token_acc.owner),
slot_updated: token_acc.slot_updated,
fungible_asset_mint: Some(token_acc.mint),
fungible_asset_balance: Some(token_acc.amount as u64),
slot_updated: token_acc.slot_updated,
};

fungible_assets_indexes.insert(token_acc.pubkey, fungible_asset_index);