Commit dcf47db

Fixed overwhelm_seq_gap
&& updated tests
&& refactored ASSET_TYPES constant
kstepanovdev committed Nov 19, 2024
1 parent db7d5e9 commit dcf47db
Showing 11 changed files with 62 additions and 51 deletions.
2 changes: 2 additions & 0 deletions entities/src/enums.rs
@@ -406,3 +406,5 @@ pub enum AssetType {
NonFungible = 1,
Fungible = 2,
}

pub const ASSET_TYPES: [AssetType; 2] = [AssetType::Fungible, AssetType::NonFungible];
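For orientation, a minimal sketch of the pattern this new constant enables: call sites iterate ASSET_TYPES instead of building the two-element array inline at every loop. The derives and the main function below are illustrative, not part of the repository's code.

```rust
// Minimal sketch: the real AssetType enum in entities/src/enums.rs has more
// variants and derives than shown here.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AssetType {
    NonFungible = 1,
    Fungible = 2,
}

pub const ASSET_TYPES: [AssetType; 2] = [AssetType::Fungible, AssetType::NonFungible];

fn main() {
    // Callers iterate the shared constant instead of a hand-written array.
    for asset_type in ASSET_TYPES {
        println!("processing {asset_type:?} assets");
    }
}
```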
25 changes: 18 additions & 7 deletions nft_ingester/src/api/backfilling_state_consistency.rs
@@ -1,4 +1,5 @@
use crate::api::synchronization_state_consistency::CATCH_UP_SEQUENCES_TIMEOUT_SEC;
use entities::enums::{AssetType, ASSET_TYPES};
use interface::consistency_check::ConsistencyChecker;
use jsonrpc_core::Call;
use rocks_db::Storage;
@@ -10,27 +11,35 @@ use tokio::task::{JoinError, JoinSet};
use tracing::info;

pub struct BackfillingStateConsistencyChecker {
overwhelm_backfill_gap: Arc<AtomicBool>,
overwhelm_fungible_backfill_gap: Arc<AtomicBool>,
overwhelm_non_backfill_gap: Arc<AtomicBool>,
}

impl BackfillingStateConsistencyChecker {
pub(crate) fn new() -> Self {
Self {
overwhelm_backfill_gap: Arc::new(AtomicBool::new(false)),
overwhelm_fungible_backfill_gap: Arc::new(AtomicBool::new(false)),
overwhelm_non_backfill_gap: Arc::new(AtomicBool::new(false)),
}
}

pub(crate) async fn run(
&self,
tasks: Arc<Mutex<JoinSet<Result<(), JoinError>>>>,
mut rx: tokio::sync::broadcast::Receiver<()>,
rx: tokio::sync::broadcast::Receiver<()>,
rocks_db: Arc<Storage>,
consistence_backfilling_slots_threshold: u64,
) {
let overwhelm_backfill_gap_clone = self.overwhelm_backfill_gap.clone();
tasks.lock().await.spawn(async move {
for asset_type in ASSET_TYPES {
let rocks_db = rocks_db.clone();
let mut rx = rx.resubscribe();
let overwhelm_backfill_gap = match asset_type {
AssetType::NonFungible => self.overwhelm_non_backfill_gap.clone(),
AssetType::Fungible => self.overwhelm_fungible_backfill_gap.clone(),
};
tasks.lock().await.spawn(async move {
while rx.is_empty() {
overwhelm_backfill_gap_clone.store(
overwhelm_backfill_gap.store(
rocks_db.bubblegum_slots.iter_start().count().saturating_add(rocks_db.ingestable_slots.iter_start().count())
>= consistence_backfilling_slots_threshold as usize,
Ordering::Relaxed,
@@ -45,11 +54,13 @@ impl BackfillingStateConsistencyChecker {
}
Ok(())
});
}
}
}

impl ConsistencyChecker for BackfillingStateConsistencyChecker {
fn should_cancel_request(&self, _call: &Call) -> bool {
self.overwhelm_backfill_gap.load(Ordering::Relaxed)
self.overwhelm_non_backfill_gap.load(Ordering::Relaxed)
&& self.overwhelm_fungible_backfill_gap.load(Ordering::Relaxed)
}
}
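Reading the hunks above together: the checker now keeps one flag per asset type and cancels requests only when the backfill gap is overwhelmed for both. A condensed, self-contained sketch of that shape, with field names taken from the diff; the `&Call` parameter, task spawning, and RocksDB slot counting are elided.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Sketch only: the real struct also spawns per-asset-type background tasks
// that keep these flags updated from RocksDB slot counts.
struct BackfillingStateConsistencyChecker {
    overwhelm_fungible_backfill_gap: Arc<AtomicBool>,
    overwhelm_non_backfill_gap: Arc<AtomicBool>,
}

impl BackfillingStateConsistencyChecker {
    fn should_cancel_request(&self) -> bool {
        // Cancel only when the gap is overwhelmed for BOTH asset types.
        self.overwhelm_non_backfill_gap.load(Ordering::Relaxed)
            && self.overwhelm_fungible_backfill_gap.load(Ordering::Relaxed)
    }
}

fn main() {
    let checker = BackfillingStateConsistencyChecker {
        overwhelm_fungible_backfill_gap: Arc::new(AtomicBool::new(true)),
        overwhelm_non_backfill_gap: Arc::new(AtomicBool::new(false)),
    };
    // Only the fungible gap is overwhelmed, so requests are still served.
    assert!(!checker.should_cancel_request());
}
```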
21 changes: 13 additions & 8 deletions nft_ingester/src/api/synchronization_state_consistency.rs
@@ -1,4 +1,4 @@
use entities::enums::AssetType;
use entities::enums::{AssetType, ASSET_TYPES};
use interface::consistency_check::ConsistencyChecker;
use jsonrpc_core::Call;
use postgre_client::storage_traits::AssetIndexStorage;
@@ -30,13 +30,15 @@ const INDEX_STORAGE_DEPENDS_METHODS: &[&str] = &[
];

pub struct SynchronizationStateConsistencyChecker {
overwhelm_seq_gap: Arc<AtomicBool>,
overwhelm_non_fungible_seq_gap: Arc<AtomicBool>,
overwhelm_fungible_seq_gap: Arc<AtomicBool>,
}

impl SynchronizationStateConsistencyChecker {
pub(crate) fn new() -> Self {
Self {
overwhelm_seq_gap: Arc::new(AtomicBool::new(false)),
overwhelm_non_fungible_seq_gap: Arc::new(AtomicBool::new(false)),
overwhelm_fungible_seq_gap: Arc::new(AtomicBool::new(false)),
}
}

@@ -48,10 +50,11 @@ impl SynchronizationStateConsistencyChecker {
rocks_db: Arc<Storage>,
synchronization_api_threshold: u64,
) {
let asset_types = [AssetType::Fungible, AssetType::NonFungible];

for asset_type in asset_types {
let overwhelm_seq_gap = self.overwhelm_seq_gap.clone();
for asset_type in ASSET_TYPES {
let overwhelm_seq_gap = match asset_type {
AssetType::NonFungible => self.overwhelm_non_fungible_seq_gap.clone(),
AssetType::Fungible => self.overwhelm_fungible_seq_gap.clone(),
};
let pg_client = pg_client.clone();
let rocks_db = rocks_db.clone();
let mut rx = rx.resubscribe();
@@ -95,7 +98,9 @@ impl SynchronizationStateConsistencyChecker {

impl ConsistencyChecker for SynchronizationStateConsistencyChecker {
fn should_cancel_request(&self, call: &Call) -> bool {
if !self.overwhelm_seq_gap.load(Ordering::Relaxed) {
if !&self.overwhelm_non_fungible_seq_gap.load(Ordering::Relaxed)
|| !&self.overwhelm_fungible_seq_gap.load(Ordering::Relaxed)
{
return false;
}

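Both checkers' run methods now follow the same loop: for each entry of ASSET_TYPES, pick the matching flag, resubscribe to the shutdown broadcast channel, and spawn a task that keeps the flag updated until a shutdown message is queued. A stripped-down sketch of that loop, assuming a tokio runtime and substituting a trivial update for the real Postgres/RocksDB sequence checks:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

#[derive(Clone, Copy, Debug)]
enum AssetType {
    NonFungible,
    Fungible,
}
const ASSET_TYPES: [AssetType; 2] = [AssetType::Fungible, AssetType::NonFungible];

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
    let fungible_gap = Arc::new(AtomicBool::new(false));
    let non_fungible_gap = Arc::new(AtomicBool::new(false));
    let mut tasks = tokio::task::JoinSet::new();

    for asset_type in ASSET_TYPES {
        // Select the flag that belongs to this asset type, as in the diff.
        let gap_flag = match asset_type {
            AssetType::NonFungible => non_fungible_gap.clone(),
            AssetType::Fungible => fungible_gap.clone(),
        };
        let rx = rx.resubscribe();
        tasks.spawn(async move {
            // Run until a shutdown message is queued on the broadcast channel.
            while rx.is_empty() {
                // Stand-in for the real gap computation against storage.
                gap_flag.store(false, Ordering::Relaxed);
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        });
    }

    tokio::time::sleep(Duration::from_millis(50)).await;
    let _ = tx.send(()); // the queued message makes rx.is_empty() false in every task
    while tasks.join_next().await.is_some() {}
}
```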
6 changes: 3 additions & 3 deletions nft_ingester/src/bin/ingester/main.rs
@@ -1,6 +1,6 @@
use arweave_rs::consts::ARWEAVE_BASE_URL;
use arweave_rs::Arweave;
use entities::enums::AssetType;
use entities::enums::{AssetType, ASSET_TYPES};
use nft_ingester::batch_mint::batch_mint_persister::{BatchMintDownloaderForPersister, BatchMintPersister};
use nft_ingester::scheduler::Scheduler;
use postgre_client::PG_MIGRATIONS_PATH;
@@ -136,7 +136,7 @@ pub async fn main() -> Result<(), IngesterError> {

if config.run_dump_synchronize_on_start {
info!("Running dump synchronizer on start!");
for asset_type in [AssetType::Fungible, AssetType::NonFungible].into_iter() {
for asset_type in ASSET_TYPES {
let synchronizer = synchronizer.clone();
let shutdown_rx = shutdown_rx.resubscribe();

@@ -607,7 +607,7 @@ pub async fn main() -> Result<(), IngesterError> {
if !config.disable_synchronizer {
let synchronizer = Arc::new(synchronizer);

for asset_type in [AssetType::Fungible, AssetType::NonFungible].into_iter() {
for asset_type in ASSET_TYPES {
let rx = shutdown_rx.resubscribe();
let synchronizer = synchronizer.clone();
mutexed_tasks.lock().await.spawn(async move {
5 changes: 2 additions & 3 deletions nft_ingester/src/bin/synchronizer/main.rs
@@ -1,4 +1,4 @@
use entities::enums::AssetType;
use entities::enums::ASSET_TYPES;
use nft_ingester::config::{
init_logger, setup_config, SynchronizerConfig, SYNCHRONIZER_CONFIG_PREFIX,
};
@@ -126,8 +126,7 @@ pub async fn main() -> Result<(), IngesterError> {
tracing::error!("Sync rocksdb error: {}", e);
}

let asset_types = [AssetType::Fungible, AssetType::NonFungible];
for asset_type in asset_types.into_iter() {
for asset_type in ASSET_TYPES {
let synchronizer = synchronizer.clone();
let shutdown_rx = shutdown_rx.resubscribe();

29 changes: 13 additions & 16 deletions nft_ingester/src/index_syncronizer.rs
@@ -561,7 +561,10 @@ where
#[cfg(test)]
mod tests {
use super::*;
use entities::models::{AssetIndex, UrlWithStatus};
use entities::{
enums::ASSET_TYPES,
models::{AssetIndex, UrlWithStatus},
};
use metrics_utils::{MetricState, MetricsTrait};
use mockall;
use postgre_client::storage_traits::{MockAssetIndexStorageMock, MockTempClientProviderMock};
@@ -611,7 +614,6 @@ mod tests {
let mut metrics_state = MetricState::new();
let temp_client_provider = MockTempClientProviderMock::new();
metrics_state.register_metrics();
let asset_types = [AssetType::Fungible, AssetType::NonFungible];

index_storage
.expect_fetch_last_synced_id()
@@ -637,7 +639,7 @@
let (_, rx) = tokio::sync::broadcast::channel::<()>(1);
let synchronizer = Arc::new(synchronizer);

for asset_type in asset_types {
for asset_type in ASSET_TYPES {
let synchronizer = synchronizer.clone();
let rx = rx.resubscribe();

@@ -669,8 +671,7 @@
let mut metrics_state = MetricState::new();
let temp_client_provider = MockTempClientProviderMock::new();
metrics_state.register_metrics();
let asset_types = [AssetType::Fungible, AssetType::NonFungible];
asset_types.iter().for_each(|_e| {
ASSET_TYPES.iter().for_each(|_e| {
index_storage
.expect_fetch_last_synced_id()
.once()
@@ -731,7 +732,7 @@
);
let (_, rx) = tokio::sync::broadcast::channel::<()>(1);
let synchronizer = Arc::new(synchronizer);
for asset_type in asset_types {
for asset_type in ASSET_TYPES {
let synchronizer = synchronizer.clone();
let rx = rx.resubscribe();

@@ -763,10 +764,9 @@
let mut metrics_state = MetricState::new();
let temp_client_provider = MockTempClientProviderMock::new();
metrics_state.register_metrics();
let asset_types = [AssetType::Fungible, AssetType::NonFungible];

// Index storage starts empty
asset_types.iter().for_each(|_| {
ASSET_TYPES.iter().for_each(|_| {
index_storage
.expect_fetch_last_synced_id()
.once()
@@ -837,7 +837,7 @@
); // Small batch size
let (_, rx) = tokio::sync::broadcast::channel::<()>(1);
let synchronizer = Arc::new(synchronizer);
for asset_type in asset_types {
for asset_type in ASSET_TYPES {
let synchronizer = synchronizer.clone();
let rx = rx.resubscribe();

@@ -874,9 +874,7 @@
let last_synced_binary_key =
encode_u64x2_pubkey(index_key.seq, index_key.slot, index_key.pubkey.clone());

let asset_types = [AssetType::Fungible, AssetType::NonFungible];

asset_types.iter().for_each(|_| {
ASSET_TYPES.iter().for_each(|_| {
let last_synced_binary_key = last_synced_binary_key.clone();
index_storage
.expect_fetch_last_synced_id()
@@ -991,7 +989,7 @@
);
let (_, rx) = tokio::sync::broadcast::channel::<()>(1);
let synchronizer = Arc::new(synchronizer);
for asset_type in [AssetType::Fungible, AssetType::NonFungible] {
for asset_type in ASSET_TYPES {
let synchronizer = synchronizer.clone();
let rx = rx.resubscribe();

@@ -1033,8 +1031,7 @@
.once()
.return_once(move || Ok(Some(index_key_clone)));

let asset_types = [AssetType::Fungible, AssetType::NonFungible];
asset_types.iter().for_each(|_| {
ASSET_TYPES.iter().for_each(|_| {
let index_key_clone = index_key.clone();
index_storage
.expect_fetch_last_synced_id()
@@ -1066,7 +1063,7 @@
);
let (_, rx) = tokio::sync::broadcast::channel::<()>(1);
let synchronizer = Arc::new(synchronizer);
for asset_type in asset_types {
for asset_type in ASSET_TYPES {
let synchronizer = synchronizer.clone();
let rx = rx.resubscribe();

6 changes: 3 additions & 3 deletions nft_ingester/tests/api_tests.rs
@@ -15,7 +15,7 @@ mod tests {
DisplayOptions, GetAssetProof, GetAssetSignatures, GetByMethodsOptions, GetCoreFees,
GetTokenAccounts, Options, SearchAssetsOptions,
};
use entities::enums::{AssetType, TokenType};
use entities::enums::{AssetType, TokenType, ASSET_TYPES};
use entities::models::{
AssetSignature, AssetSignatureKey, BurntMetadataSlot, MetadataInfo, Mint, OffChainData,
TokenAccount,
@@ -3144,7 +3144,7 @@
let synchronizer = Arc::new(synchronizer);
let mut tasks = JoinSet::new();

for asset_type in [AssetType::Fungible, AssetType::NonFungible] {
for asset_type in ASSET_TYPES {
let rx = rx.resubscribe();
let synchronizer = synchronizer.clone();
match asset_type {
@@ -3625,7 +3625,7 @@
let synchronizer = Arc::new(synchronizer);
let mut tasks = JoinSet::new();

for asset_type in [AssetType::Fungible, AssetType::NonFungible] {
for asset_type in ASSET_TYPES {
let rx = rx.resubscribe();
let synchronizer = synchronizer.clone();
match asset_type {
4 changes: 2 additions & 2 deletions nft_ingester/tests/dump_tests.rs
@@ -3,7 +3,7 @@
mod tests {
use std::sync::Arc;

use entities::enums::AssetType;
use entities::enums::{AssetType, ASSET_TYPES};
use entities::models::TokenAccount;
use entities::{api_req_params::GetByMethodsOptions, models::UrlWithStatus};
use metrics_utils::{IngesterMetricsConfig, SynchronizerMetricsConfig};
@@ -71,7 +71,7 @@ mod tests {
false,
));
let mut join_set = JoinSet::new();
for asset_type in [AssetType::Fungible, AssetType::NonFungible] {
for asset_type in ASSET_TYPES {
let syncronizer = syncronizer.clone();
let rx = rx.resubscribe();
join_set.spawn(async move { syncronizer.full_syncronize(&rx, asset_type).await });
7 changes: 3 additions & 4 deletions postgre-client/tests/asset_index_client_test.rs
@@ -4,7 +4,7 @@ mod tests {
use setup::pg::*;

use entities::{
enums::AssetType,
enums::{AssetType, ASSET_TYPES},
models::{AssetIndex, Creator},
};
use postgre_client::storage_traits::AssetIndexStorage;
@@ -19,10 +19,9 @@
let asset_index_storage = &env.client;

// Verify initial fetch_last_synced_id returns None
let asset_types = [AssetType::Fungible, AssetType::NonFungible];
for asset_type in asset_types.iter() {
for asset_type in ASSET_TYPES {
assert!(asset_index_storage
.fetch_last_synced_id(*asset_type)
.fetch_last_synced_id(asset_type)
.await
.unwrap()
.is_none());
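A side note on the small API change above: iterating the array constant by value (for asset_type in ASSET_TYPES) yields AssetType values directly, so the old *asset_type deref is no longer needed, whereas .iter() yields &AssetType. A tiny illustrative sketch; the stub enum and takes_value function are hypothetical stand-ins, not the storage trait's real signature.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum AssetType {
    NonFungible,
    Fungible,
}
const ASSET_TYPES: [AssetType; 2] = [AssetType::Fungible, AssetType::NonFungible];

// Hypothetical stand-in for an API that takes AssetType by value,
// like fetch_last_synced_id in the diff above.
fn takes_value(t: AssetType) -> bool {
    t == AssetType::Fungible
}

fn main() {
    // By-value iteration (arrays implement IntoIterator by value since Rust 1.53).
    for asset_type in ASSET_TYPES {
        takes_value(asset_type); // no deref needed
    }
    // By-reference iteration yields &AssetType, hence the old `*asset_type` deref.
    for asset_type in ASSET_TYPES.iter() {
        takes_value(*asset_type);
    }
}
```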
4 changes: 2 additions & 2 deletions tests/setup/src/lib.rs
@@ -5,7 +5,7 @@ pub mod rocks;
use std::sync::Arc;

use crate::rocks::RocksTestEnvironmentSetup;
use entities::enums::AssetType;
use entities::enums::{AssetType, ASSET_TYPES};
use metrics_utils::MetricsTrait;
use rocks_db::asset::AssetCollection;
use rocks_db::{AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails};
@@ -83,7 +83,7 @@ impl<'a> TestEnvironment<'a> {
let synchronizer = Arc::new(syncronizer);

let mut tasks = JoinSet::new();
for asset_type in [AssetType::NonFungible, AssetType::Fungible] {
for asset_type in ASSET_TYPES {
let synchronizer = synchronizer.clone();
let rx = rx.resubscribe();
tasks.spawn(async move {
4 changes: 1 addition & 3 deletions tests/setup/src/pg.rs
@@ -179,9 +179,7 @@ pub async fn setup_database<T: Image>(node: &Container<'_, T>) -> (Pool<Postgres
.unwrap();

// Verify initial fetch_last_synced_id returns None

let asset_types = [AssetType::Fungible, AssetType::NonFungible];
for asset_type in asset_types {
for asset_type in ASSET_TYPES {
assert!(asset_index_storage
.fetch_last_synced_id(asset_type)
.await
