Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: collection hash indexing #104

Merged
merged 3 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,75 +1,111 @@
use anchor_lang::prelude::Pubkey;
use blockbuster::{
instruction::InstructionBundle,
programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload},
};
use digital_asset_types::dao::{asset, asset_grouping};
use digital_asset_types::dao::{asset, asset_grouping, cl_items};
use log::{debug, info, warn};
use mpl_bubblegum::{hash_metadata, state::metaplex_adapter::Collection};
use sea_orm::{entity::*, query::*, sea_query::OnConflict, DbBackend, Set, Unchanged};
use solana_sdk::keccak;

use super::{save_changelog_event, update_compressed_asset};
use crate::error::IngesterError;
pub async fn process<'c, T>(
parsing_result: &BubblegumInstruction,
bundle: &InstructionBundle<'c>,
txn: &'c T,
verify: bool,
instruction: &str,
) -> Result<(), IngesterError>
where
T: ConnectionTrait + TransactionTrait,
{
if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) {
// Do we need to update the `slot_updated` field as well as part of the table
// updates below?
if let (Some(le), Some(cl), Some(payload)) = (
&parsing_result.leaf_update,
&parsing_result.tree_update,
&parsing_result.payload,
) {
let (collection, verify, metadata) = match payload {
Payload::CollectionVerification {
collection,
verify,
args,
} => (collection.clone(), verify.clone(), args.clone()),
_ => {
return Err(IngesterError::ParsingError(
"Ix not parsed correctly".to_string(),
));
}
};
debug!(
"Handling collection verification event for {} (verify: {}): {}",
collection, verify, bundle.txn_id
);
let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?;
match le.schema {
LeafSchema::V1 { id, .. } => {
let id_bytes = id.to_bytes().to_vec();
let id_bytes = match le.schema {
LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(),
};

let mut updated_metadata = metadata.clone();
updated_metadata.collection = Some(Collection {
key: collection.clone(),
verified: verify,
});

let asset_to_update = asset::ActiveModel {
id: Unchanged(id_bytes.clone()),
leaf: Set(Some(le.leaf_hash.to_vec())),
seq: Set(seq as i64),
..Default::default()
};
update_compressed_asset(txn, id_bytes.clone(), Some(seq), asset_to_update).await?;
let updated_data_hash = hash_metadata(&updated_metadata)
.map(|e| bs58::encode(e).into_string())
.unwrap_or("".to_string())
.trim()
.to_string();

if verify {
if let Some(Payload::SetAndVerifyCollection { collection }) =
parsing_result.payload
{
let grouping = asset_grouping::ActiveModel {
asset_id: Set(id_bytes.clone()),
group_key: Set("collection".to_string()),
group_value: Set(Some(collection.to_string())),
seq: Set(seq as i64),
slot_updated: Set(bundle.slot as i64),
..Default::default()
};
let mut query = asset_grouping::Entity::insert(grouping)
.on_conflict(
OnConflict::columns([
asset_grouping::Column::AssetId,
asset_grouping::Column::GroupKey,
])
.update_columns([
asset_grouping::Column::GroupKey,
asset_grouping::Column::GroupValue,
asset_grouping::Column::Seq,
asset_grouping::Column::SlotUpdated,
])
.to_owned(),
)
.build(DbBackend::Postgres);
query.sql = format!(
let asset_to_update = asset::ActiveModel {
id: Unchanged(id_bytes.clone()),
leaf: Set(Some(le.leaf_hash.to_vec())),
seq: Set(seq as i64),
data_hash: Set(Some(updated_data_hash.clone())), // todo remove clone
..Default::default()
};
update_compressed_asset(txn, id_bytes.clone(), Some(seq), asset_to_update).await?;

if verify {
let grouping = asset_grouping::ActiveModel {
asset_id: Set(id_bytes.clone()),
group_key: Set("collection".to_string()),
group_value: Set(Some(collection.to_string())),
seq: Set(seq as i64),
slot_updated: Set(bundle.slot as i64),
..Default::default()
};
let mut query = asset_grouping::Entity::insert(grouping)
.on_conflict(
OnConflict::columns([
asset_grouping::Column::AssetId,
asset_grouping::Column::GroupKey,
])
.update_columns([
asset_grouping::Column::GroupKey,
asset_grouping::Column::GroupValue,
asset_grouping::Column::Seq,
asset_grouping::Column::SlotUpdated,
])
.to_owned(),
)
.build(DbBackend::Postgres);
query.sql = format!(
"{} WHERE excluded.slot_updated > asset_grouping.slot_updated AND excluded.seq >= asset_grouping.seq",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah not sure if we want the slot_updated here because there could be multiple tx in the same slot when just dealing with bubblegum transactions?

query.sql
);
txn.execute(query).await?;
}
}
id_bytes
}
};
txn.execute(query).await?;
} else {
// TODO: Support collection unverification.
// We will likely need to nullify the collection field so we can maintain
// the seq value and avoid out-of-order indexing bugs.
Comment on lines +100 to +102
Copy link

@danenbm danenbm Jul 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally was thinking along this route but ended up just adding a verified field the asset_grouping table, and then changing the Read API calls to filter on Verified.eq(true). I did that because it is more consistent with the asset_creators table approach and the contract does in fact have the concept of verify so it made sense to me.

Maybe after your change to blockbuster, I would not need a separate sequence number because i could update collection and slot_updated when just doing VerifyCollection and UnverifyCollection, all protected by a single sequence number. Either way I need to make some change based on the blockbuster change to pick up the new data hash. We should chat offline about approach before I merge my PR.

Here is my PR for reference: metaplex-foundation#90

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing to factor in is that creators can be removed, so we need to be able to support that case too.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I agree nullification is better future proofing for when we have metadata updates

warn!(
"Collection unverification not processed for asset {} and collection {}",
bs58::encode(id_bytes).into_string(),
collection
);
}

return Ok(());
};
Expand Down
12 changes: 4 additions & 8 deletions nft_ingester/src/program_transformers/bubblegum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,10 @@ where
InstructionName::UnverifyCreator => {
creator_verification::process(parsing_result, bundle, txn, false, ix_str).await?;
}
InstructionName::VerifyCollection => {
collection_verification::process(parsing_result, bundle, txn, true, ix_str).await?;
}
InstructionName::UnverifyCollection => {
collection_verification::process(parsing_result, bundle, txn, false, ix_str).await?;
}
InstructionName::SetAndVerifyCollection => {
collection_verification::process(parsing_result, bundle, txn, true, ix_str).await?;
InstructionName::VerifyCollection
| InstructionName::UnverifyCollection
| InstructionName::SetAndVerifyCollection => {
collection_verification::process(parsing_result, bundle, txn, ix_str).await?;
}
_ => debug!("Bubblegum: Not Implemented Instruction"),
}
Expand Down