Skip to content

Commit

Permalink
reorganize cleaners into a module
Browse files Browse the repository at this point in the history
  • Loading branch information
kstepanovdev committed Nov 26, 2024
1 parent 3e3f044 commit b35fa92
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 35 deletions.
26 changes: 12 additions & 14 deletions nft_ingester/src/bin/ingester/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use arweave_rs::consts::ARWEAVE_BASE_URL;
use arweave_rs::Arweave;
use entities::enums::{AssetType, ASSET_TYPES};
use nft_ingester::batch_mint::batch_mint_persister::{BatchMintDownloaderForPersister, BatchMintPersister};
use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs;
use nft_ingester::scheduler::Scheduler;
use postgre_client::PG_MIGRATIONS_PATH;
use rocks_db::key_encoders::encode_u64x2_pubkey;
Expand Down Expand Up @@ -41,10 +42,10 @@ use nft_ingester::backfiller::{
};
use nft_ingester::batch_mint::batch_mint_processor::{process_batch_mints, BatchMintProcessor, NoopBatchMintTxSender};
use nft_ingester::buffer::{debug_buffer, Buffer};
use nft_ingester::cleaners::fork_cleaner::{run_fork_cleaner, ForkCleaner};
use nft_ingester::config::{
setup_config, ApiConfig, BackfillerConfig, BackfillerMode, IngesterConfig, MessageSource, INGESTER_CONFIG_PREFIX,
};
use nft_ingester::fork_cleaner::{run_fork_cleaner, ForkCleaner};
use nft_ingester::gapfiller::{process_asset_details_stream_wrapper, run_sequence_consistent_gapfiller};
use nft_ingester::index_syncronizer::Synchronizer;
use nft_ingester::init::{graceful_stop, init_index_storage_with_migration, init_primary_storage};
Expand All @@ -58,9 +59,9 @@ use nft_ingester::rocks_db::{perform_backup, receive_last_saved_slot, restore_ro
use nft_ingester::tcp_receiver::{connect_to_geyser, connect_to_snapshot_receiver, TcpReceiver};
use nft_ingester::transaction_ingester::BackfillTransactionIngester;
use nft_ingester::{config::init_logger, error::IngesterError};
use rocks_db::backup_service;
use rocks_db::backup_service::BackupService;
use rocks_db::storage_traits::{AssetSlotStorage, AssetUpdateIndexStorage};
use rocks_db::{backup_service, Storage};
use tonic::transport::Server;
use usecase::asset_streamer::AssetStreamer;
use usecase::proofs::MaybeProofChecker;
Expand Down Expand Up @@ -809,18 +810,15 @@ pub async fn main() -> Result<(), IngesterError> {
if rx.try_recv().is_ok() {
break;
}
let optional_last_synced_key = match asset_type {
AssetType::NonFungible => primary_rocks_storage.last_known_nft_asset_updated_key(),
AssetType::Fungible => primary_rocks_storage.last_known_fungible_asset_updated_key(),
};

if let Ok(Some(last_synced_key)) = optional_last_synced_key {
let last_synced_key =
encode_u64x2_pubkey(last_synced_key.seq, last_synced_key.slot, last_synced_key.pubkey);
primary_rocks_storage
.clean_syncronized_idxs(asset_type, last_synced_key)
.unwrap();
};

match clean_syncronized_idxs(primary_rocks_storage.clone(), asset_type) {
Ok(_) => {
info!("Cleaned synchronized indexes for {:?}", asset_type);
}
Err(e) => {
error!("Failed to clean synchronized indexes for {:?} with error {}", asset_type, e);
}
}
tokio::time::sleep(Duration::from_secs(SECONDS_TO_RETRY_IDXS_CLEANUP)).await;
}

Expand Down
File renamed without changes.
29 changes: 29 additions & 0 deletions nft_ingester/src/cleaners/indexer_cleaner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::sync::Arc;

use entities::enums::AssetType;
use rocks_db::{
key_encoders::encode_u64x2_pubkey, storage_traits::AssetUpdateIndexStorage, Storage,
};

use crate::error::IngesterError;

pub fn clean_syncronized_idxs(
primary_rocks_storage: Arc<Storage>,
asset_type: AssetType,
) -> Result<(), IngesterError> {
let optional_last_synced_key = match asset_type {
AssetType::NonFungible => primary_rocks_storage.last_known_nft_asset_updated_key(),
AssetType::Fungible => primary_rocks_storage.last_known_fungible_asset_updated_key(),
};

if let Ok(Some(last_synced_key)) = optional_last_synced_key {
let last_synced_key = encode_u64x2_pubkey(
last_synced_key.seq,
last_synced_key.slot,
last_synced_key.pubkey,
);
primary_rocks_storage.clean_syncronized_idxs(asset_type, last_synced_key)?;
};

Ok(())
}
2 changes: 2 additions & 0 deletions nft_ingester/src/cleaners/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod fork_cleaner;
pub mod indexer_cleaner;
2 changes: 1 addition & 1 deletion nft_ingester/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ pub mod api;
pub mod backfiller;
pub mod batch_mint;
pub mod buffer;
pub mod cleaners;
pub mod config;
pub mod error;
pub mod flatbuffer_mapper;
pub mod fork_cleaner;
pub mod gapfiller;
pub mod index_syncronizer;
pub mod init;
Expand Down
21 changes: 2 additions & 19 deletions nft_ingester/tests/api_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod tests {
ShadowInterestBearingConfig, ShadowTransferFee, ShadowTransferFeeConfig, UnixTimestamp,
};
use blockbuster::programs::token_extensions::MintAccountExtensions;
use nft_ingester::cleaners::indexer_cleaner::clean_syncronized_idxs;
use rocks_db::key_encoders::encode_u64x2_pubkey;
use rocks_db::storage_traits::AssetUpdateIndexStorage;
use std::str::FromStr;
Expand Down Expand Up @@ -3790,25 +3791,7 @@ mod tests {
assert_eq!(idx_non_fungible_asset_iter.count(), cnt + 2);

for asset_type in ASSET_TYPES {
let optional_last_synced_key = match asset_type {
AssetType::NonFungible => env.rocks_env.storage.last_known_nft_asset_updated_key(),
AssetType::Fungible => env
.rocks_env
.storage
.last_known_fungible_asset_updated_key(),
};

let last_synced_key = optional_last_synced_key.unwrap().unwrap();
let last_synced_key = encode_u64x2_pubkey(
last_synced_key.seq,
last_synced_key.slot,
last_synced_key.pubkey,
);

env.rocks_env
.storage
.clean_syncronized_idxs(asset_type, last_synced_key)
.unwrap();
clean_syncronized_idxs(env.rocks_env.storage.clone(), asset_type).unwrap();
}

// after sync idxs should be cleaned again
Expand Down
2 changes: 1 addition & 1 deletion nft_ingester/tests/clean_forks_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use metrics_utils::utils::start_metrics;
use metrics_utils::{MetricState, MetricsTrait};
use mpl_bubblegum::types::{BubblegumEventType, LeafSchema, Version};
use mpl_bubblegum::{InstructionName, LeafSchemaEvent};
use nft_ingester::fork_cleaner::ForkCleaner;
use nft_ingester::cleaners::fork_cleaner::ForkCleaner;
use nft_ingester::processors::transaction_based::bubblegum_updates_processor::BubblegumTxProcessor;
use rocks_db::cl_items::ClItem;
use rocks_db::column::TypedColumn;
Expand Down

0 comments on commit b35fa92

Please sign in to comment.