-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[mtg-778] Clean asset slot update idx #325
Conversation
037fea8
to
2f8e7ee
Compare
rocks-db/src/asset_client.rs
Outdated
return Ok(()); | ||
} | ||
|
||
let mut batch = rocksdb::WriteBatchWithTransaction::<false>::default(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you please explain the reason for using batch here? It seems like you perform a single operation on this batch, so does it make sense to use it at all?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not needed anymore, indeed. Previously it was multiple deletions via iterator
rocks-db/src/batch_savers.rs
Outdated
@@ -270,4 +270,13 @@ impl BatchSaveStorage { | |||
} | |||
Ok(()) | |||
} | |||
|
|||
pub fn clean_syncronized_idxs( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need such wrapper functions?
nft_ingester/tests/api_tests.rs
Outdated
let mut number_of_fungible_idxs = 0; | ||
let mut number_of_non_fungible_idxs = 0; | ||
let mut idx_fungible_asset_iter = env | ||
.rocks_env | ||
.storage | ||
.fungible_assets_update_idx | ||
.iter_start(); | ||
let mut idx_non_fungible_asset_iter = env.rocks_env.storage.assets_update_idx.iter_start(); | ||
|
||
while let Some(_) = idx_fungible_asset_iter.next() { | ||
number_of_fungible_idxs += 1; | ||
} | ||
assert_eq!(number_of_fungible_idxs, 1); | ||
|
||
while let Some(_) = idx_non_fungible_asset_iter.next() { | ||
number_of_non_fungible_idxs += 1; | ||
} | ||
assert_eq!(number_of_non_fungible_idxs, 3); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: you can use iter_start().count()
Great job, thank you! Left only a couple of minor comments |
SyncStatus::NoSyncRequired => {} | ||
} | ||
|
||
if let Some(encoded_key) = self.index_storage.fetch_last_synced_id(asset_type).await? { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We always expert fetch_last_synced_id() return here Some(_) because if we receive None it will mean that some jobs are incomplete or we have some bug in code. So, maybe use ok_or()? Or at least add an error log if fetch_last_synced_id returned None
306dcd9
to
7bfba09
Compare
&& called cleaning up from the ingester instead of the synchronizer
7bfba09
to
3e3f044
Compare
assert_eq!(idx_fungible_asset_iter.count(), 1); | ||
assert_eq!(idx_non_fungible_asset_iter.count(), cnt + 2); | ||
|
||
for asset_type in ASSET_TYPES { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: wdyt about wrapping part of this stuff with function? then we should not copy past it from ingester/main.rs
b35fa92
to
42f5add
Compare
42f5add
to
eadeb0d
Compare
@@ -75,6 +76,7 @@ pub const DEFAULT_ROCKSDB_PATH: &str = "./my_rocksdb"; | |||
pub const ARWEAVE_WALLET_PATH: &str = "./arweave_wallet.json"; | |||
pub const DEFAULT_MIN_POSTGRES_CONNECTIONS: u32 = 100; | |||
pub const DEFAULT_MAX_POSTGRES_CONNECTIONS: u32 = 100; | |||
pub const SECONDS_TO_RETRY_IDXS_CLEANUP: u64 = 15; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make if less often, like once in 15 mins
error!("Failed to clean synchronized indexes for {:?} with error {}", asset_type, e); | ||
} | ||
} | ||
tokio::time::sleep(Duration::from_secs(SECONDS_TO_RETRY_IDXS_CLEANUP)).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how does it work with termination (ctrl+c) of the app? we usually switch on it and the shutdown_rx
}; | ||
|
||
let from = vec![]; | ||
self.db.delete_range_cf(&cf, from, last_synced_key)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe add some additional metrics to track the execution time of this operation?
} | ||
Err(e) => { | ||
error!("Failed to clean synchronized indexes for {:?} with error {}", asset_type, e); | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we'll stop trying after 1 error, that's not ideal solution
tokio::select! { | ||
_ = rx.recv() => {} | ||
_ = async move { | ||
loop { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tokio::select should be inside the loop, as we'll not exit from inside this loop on termination signal, will we?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just drop that break and we're good to go
After each synchronization redundant idxs will be removed from the rocksDb