simplified interfaces and their number for assets idx cleaning up
&& called cleaning up from the ingester instead of the synchronizer
kstepanovdev committed Nov 26, 2024
1 parent 49f0096 commit 3e3f044
Showing 7 changed files with 72 additions and 108 deletions.
34 changes: 31 additions & 3 deletions nft_ingester/src/bin/ingester/main.rs
@@ -4,6 +4,7 @@ use entities::enums::{AssetType, ASSET_TYPES};
 use nft_ingester::batch_mint::batch_mint_persister::{BatchMintDownloaderForPersister, BatchMintPersister};
 use nft_ingester::scheduler::Scheduler;
 use postgre_client::PG_MIGRATIONS_PATH;
+use rocks_db::key_encoders::encode_u64x2_pubkey;
 use std::panic;
 use std::path::PathBuf;
 use std::str::FromStr;
@@ -59,7 +60,7 @@ use nft_ingester::transaction_ingester::BackfillTransactionIngester;
 use nft_ingester::{config::init_logger, error::IngesterError};
 use rocks_db::backup_service;
 use rocks_db::backup_service::BackupService;
-use rocks_db::storage_traits::AssetSlotStorage;
+use rocks_db::storage_traits::{AssetSlotStorage, AssetUpdateIndexStorage};
 use tonic::transport::Server;
 use usecase::asset_streamer::AssetStreamer;
 use usecase::proofs::MaybeProofChecker;
@@ -75,6 +76,7 @@ pub const DEFAULT_ROCKSDB_PATH: &str = "./my_rocksdb";
 pub const ARWEAVE_WALLET_PATH: &str = "./arweave_wallet.json";
 pub const DEFAULT_MIN_POSTGRES_CONNECTIONS: u32 = 100;
 pub const DEFAULT_MAX_POSTGRES_CONNECTIONS: u32 = 100;
+pub const SECONDS_TO_RETRY_IDXS_CLEANUP: u64 = 15;
 
 #[derive(Parser, Debug)]
 struct Args {
@@ -605,8 +607,6 @@ pub async fn main() -> Result<(), IngesterError> {
     }
 
     if !config.disable_synchronizer {
-        let synchronizer = Arc::new(synchronizer);
-
         for asset_type in ASSET_TYPES {
             let rx = shutdown_rx.resubscribe();
             let synchronizer = synchronizer.clone();
@@ -800,6 +800,34 @@ pub async fn main() -> Result<(), IngesterError> {
         batch_mint_persister.persist_batch_mints(rx).await
     });
 
+    // clean indexes
+    for asset_type in ASSET_TYPES {
+        let primary_rocks_storage = primary_rocks_storage.clone();
+        let mut rx = shutdown_rx.resubscribe();
+        mutexed_tasks.lock().await.spawn(async move {
+            loop {
+                if rx.try_recv().is_ok() {
+                    break;
+                }
+                let optional_last_synced_key = match asset_type {
+                    AssetType::NonFungible => primary_rocks_storage.last_known_nft_asset_updated_key(),
+                    AssetType::Fungible => primary_rocks_storage.last_known_fungible_asset_updated_key(),
+                };
+
+                if let Ok(Some(last_synced_key)) = optional_last_synced_key {
+                    let last_synced_key =
+                        encode_u64x2_pubkey(last_synced_key.seq, last_synced_key.slot, last_synced_key.pubkey);
+                    primary_rocks_storage
+                        .clean_syncronized_idxs(asset_type, last_synced_key)
+                        .unwrap();
+                };
+                tokio::time::sleep(Duration::from_secs(SECONDS_TO_RETRY_IDXS_CLEANUP)).await;
+            }
+
+            Ok(())
+        });
+    }
+
     start_metrics(metrics_state.registry, config.metrics_port).await;
 
     // --stop
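
The cleanup task added above hinges on the update-index keys sorting lexicographically by (seq, slot, pubkey): deleting the range below the encoded last-synced key then drops exactly the entries the synchronizer has already consumed. As a hedged illustration, an encoder with that property could look like the sketch below; the real encode_u64x2_pubkey lives in rocks_db::key_encoders, and the exact byte layout shown here is an assumption, not the crate's implementation.

use solana_sdk::pubkey::Pubkey;

// Hypothetical sketch of a (seq, slot, pubkey) key encoder, not the actual
// rocks_db implementation. Big-endian integers make lexicographic byte order
// agree with numeric order, which range deletes over these keys rely on.
fn encode_u64x2_pubkey_sketch(seq: u64, slot: u64, pubkey: Pubkey) -> Vec<u8> {
    let mut key = Vec::with_capacity(8 + 8 + 32);
    key.extend_from_slice(&seq.to_be_bytes());
    key.extend_from_slice(&slot.to_be_bytes());
    key.extend_from_slice(pubkey.as_ref());
    key
}

Note that the loop polls rx.try_recv() once per iteration and then sleeps, so shutdown latency is bounded by SECONDS_TO_RETRY_IDXS_CLEANUP; a simple polling pattern rather than a select on the shutdown channel.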
35 changes: 6 additions & 29 deletions nft_ingester/src/index_syncronizer.rs
@@ -181,20 +181,14 @@ where
             SyncStatus::FullSyncRequired(state) => {
                 tracing::info!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq);
                 self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key)
-                    .await?;
+                    .await
             }
             SyncStatus::RegularSyncRequired(state) => {
                 self.regular_nft_syncronize(rx, state.last_indexed_key, state.last_known_key)
-                    .await?;
+                    .await
             }
-            SyncStatus::NoSyncRequired => {}
-        }
-
-        if let Some(encoded_key) = self.index_storage.fetch_last_synced_id(asset_type).await? {
-            self.clean_syncronized_idxs(asset_type, encoded_key)?;
+            SyncStatus::NoSyncRequired => Ok(()),
         }
-
-        Ok(())
     }
 
     pub async fn synchronize_fungible_asset_indexes(
@@ -212,31 +206,14 @@ where
             SyncStatus::FullSyncRequired(state) => {
                 tracing::info!("Should run dump synchronizer as the difference between last indexed and last known sequence is greater than the threshold. Last indexed: {:?}, Last known: {}", state.last_indexed_key.clone().map(|k|k.seq), state.last_known_key.seq);
                 self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key)
-                    .await?;
+                    .await
             }
             SyncStatus::RegularSyncRequired(state) => {
                 self.regular_fungible_syncronize(rx, state.last_indexed_key, state.last_known_key)
-                    .await?;
+                    .await
             }
-            SyncStatus::NoSyncRequired => {}
+            SyncStatus::NoSyncRequired => Ok(()),
         }
-
-        if let Some(encoded_key) = self.index_storage.fetch_last_synced_id(asset_type).await? {
-            self.clean_syncronized_idxs(asset_type, encoded_key)?;
-        }
-
-        Ok(())
-    }
-
-    pub fn clean_syncronized_idxs(
-        &self,
-        asset_type: AssetType,
-        last_synced_key: Vec<u8>,
-    ) -> Result<(), IngesterError> {
-        self.primary_storage
-            .clean_syncronized_idxs(asset_type, last_synced_key)?;
-
-        Ok(())
     }
 
     pub async fn full_syncronize(
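
With the cleanup call removed, each match arm now returns the Result of its sync path directly, so the trailing Ok(()) disappears and the synchronizer no longer touches the indexes after syncing. For readers following the control flow, the arms imply roughly the following type shapes; this is a hedged reconstruction from the visible code, not the actual definitions, and the assumption that AssetUpdatedKey is the key type from rocks_db::storage_traits may not match the crate.

use rocks_db::storage_traits::AssetUpdatedKey; // assumed import for this sketch

// Hedged reconstruction of the SyncStatus types implied by the match arms above.
pub enum SyncStatus {
    FullSyncRequired(SyncState),
    RegularSyncRequired(SyncState),
    NoSyncRequired,
}

pub struct SyncState {
    pub last_indexed_key: Option<AssetUpdatedKey>,
    pub last_known_key: AssetUpdatedKey,
}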
67 changes: 30 additions & 37 deletions nft_ingester/tests/api_tests.rs
@@ -7,6 +7,8 @@ mod tests {
         ShadowInterestBearingConfig, ShadowTransferFee, ShadowTransferFeeConfig, UnixTimestamp,
     };
     use blockbuster::programs::token_extensions::MintAccountExtensions;
+    use rocks_db::key_encoders::encode_u64x2_pubkey;
+    use rocks_db::storage_traits::AssetUpdateIndexStorage;
     use std::str::FromStr;
     use std::{collections::HashMap, sync::Arc};
@@ -3721,7 +3723,7 @@
         let cli = Cli::default();
 
         let (env, generated_assets) = setup::TestEnvironment::create(&cli, cnt, 100).await;
-        let synchronizer = nft_ingester::index_syncronizer::Synchronizer::new(
+        let _synchronizer = nft_ingester::index_syncronizer::Synchronizer::new(
             env.rocks_env.storage.clone(),
             env.pg_env.client.clone(),
             env.pg_env.client.clone(),
@@ -3778,54 +3780,45 @@
         batch_storage.flush().unwrap();
 
         // one more idx should've been added
-        let mut number_of_fungible_idxs = 0;
-        let mut number_of_non_fungible_idxs = 0;
-        let mut idx_fungible_asset_iter = env
+        let idx_fungible_asset_iter = env
             .rocks_env
             .storage
             .fungible_assets_update_idx
             .iter_start();
-        let mut idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();
-
-        while let Some(_) = idx_fungible_asset_iter.next() {
-            number_of_fungible_idxs += 1;
-        }
-        assert_eq!(number_of_fungible_idxs, 1);
+        let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();
+        assert_eq!(idx_fungible_asset_iter.count(), 1);
+        assert_eq!(idx_non_fungible_asset_iter.count(), cnt + 2);
 
-        while let Some(_) = idx_non_fungible_asset_iter.next() {
-            number_of_non_fungible_idxs += 1;
-        }
-        assert_eq!(number_of_non_fungible_idxs, 3);
-
-        let (_, rx) = tokio::sync::broadcast::channel::<()>(1);
-
-        synchronizer
-            .synchronize_nft_asset_indexes(&rx, 0)
-            .await
-            .unwrap();
-        synchronizer
-            .synchronize_fungible_asset_indexes(&rx, 0)
-            .await
-            .unwrap();
+        for asset_type in ASSET_TYPES {
+            let optional_last_synced_key = match asset_type {
+                AssetType::NonFungible => env.rocks_env.storage.last_known_nft_asset_updated_key(),
+                AssetType::Fungible => env
+                    .rocks_env
+                    .storage
+                    .last_known_fungible_asset_updated_key(),
+            };
+
+            let last_synced_key = optional_last_synced_key.unwrap().unwrap();
+            let last_synced_key = encode_u64x2_pubkey(
+                last_synced_key.seq,
+                last_synced_key.slot,
+                last_synced_key.pubkey,
+            );
+
+            env.rocks_env
+                .storage
+                .clean_syncronized_idxs(asset_type, last_synced_key)
+                .unwrap();
+        }
 
         // after sync idxs should be cleaned again
-        let mut number_of_fungible_idxs = 0;
-        let mut number_of_non_fungible_idxs = 0;
-        let mut idx_fungible_asset_iter = env
+        let idx_fungible_asset_iter = env
            .rocks_env
            .storage
            .fungible_assets_update_idx
            .iter_start();
-        let mut idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();
-
-        while let Some(_) = idx_fungible_asset_iter.next() {
-            number_of_fungible_idxs += 1;
-        }
-        assert_eq!(number_of_fungible_idxs, 1);
-
-        while let Some(_) = idx_non_fungible_asset_iter.next() {
-            number_of_non_fungible_idxs += 1;
-        }
-        assert_eq!(number_of_non_fungible_idxs, 1);
+        let idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start();
+        assert_eq!(idx_fungible_asset_iter.count(), 1);
+        assert_eq!(idx_non_fungible_asset_iter.count(), 1);
     }
 }
6 changes: 2 additions & 4 deletions rocks-db/src/asset_client.rs
@@ -114,7 +114,7 @@ impl Storage {
         Ok(())
     }
 
-    pub fn clean_syncronized_idxs_with_batch(
+    pub fn clean_syncronized_idxs(
         &self,
         asset_type: AssetType,
         last_synced_key: Vec<u8>,
@@ -124,10 +124,8 @@
             AssetType::NonFungible => self.assets_update_idx.handle(),
         };
 
-        let mut batch = rocksdb::WriteBatchWithTransaction::<false>::default();
         let from = vec![];
-        batch.delete_range_cf(&cf, from, last_synced_key);
-        self.db.write(batch)?;
+        self.db.delete_range_cf(&cf, from, last_synced_key)?;
 
         Ok(())
     }
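
The renamed clean_syncronized_idxs now issues the range delete directly instead of routing a single operation through a WriteBatch. RocksDB's DeleteRange treats the end key as exclusive, so the entry at last_synced_key itself survives the cleanup. A standalone sketch of the same call against a plain rocksdb::DB follows; the crate's actual Storage wrapper may hold a different DB type, so treat this as illustrative only.

use rocksdb::DB;

// Minimal sketch, assuming the column family is already open: delete every
// key in ["", last_synced_key). The empty slice is the lowest possible key,
// so this clears all index entries strictly below the last synced key.
fn clean_range(db: &DB, cf_name: &str, last_synced_key: &[u8]) -> Result<(), rocksdb::Error> {
    let cf = db.cf_handle(cf_name).expect("column family must be open");
    let from: &[u8] = &[];
    db.delete_range_cf(cf, from, last_synced_key)
}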
10 changes: 1 addition & 9 deletions rocks-db/src/batch_client.rs
@@ -1,7 +1,7 @@
 use std::collections::{HashMap, HashSet};
 
 use async_trait::async_trait;
-use entities::enums::{AssetType, SpecificationVersions, TokenMetadataEdition};
+use entities::enums::{SpecificationVersions, TokenMetadataEdition};
 use serde_json::json;
 use solana_sdk::pubkey::Pubkey;
 
@@ -183,14 +183,6 @@ impl AssetUpdateIndexStorage for Storage {
         );
         Ok((unique_pubkeys, last_key))
     }
-
-    fn clean_syncronized_idxs(
-        &self,
-        asset_type: AssetType,
-        last_synced_key: Vec<u8>,
-    ) -> Result<()> {
-        self.clean_syncronized_idxs_with_batch(asset_type, last_synced_key)
-    }
 }
 
 #[async_trait]
11 changes: 1 addition & 10 deletions rocks-db/src/batch_savers.rs
@@ -2,7 +2,7 @@ use crate::asset::{AssetCollection, MetadataMintMap};
 use crate::token_accounts::{TokenAccountMintOwnerIdx, TokenAccountOwnerIdx};
 use crate::Result;
 use crate::{AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, Storage};
-use entities::enums::{AssetType, TokenMetadataEdition};
+use entities::enums::TokenMetadataEdition;
 use entities::models::{
     InscriptionDataInfo, InscriptionInfo, Mint, TokenAccount, TokenAccountMintOwnerIdxKey,
     TokenAccountOwnerIdxKey,
@@ -270,13 +270,4 @@ impl BatchSaveStorage {
         }
         Ok(())
     }
-
-    pub fn clean_syncronized_idxs(
-        &self,
-        asset_type: AssetType,
-        last_synced_key: Vec<u8>,
-    ) -> Result<()> {
-        self.storage
-            .clean_syncronized_idxs_with_batch(asset_type, last_synced_key)
-    }
 }
17 changes: 1 addition & 16 deletions rocks-db/src/storage_traits.rs
@@ -6,10 +6,7 @@ use solana_sdk::pubkey::Pubkey;
 
 pub use crate::Result;
 use crate::Storage;
-use entities::{
-    enums::AssetType,
-    models::{AssetIndex, FungibleAssetIndex},
-};
+use entities::models::{AssetIndex, FungibleAssetIndex};
 
 #[derive(Clone, Debug, PartialEq)]
 pub struct AssetUpdatedKey {
@@ -45,9 +42,6 @@ pub trait AssetUpdateIndexStorage {
         limit: usize,
         skip_keys: Option<HashSet<Pubkey>>,
     ) -> Result<(HashSet<Pubkey>, Option<AssetUpdatedKey>)>;
-
-    fn clean_syncronized_idxs(&self, asset_type: AssetType, last_synced_key: Vec<u8>)
-        -> Result<()>;
 }
 
 #[automock]
@@ -136,15 +130,6 @@ impl AssetUpdateIndexStorage for MockAssetIndexStorage {
         self.mock_update_index_storage
             .fetch_fungible_asset_updated_keys(from, up_to, limit, skip_keys)
     }
-
-    fn clean_syncronized_idxs(
-        &self,
-        asset_type: AssetType,
-        last_synced_key: Vec<u8>,
-    ) -> Result<()> {
-        self.mock_update_index_storage
-            .clean_syncronized_idxs(asset_type, last_synced_key)
-    }
 }
 
 #[async_trait]
