Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mtg-836] Separate fungible token accounts from non-fungible ones #305

Merged
merged 34 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3ff2e46
Write a fungible token-related events into a dedicated RocksDb column
kstepanovdev Nov 5, 2024
15f7b20
Added writing fungible assets into RocksDb
kstepanovdev Nov 6, 2024
c8afedd
add multithread operations for writing/reading from rocksDb and Postg…
kstepanovdev Nov 6, 2024
8d4b5d8
Fixed migration
kstepanovdev Nov 6, 2024
35265ab
turn the fork cleaner on/off from configuration
StanChe Nov 4, 2024
b0bc8d7
nit the comment
StanChe Nov 4, 2024
354734c
updated dependencies and imports to support other SVMs
StanChe Nov 6, 2024
61d09a6
Split methods for fungible and non-fungible assets
kstepanovdev Nov 8, 2024
1651dc3
Fix non-integration tests
kstepanovdev Nov 8, 2024
ae88520
chore: naming in env (#308)
n00m4d Nov 7, 2024
4328c4a
chore: env example changes
n00m4d Nov 7, 2024
23f726f
Merge branch 'main' of github.com:metaplex-foundation/aura into separ…
kstepanovdev Nov 11, 2024
a8f2132
Add FungibleAssets to discriptors and update the RocksDB data model
kstepanovdev Nov 11, 2024
a7fa0c7
Await for syncronizer to finish its work
kstepanovdev Nov 11, 2024
b4866af
Await for synchronizer execution
kstepanovdev Nov 11, 2024
0cf3ab9
Satisfy clippy
kstepanovdev Nov 11, 2024
fe3ccd8
Even further separation of fungible assets updates from non-fungible
kstepanovdev Nov 12, 2024
2d9046d
Fix comments from the review
kstepanovdev Nov 15, 2024
20a4572
Merge branch 'main' of github.com:metaplex-foundation/aura into separ…
kstepanovdev Nov 15, 2024
a39c1bc
Separate timeouts for two sub-synchronizers
kstepanovdev Nov 15, 2024
0622c99
Satisfy clippy
kstepanovdev Nov 15, 2024
290175a
Add transactions for some update assets indexes batch operations
kstepanovdev Nov 19, 2024
13bef3e
Removed redundant call for fungible token index, \
kstepanovdev Nov 19, 2024
1ac335f
Add a possibility to store fungible assets into the temporary db
kstepanovdev Nov 19, 2024
db7d5e9
Fixed tests
kstepanovdev Nov 19, 2024
555d596
Fixed overwhelm_seq_gap
kstepanovdev Nov 19, 2024
024dab9
Splitted load_from_dump() for fungible and non-fungible parts
kstepanovdev Nov 19, 2024
0746bc1
Fixed dump tests && unparallel dump test
kstepanovdev Nov 19, 2024
5745429
improve SQL readability
kstepanovdev Nov 20, 2024
63a1c4a
Change naming for non fungible tokens in methods
kstepanovdev Nov 22, 2024
08f705a
Fix tests
kstepanovdev Nov 22, 2024
5067244
Added explicit rollbacks
kstepanovdev Nov 22, 2024
86b4587
Separate dumping of fungible tokens from NFTs
kstepanovdev Nov 22, 2024
4a9b34f
Fix explicit rollback transaction on error && add metrics
kstepanovdev Nov 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions entities/src/enums.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,11 @@ pub enum TokenType {
CompressedNFT,
All,
}

#[derive(Debug, Copy, Clone, PartialEq)]
pub enum AssetType {
NonFungible = 1,
Fungible = 2,
}

pub const ASSET_TYPES: [AssetType; 2] = [AssetType::Fungible, AssetType::NonFungible];
10 changes: 10 additions & 0 deletions entities/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ pub struct AssetIndex {
pub fungible_asset_balance: Option<u64>,
}

/// FungibleAssetIndex is the struct that is stored in the postgres, and is used to query the fungible asset pubkeys.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
pub struct FungibleAssetIndex {
RequescoS marked this conversation as resolved.
Show resolved Hide resolved
pub pubkey: Pubkey,
pub owner: Option<Pubkey>,
pub slot_updated: i64,
pub fungible_asset_mint: Option<Pubkey>,
pub fungible_asset_balance: Option<u64>,
}

/// FungibleToken is associated token account
/// owned by some user
/// key - token account's pubkey
Expand Down
3 changes: 1 addition & 2 deletions grpc/src/asseturls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ impl DownloadError {
/// Generated client implementations.
pub mod asset_url_service_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::http::Uri;
use tonic::codegen::*;
use tonic::codegen::{http::Uri, *};
#[derive(Debug, Clone)]
pub struct AssetUrlServiceClient<T> {
inner: tonic::client::Grpc<T>,
Expand Down
2 changes: 1 addition & 1 deletion migrations/1_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,4 @@ CREATE TABLE last_synced_key (
);

-- Insert an initial row (assuming there's no last_synced_key initially)
INSERT INTO last_synced_key (last_synced_asset_update_key) VALUES (null);
INSERT INTO last_synced_key (last_synced_asset_update_key) VALUES (null);
3 changes: 3 additions & 0 deletions migrations/9_insert_fungible_idx.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE last_synced_key DROP CONSTRAINT only_one_row;
INSERT INTO last_synced_key (id, last_synced_asset_update_key) VALUES (2, NULL);
ALTER TABLE last_synced_key ADD CONSTRAINT only_two_rows CHECK (id IN (1, 2));
StanChe marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion nft_ingester/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,4 @@ name = "raw_backfiller"
name = "synchronizer"

[[bin]]
name = "raw_backup"
name = "raw_backup"
25 changes: 18 additions & 7 deletions nft_ingester/src/api/backfilling_state_consistency.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::api::synchronization_state_consistency::CATCH_UP_SEQUENCES_TIMEOUT_SEC;
use entities::enums::{AssetType, ASSET_TYPES};
use interface::consistency_check::ConsistencyChecker;
use jsonrpc_core::Call;
use rocks_db::Storage;
Expand All @@ -10,27 +11,35 @@ use tokio::task::{JoinError, JoinSet};
use tracing::info;

pub struct BackfillingStateConsistencyChecker {
overwhelm_backfill_gap: Arc<AtomicBool>,
overwhelm_fungible_backfill_gap: Arc<AtomicBool>,
overwhelm_nft_backfill_gap: Arc<AtomicBool>,
}

impl BackfillingStateConsistencyChecker {
pub(crate) fn new() -> Self {
Self {
overwhelm_backfill_gap: Arc::new(AtomicBool::new(false)),
overwhelm_fungible_backfill_gap: Arc::new(AtomicBool::new(false)),
overwhelm_nft_backfill_gap: Arc::new(AtomicBool::new(false)),
}
}

pub(crate) async fn run(
&self,
tasks: Arc<Mutex<JoinSet<Result<(), JoinError>>>>,
mut rx: tokio::sync::broadcast::Receiver<()>,
rx: tokio::sync::broadcast::Receiver<()>,
rocks_db: Arc<Storage>,
consistence_backfilling_slots_threshold: u64,
) {
let overwhelm_backfill_gap_clone = self.overwhelm_backfill_gap.clone();
tasks.lock().await.spawn(async move {
for asset_type in ASSET_TYPES {
let rocks_db = rocks_db.clone();
let mut rx = rx.resubscribe();
let overwhelm_backfill_gap = match asset_type {
AssetType::NonFungible => self.overwhelm_nft_backfill_gap.clone(),
AssetType::Fungible => self.overwhelm_fungible_backfill_gap.clone(),
};
tasks.lock().await.spawn(async move {
while rx.is_empty() {
overwhelm_backfill_gap_clone.store(
overwhelm_backfill_gap.store(
rocks_db.bubblegum_slots.iter_start().count().saturating_add(rocks_db.ingestable_slots.iter_start().count())
>= consistence_backfilling_slots_threshold as usize,
Ordering::Relaxed,
Expand All @@ -45,11 +54,13 @@ impl BackfillingStateConsistencyChecker {
}
Ok(())
});
}
}
}

impl ConsistencyChecker for BackfillingStateConsistencyChecker {
fn should_cancel_request(&self, _call: &Call) -> bool {
self.overwhelm_backfill_gap.load(Ordering::Relaxed)
self.overwhelm_nft_backfill_gap.load(Ordering::Relaxed)
&& self.overwhelm_fungible_backfill_gap.load(Ordering::Relaxed)
}
}
80 changes: 49 additions & 31 deletions nft_ingester/src/api/synchronization_state_consistency.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use entities::enums::{AssetType, ASSET_TYPES};
use interface::consistency_check::ConsistencyChecker;
use jsonrpc_core::Call;
use postgre_client::storage_traits::AssetIndexStorage;
Expand Down Expand Up @@ -29,60 +30,77 @@ const INDEX_STORAGE_DEPENDS_METHODS: &[&str] = &[
];

pub struct SynchronizationStateConsistencyChecker {
overwhelm_seq_gap: Arc<AtomicBool>,
overwhelm_nft_seq_gap: Arc<AtomicBool>,
overwhelm_fungible_seq_gap: Arc<AtomicBool>,
}

impl SynchronizationStateConsistencyChecker {
pub(crate) fn new() -> Self {
Self {
overwhelm_seq_gap: Arc::new(AtomicBool::new(false)),
overwhelm_nft_seq_gap: Arc::new(AtomicBool::new(false)),
overwhelm_fungible_seq_gap: Arc::new(AtomicBool::new(false)),
}
}

pub(crate) async fn run(
&self,
tasks: Arc<Mutex<JoinSet<Result<(), JoinError>>>>,
mut rx: tokio::sync::broadcast::Receiver<()>,
rx: tokio::sync::broadcast::Receiver<()>,
pg_client: Arc<PgClient>,
rocks_db: Arc<Storage>,
synchronization_api_threshold: u64,
) {
let overwhelm_seq_gap_clone = self.overwhelm_seq_gap.clone();
tasks.lock().await.spawn(async move {
while rx.is_empty() {
let Ok(Some(index_seq)) = pg_client.fetch_last_synced_id().await else {
continue;
};
let Ok(decoded_index_update_key) = decode_u64x2_pubkey(index_seq) else {
continue;
};
let Ok(Some(primary_update_key)) = rocks_db.last_known_asset_updated_key() else {
continue;
};
for asset_type in ASSET_TYPES {
let overwhelm_seq_gap = match asset_type {
AssetType::NonFungible => self.overwhelm_nft_seq_gap.clone(),
AssetType::Fungible => self.overwhelm_fungible_seq_gap.clone(),
};
let pg_client = pg_client.clone();
let rocks_db = rocks_db.clone();
let mut rx = rx.resubscribe();
tasks.lock().await.spawn(async move {
while rx.is_empty() {
let Ok(Some(index_seq)) = pg_client.fetch_last_synced_id(asset_type).await else {
continue;
};
let Ok(decoded_index_update_key) = decode_u64x2_pubkey(index_seq) else {
continue;
};

overwhelm_seq_gap_clone.store(
primary_update_key
.seq
.saturating_sub(decoded_index_update_key.seq)
>= synchronization_api_threshold,
Ordering::Relaxed,
);
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(CATCH_UP_SEQUENCES_TIMEOUT_SEC))=> {},
_ = rx.recv() => {
info!("Received stop signal, stopping SynchronizationStateConsistencyChecker...");
return Ok(());
let last_known_updated_asset = match asset_type {
AssetType::NonFungible => rocks_db.last_known_nft_asset_updated_key(),
AssetType::Fungible => rocks_db.last_known_fungible_asset_updated_key(),
};
let Ok(Some(primary_update_key)) = last_known_updated_asset else {
continue;
};

overwhelm_seq_gap.store(
StanChe marked this conversation as resolved.
Show resolved Hide resolved
primary_update_key
.seq
.saturating_sub(decoded_index_update_key.seq)
>= synchronization_api_threshold,
Ordering::Relaxed,
);
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(CATCH_UP_SEQUENCES_TIMEOUT_SEC))=> {},
_ = rx.recv() => {
info!("Received stop signal, stopping SynchronizationStateConsistencyChecker...");
return Ok(());
}
}
}
}
Ok(())
});
Ok(())
});
}
}
}

impl ConsistencyChecker for SynchronizationStateConsistencyChecker {
fn should_cancel_request(&self, call: &Call) -> bool {
if !self.overwhelm_seq_gap.load(Ordering::Relaxed) {
if !&self.overwhelm_nft_seq_gap.load(Ordering::Relaxed)
|| !&self.overwhelm_fungible_seq_gap.load(Ordering::Relaxed)
{
return false;
}

Expand Down
66 changes: 56 additions & 10 deletions nft_ingester/src/bin/ingester/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use arweave_rs::consts::ARWEAVE_BASE_URL;
use arweave_rs::Arweave;
use entities::enums::{AssetType, ASSET_TYPES};
use nft_ingester::batch_mint::batch_mint_persister::{BatchMintDownloaderForPersister, BatchMintPersister};
use nft_ingester::scheduler::Scheduler;
use postgre_client::PG_MIGRATIONS_PATH;
use std::panic;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
Expand Down Expand Up @@ -121,7 +123,7 @@ pub async fn main() -> Result<(), IngesterError> {
.await?,
);

let synchronizer = Synchronizer::new(
let synchronizer = Arc::new(Synchronizer::new(
primary_rocks_storage.clone(),
index_pg_storage.clone(),
index_pg_storage.clone(),
Expand All @@ -130,11 +132,33 @@ pub async fn main() -> Result<(), IngesterError> {
metrics_state.synchronizer_metrics.clone(),
config.synchronizer_parallel_tasks,
config.run_temp_sync_during_dump,
);
));

if config.run_dump_synchronize_on_start {
info!("Running dump synchronizer on start!");
synchronizer.full_syncronize(&shutdown_rx.resubscribe()).await?;
for asset_type in ASSET_TYPES {
let synchronizer = synchronizer.clone();
let shutdown_rx = shutdown_rx.resubscribe();

mutexed_tasks.lock().await.spawn(async move {
if let Err(e) = synchronizer
.full_syncronize(&shutdown_rx.resubscribe(), asset_type)
.await
{
error!("Failed to syncronize on {:?} with error {}", asset_type, e);
panic!("Failed to syncronize on {:?} with error {}", asset_type, e);
}

Ok(())
});
}
}
while let Some(res) = mutexed_tasks.lock().await.join_next().await {
match res {
Ok(_) => {}
Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
Err(err) => panic!("{err}"),
}
}

// setup receiver
Expand Down Expand Up @@ -581,15 +605,37 @@ pub async fn main() -> Result<(), IngesterError> {
}

if !config.disable_synchronizer {
let rx = shutdown_rx.resubscribe();
mutexed_tasks.lock().await.spawn(async move {
synchronizer
.run(&rx, config.dump_sync_threshold, Duration::from_secs(5))
.await;
let synchronizer = Arc::new(synchronizer);

Ok(())
});
for asset_type in ASSET_TYPES {
let rx = shutdown_rx.resubscribe();
let synchronizer = synchronizer.clone();
mutexed_tasks.lock().await.spawn(async move {
match asset_type {
AssetType::NonFungible => {
synchronizer
.nft_run(&rx, config.dump_sync_threshold, Duration::from_secs(5))
.await
}
AssetType::Fungible => {
synchronizer
.fungible_run(&rx, config.dump_sync_threshold, Duration::from_secs(5))
.await
}
}

Ok(())
});
}
}
while let Some(res) = mutexed_tasks.lock().await.join_next().await {
match res {
Ok(_) => {}
Err(err) if err.is_panic() => panic::resume_unwind(err.into_panic()),
Err(err) => panic!("{err}"),
}
}

// setup dependencies for grpc server
let uc = AssetStreamer::new(config.peer_grpc_max_gap_slots, primary_rocks_storage.clone());
let bs = BlocksStreamer::new(config.peer_grpc_max_gap_slots, primary_rocks_storage.clone());
Expand Down
Loading
Loading