Skip to content

Commit

Permalink
Splitted load_from_dump() for fungible and non-fungible parts
Browse files Browse the repository at this point in the history
  • Loading branch information
kstepanovdev committed Nov 19, 2024
1 parent 555d596 commit 024dab9
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 71 deletions.
7 changes: 5 additions & 2 deletions nft_ingester/src/index_syncronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,15 @@ where
self.metrics.clone(),
)
.await?;
tracing::info!("Dump is complete. Loading the dump into the index storage");
tracing::info!(
"{:?} Dump is complete. Loading the dump into the index storage",
asset_type
);

self.index_storage
.load_from_dump(path, last_included_rocks_key, asset_type)
.await?;
tracing::info!("Dump is loaded into the index storage");
tracing::info!("{:?} Dump is loaded into the index storage", asset_type);
Ok(())
}

Expand Down
108 changes: 61 additions & 47 deletions postgre-client/src/asset_index_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,55 +369,69 @@ impl AssetIndexStorage for PgClient {
last_key: &[u8],
asset_type: AssetType,
) -> Result<(), IndexDbError> {
let Some(metadata_path) = base_path.join("metadata.csv").to_str().map(str::to_owned) else {
return Err(IndexDbError::BadArgument(format!(
"invalid path '{:?}'",
base_path
)));
};
let Some(creators_path) = base_path.join("creators.csv").to_str().map(str::to_owned) else {
return Err(IndexDbError::BadArgument(format!(
"invalid path '{:?}'",
base_path
)));
};
let Some(assets_authorities_path) = base_path
.join("assets_authorities.csv")
.to_str()
.map(str::to_owned)
else {
return Err(IndexDbError::BadArgument(format!(
"invalid path '{:?}'",
base_path
)));
};
let Some(assets_path) = base_path.join("assets.csv").to_str().map(str::to_owned) else {
return Err(IndexDbError::BadArgument(format!(
"invalid path '{:?}'",
base_path
)));
};
let Some(fungible_tokens_path) = base_path
.join("fungible_tokens.csv")
.to_str()
.map(str::to_owned)
else {
return Err(IndexDbError::BadArgument(format!(
"invalid path '{:?}'",
base_path
)));
};
let mut transaction = self.start_transaction().await?;
match asset_type {
AssetType::NonFungible => {
let Some(metadata_path) =
base_path.join("metadata.csv").to_str().map(str::to_owned)
else {
return Err(IndexDbError::BadArgument(format!(
"invalid path '{:?}'",
base_path
)));
};
let Some(creators_path) =
base_path.join("creators.csv").to_str().map(str::to_owned)
else {
return Err(IndexDbError::BadArgument(format!(
"invalid path '{:?}'",
base_path
)));
};
let Some(assets_authorities_path) = base_path
.join("assets_authorities.csv")
.to_str()
.map(str::to_owned)
else {
return Err(IndexDbError::BadArgument(format!(
"invalid path '{:?}'",
base_path
)));
};
let Some(assets_path) = base_path.join("assets.csv").to_str().map(str::to_owned)
else {
return Err(IndexDbError::BadArgument(format!(
"invalid path '{:?}'",
base_path
)));
};

self.copy_non_fungibles(
metadata_path,
creators_path,
assets_path,
assets_authorities_path,
&mut transaction,
)
.await?;
}
AssetType::Fungible => {
let Some(fungible_tokens_path) = base_path
.join("fungible_tokens.csv")
.to_str()
.map(str::to_owned)
else {
return Err(IndexDbError::BadArgument(format!(
"invalid path '{:?}'",
base_path
)));
};

self.copy_fungibles(fungible_tokens_path, &mut transaction)
.await?;
}
}

self.copy_all(
metadata_path,
creators_path,
assets_path,
assets_authorities_path,
fungible_tokens_path,
&mut transaction,
)
.await?;
self.update_last_synced_key(last_key, &mut transaction, "last_synced_key", asset_type)
.await?;
self.commit_transaction(transaction).await?;
Expand Down
97 changes: 75 additions & 22 deletions postgre-client/src/load_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,25 @@ impl PgClient {
.await
}

pub async fn drop_indexes(
pub async fn drop_fungible_indexes(
&self,
transaction: &mut Transaction<'_, Postgres>,
) -> Result<(), IndexDbError> {
let mut query_builder: QueryBuilder<'_, Postgres> =
QueryBuilder::new("ALTER TABLE assets_v3 DISABLE TRIGGER ALL;");
self.execute_query_with_metrics(transaction, &mut query_builder, ALTER_ACTION, "assets_v3")
.await?;

for index in [
"fungible_tokens_fbt_owner_balance_idx",
"fungible_tokens_fbt_asset_idx",
] {
self.drop_index(transaction, index).await?;
}
Ok(())
}

pub async fn drop_non_fungible_indexes(
&self,
transaction: &mut Transaction<'_, Postgres>,
) -> Result<(), IndexDbError> {
Expand Down Expand Up @@ -118,8 +136,6 @@ impl PgClient {
"assets_v3_is_frozen",
"assets_v3_supply",
"assets_v3_slot_updated",
"fungible_tokens_fbt_owner_balance_idx",
"fungible_tokens_fbt_asset_idx",
] {
self.drop_index(transaction, index).await?;
}
Expand Down Expand Up @@ -156,7 +172,33 @@ impl PgClient {
self.execute_query_with_metrics(transaction, &mut query_builder, CREATE_ACTION, name)
.await
}
pub async fn recreate_indexes(

pub async fn recreate_fungible_indexes(
&self,
transaction: &mut Transaction<'_, Postgres>,
) -> Result<(), IndexDbError> {
let mut query_builder: QueryBuilder<'_, Postgres> =
QueryBuilder::new("ALTER TABLE assets_v3 ENABLE TRIGGER ALL;");
self.execute_query_with_metrics(transaction, &mut query_builder, ALTER_ACTION, "assets_v3")
.await?;

for (index, on_query_string) in [
(
"fungible_tokens_fbt_owner_balance_idx",
"fungible_tokens(fbt_owner, fbt_balance)",
),
(
"fungible_tokens_fbt_asset_idx",
"fungible_tokens(fbt_asset)",
),
] {
self.create_index(transaction, index, on_query_string)
.await?;
}
Ok(())
}

pub async fn recreate_non_fungible_indexes(
&self,
transaction: &mut Transaction<'_, Postgres>,
) -> Result<(), IndexDbError> {
Expand Down Expand Up @@ -185,8 +227,6 @@ impl PgClient {
("assets_v3_is_frozen", "assets_v3(ast_is_frozen) WHERE ast_is_frozen IS TRUE"),
("assets_v3_supply", "assets_v3(ast_supply) WHERE ast_supply IS NOT NULL"),
("assets_v3_slot_updated", "assets_v3(ast_slot_updated)"),
("fungible_tokens_fbt_owner_balance_idx", "fungible_tokens(fbt_owner, fbt_balance)"),
("fungible_tokens_fbt_asset_idx", "fungible_tokens(fbt_asset)"),
]{
self.create_index(transaction, index, on_query_string).await?;
}
Expand Down Expand Up @@ -226,23 +266,42 @@ impl PgClient {
Ok(())
}

pub(crate) async fn copy_all(
pub(crate) async fn copy_fungibles(
&self,
fungible_tokens_copy_path: String,
transaction: &mut Transaction<'_, Postgres>,
) -> Result<(), IndexDbError> {
self.drop_fungible_indexes(transaction).await?;
self.drop_constraints(transaction).await?;
self.truncate_table(transaction, "fungible_tokens").await?;

let table = "tasks";
self.create_temp_tables(table, transaction, true, TEMP_TABLE_PREFIX)
.await?;
self.insert_from_temp_table(transaction, table).await?;
self.copy_table_from(
transaction,
fungible_tokens_copy_path,
"fungible_tokens",
"fbt_pubkey, fbt_owner, fbt_asset, fbt_balance, fbt_slot_updated",
)
.await?;
self.recreate_fungible_indexes(transaction).await?;
self.recreate_constraints(transaction).await?;
Ok(())
}

pub(crate) async fn copy_non_fungibles(
&self,
matadata_copy_path: String,
asset_creators_copy_path: String,
assets_copy_path: String,
assets_authorities_copy_path: String,
fungible_tokens_copy_path: String,
transaction: &mut Transaction<'_, Postgres>,
) -> Result<(), IndexDbError> {
self.drop_indexes(transaction).await?;
self.drop_non_fungible_indexes(transaction).await?;
self.drop_constraints(transaction).await?;
for table in [
"assets_v3",
"asset_creators_v3",
"assets_authorities",
"fungible_tokens",
] {
for table in ["assets_v3", "asset_creators_v3", "assets_authorities"] {
self.truncate_table(transaction, table).await?;
}

Expand All @@ -257,7 +316,6 @@ impl PgClient {
)
.await?;
self.insert_from_temp_table(transaction, table).await?;

for (table, path, columns) in [
(
"asset_creators_v3",
Expand All @@ -274,15 +332,10 @@ impl PgClient {
assets_copy_path,
"ast_pubkey, ast_specification_version, ast_specification_asset_class, ast_royalty_target_type, ast_royalty_amount, ast_slot_created, ast_owner_type, ast_owner, ast_delegate, ast_authority_fk, ast_collection, ast_is_collection_verified, ast_is_burnt, ast_is_compressible, ast_is_compressed, ast_is_frozen, ast_supply, ast_metadata_url_id, ast_slot_updated",
),
(
"fungible_tokens",
fungible_tokens_copy_path,
"fbt_pubkey, fbt_owner, fbt_asset, fbt_balance, fbt_slot_updated",
),
] {
self.copy_table_from(transaction, path, table, columns).await?;
}
self.recreate_indexes(transaction).await?;
self.recreate_non_fungible_indexes(transaction).await?;
self.recreate_constraints(transaction).await?;
Ok(())
}
Expand Down

0 comments on commit 024dab9

Please sign in to comment.