From 1c99d240840cc4ffa4de2b4e41b5b2e1977b9f88 Mon Sep 17 00:00:00 2001 From: Nicolas Pennie Date: Tue, 25 Jul 2023 17:30:56 -0700 Subject: [PATCH 1/2] fix: collection hash indexing --- .../bubblegum/collection_verification.rs | 168 +++++++++++++----- .../src/program_transformers/bubblegum/mod.rs | 12 +- 2 files changed, 123 insertions(+), 57 deletions(-) diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs index 68e672c6..d1e1d70d 100644 --- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs @@ -1,9 +1,13 @@ +use anchor_lang::prelude::Pubkey; use blockbuster::{ instruction::InstructionBundle, programs::bubblegum::{BubblegumInstruction, LeafSchema, Payload}, }; -use digital_asset_types::dao::{asset, asset_grouping}; +use digital_asset_types::dao::{asset, asset_grouping, cl_items}; +use log::{debug, info, warn}; +use mpl_bubblegum::{hash_metadata, state::metaplex_adapter::Collection}; use sea_orm::{entity::*, query::*, sea_query::OnConflict, DbBackend, Set, Unchanged}; +use solana_sdk::keccak; use super::{save_changelog_event, update_compressed_asset}; use crate::error::IngesterError; @@ -11,65 +15,131 @@ pub async fn process<'c, T>( parsing_result: &BubblegumInstruction, bundle: &InstructionBundle<'c>, txn: &'c T, - verify: bool, instruction: &str, ) -> Result<(), IngesterError> where T: ConnectionTrait + TransactionTrait, { - if let (Some(le), Some(cl)) = (&parsing_result.leaf_update, &parsing_result.tree_update) { - // Do we need to update the `slot_updated` field as well as part of the table - // updates below? + if let (Some(le), Some(cl), Some(payload)) = ( + &parsing_result.leaf_update, + &parsing_result.tree_update, + &parsing_result.payload, + ) { + let (collection, verify, metadata) = match payload { + Payload::CollectionVerification { + collection, + verify, + args, + } => (collection.clone(), verify.clone(), args.clone()), + _ => { + return Err(IngesterError::ParsingError( + "Ix not parsed correctly".to_string(), + )); + } + }; + debug!( + "Handling collection verification event for {} (verify: {}): {}", + collection, verify, bundle.txn_id + ); let seq = save_changelog_event(cl, bundle.slot, bundle.txn_id, txn, instruction).await?; - match le.schema { - LeafSchema::V1 { id, .. } => { - let id_bytes = id.to_bytes().to_vec(); + let id_bytes = match le.schema { + LeafSchema::V1 { id, .. } => id.to_bytes().to_vec(), + }; - let asset_to_update = asset::ActiveModel { - id: Unchanged(id_bytes.clone()), - leaf: Set(Some(le.leaf_hash.to_vec())), - seq: Set(seq as i64), - ..Default::default() - }; - update_compressed_asset(txn, id_bytes.clone(), Some(seq), asset_to_update).await?; + let mut updated_metadata = metadata.clone(); + updated_metadata.collection = Some(Collection { + key: collection.clone(), + verified: verify, + }); - if verify { - if let Some(Payload::SetAndVerifyCollection { collection }) = - parsing_result.payload - { - let grouping = asset_grouping::ActiveModel { - asset_id: Set(id_bytes.clone()), - group_key: Set("collection".to_string()), - group_value: Set(collection.to_string()), - seq: Set(seq as i64), - slot_updated: Set(bundle.slot as i64), - ..Default::default() - }; - let mut query = asset_grouping::Entity::insert(grouping) - .on_conflict( - OnConflict::columns([ - asset_grouping::Column::AssetId, - asset_grouping::Column::GroupKey, - ]) - .update_columns([ - asset_grouping::Column::GroupKey, - asset_grouping::Column::GroupValue, - asset_grouping::Column::Seq, - asset_grouping::Column::SlotUpdated, - ]) - .to_owned(), - ) - .build(DbBackend::Postgres); - query.sql = format!( + let updated_data_hash = hash_metadata(&updated_metadata) + .map(|e| bs58::encode(e).into_string()) + .unwrap_or("".to_string()) + .trim() + .to_string(); + + let asset_to_update = asset::ActiveModel { + id: Unchanged(id_bytes.clone()), + leaf: Set(Some(le.leaf_hash.to_vec())), + seq: Set(seq as i64), + data_hash: Set(Some(updated_data_hash.clone())), // todo remove clone + ..Default::default() + }; + update_compressed_asset(txn, id_bytes.clone(), Some(seq), asset_to_update).await?; + + // REMOVE: TESTING + let e = asset::Entity::find_by_id(id_bytes.clone()) + .one(txn) + .await? + .unwrap(); + let v: u8 = 1; + let dh = updated_data_hash.clone(); + info!("creator hash: {}", e.creator_hash.clone().unwrap()); + let computed_asset_hash = keccak::hashv(&[ + &[v], + id_bytes.clone().as_ref(), + e.owner.as_ref().unwrap(), + e.delegate.unwrap_or(Default::default()).as_ref(), + cl.index.to_le_bytes().as_ref(), + bs58::decode(dh).into_vec().unwrap().as_ref(), + // hash_metadata(&metadata.clone()).unwrap().as_ref(), + bs58::decode(e.creator_hash.unwrap().trim()) + .into_vec() + .unwrap() + .as_ref(), + ]); + let actual = bs58::encode(computed_asset_hash).into_string(); + let expected = bs58::encode(le.leaf_hash).into_string(); + info!("Computed asset hash: {}", actual); + if actual == expected { + info!("Asset hash matches leaf hash"); + } else { + warn!( + "Asset hash does not match leaf hash, expected: {}", + expected + ); + } + // END REMOVE + + if verify { + let grouping = asset_grouping::ActiveModel { + asset_id: Set(id_bytes.clone()), + group_key: Set("collection".to_string()), + group_value: Set(collection.to_string()), + seq: Set(seq as i64), + slot_updated: Set(bundle.slot as i64), + ..Default::default() + }; + let mut query = asset_grouping::Entity::insert(grouping) + .on_conflict( + OnConflict::columns([ + asset_grouping::Column::AssetId, + asset_grouping::Column::GroupKey, + ]) + .update_columns([ + asset_grouping::Column::GroupKey, + asset_grouping::Column::GroupValue, + asset_grouping::Column::Seq, + asset_grouping::Column::SlotUpdated, + ]) + .to_owned(), + ) + .build(DbBackend::Postgres); + query.sql = format!( "{} WHERE excluded.slot_updated > asset_grouping.slot_updated AND excluded.seq >= asset_grouping.seq", query.sql ); - txn.execute(query).await?; - } - } - id_bytes - } - }; + txn.execute(query).await?; + } else { + // TODO: Support collection unverification. + // We will likely need to nullify the collection field so we can maintain + // the seq value and avoid out-of-order indexing bugs. + warn!( + "Collection unverification not processed for asset {} and collection {}", + bs58::encode(id_bytes).into_string(), + collection + ); + } return Ok(()); }; diff --git a/nft_ingester/src/program_transformers/bubblegum/mod.rs b/nft_ingester/src/program_transformers/bubblegum/mod.rs index a96f3024..5dfee1b1 100644 --- a/nft_ingester/src/program_transformers/bubblegum/mod.rs +++ b/nft_ingester/src/program_transformers/bubblegum/mod.rs @@ -85,14 +85,10 @@ where InstructionName::UnverifyCreator => { creator_verification::process(parsing_result, bundle, txn, false, ix_str).await?; } - InstructionName::VerifyCollection => { - collection_verification::process(parsing_result, bundle, txn, true, ix_str).await?; - } - InstructionName::UnverifyCollection => { - collection_verification::process(parsing_result, bundle, txn, false, ix_str).await?; - } - InstructionName::SetAndVerifyCollection => { - collection_verification::process(parsing_result, bundle, txn, true, ix_str).await?; + InstructionName::VerifyCollection + | InstructionName::UnverifyCollection + | InstructionName::SetAndVerifyCollection => { + collection_verification::process(parsing_result, bundle, txn, ix_str).await?; } _ => debug!("Bubblegum: Not Implemented Instruction"), } From 69e14273b9a4402df585a16adda2494c94e70e8e Mon Sep 17 00:00:00 2001 From: Nicolas Pennie Date: Wed, 26 Jul 2023 17:09:34 -0700 Subject: [PATCH 2/2] prep for pr --- .../bubblegum/collection_verification.rs | 34 ------------------- 1 file changed, 34 deletions(-) diff --git a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs index d1e1d70d..08168928 100644 --- a/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs +++ b/nft_ingester/src/program_transformers/bubblegum/collection_verification.rs @@ -67,40 +67,6 @@ where }; update_compressed_asset(txn, id_bytes.clone(), Some(seq), asset_to_update).await?; - // REMOVE: TESTING - let e = asset::Entity::find_by_id(id_bytes.clone()) - .one(txn) - .await? - .unwrap(); - let v: u8 = 1; - let dh = updated_data_hash.clone(); - info!("creator hash: {}", e.creator_hash.clone().unwrap()); - let computed_asset_hash = keccak::hashv(&[ - &[v], - id_bytes.clone().as_ref(), - e.owner.as_ref().unwrap(), - e.delegate.unwrap_or(Default::default()).as_ref(), - cl.index.to_le_bytes().as_ref(), - bs58::decode(dh).into_vec().unwrap().as_ref(), - // hash_metadata(&metadata.clone()).unwrap().as_ref(), - bs58::decode(e.creator_hash.unwrap().trim()) - .into_vec() - .unwrap() - .as_ref(), - ]); - let actual = bs58::encode(computed_asset_hash).into_string(); - let expected = bs58::encode(le.leaf_hash).into_string(); - info!("Computed asset hash: {}", actual); - if actual == expected { - info!("Asset hash matches leaf hash"); - } else { - warn!( - "Asset hash does not match leaf hash, expected: {}", - expected - ); - } - // END REMOVE - if verify { let grouping = asset_grouping::ActiveModel { asset_id: Set(id_bytes.clone()),