From ecd848303cf8a5e4a392b44445e579f7bcf9bc11 Mon Sep 17 00:00:00 2001 From: Haardik Date: Mon, 22 Jan 2024 22:14:06 -0500 Subject: [PATCH] add block timestamp cache to indexers --- lib/eth/src/id_registry.rs | 27 ++++++--- lib/eth/src/indexer.rs | 104 ++++++++++++++++++++++++-------- lib/eth/src/key_registry.rs | 12 ++-- lib/eth/src/storage_registry.rs | 5 +- lib/hub/src/main.rs | 2 +- 5 files changed, 110 insertions(+), 40 deletions(-) diff --git a/lib/eth/src/id_registry.rs b/lib/eth/src/id_registry.rs index 0e882a3..563f4e3 100644 --- a/lib/eth/src/id_registry.rs +++ b/lib/eth/src/id_registry.rs @@ -1,4 +1,5 @@ use crate::utils::read_abi; +use core::time; use ethers::{ contract::{parse_log, Contract as EthContract, ContractInstance, EthEvent}, core::utils::keccak256, @@ -7,11 +8,15 @@ use ethers::{ }; use std::error::Error; use std::sync::Arc; -use teleport_common::protobufs::generated::{ - on_chain_event, IdRegisterEventBody, IdRegisterEventType, OnChainEvent, OnChainEventType, +use teleport_common::{ + protobufs::generated::{ + on_chain_event, IdRegisterEventBody, IdRegisterEventType, OnChainEvent, OnChainEventType, + }, + time::to_farcaster_time, }; use teleport_storage::db::{self}; use teleport_storage::Store; +use uuid::timestamp; #[derive(Debug, Clone, EthEvent)] struct Register { @@ -93,6 +98,7 @@ impl Contract { store: &Store, log: &Log, chain_id: u32, + timestamp: i64, ) -> Result<(), Box> { let parsed_log: Register = parse_log(log.clone())?; @@ -108,7 +114,7 @@ impl Contract { chain_id, block_number: log.block_number.unwrap().as_u32(), block_hash: log.block_hash.unwrap().to_fixed_bytes().to_vec(), - block_timestamp: 0, + block_timestamp: timestamp as u64, transaction_hash: log.transaction_hash.unwrap().as_bytes().to_vec(), log_index: log.log_index.unwrap().as_u32(), fid: parsed_log.id.as_u64(), @@ -122,9 +128,7 @@ impl Contract { let fid_row = db::FidRow { fid: parsed_log.id.as_u64() as i64, - // TODO: there is no efficient way to get the timestamp from the block - // without fetching the block itself in another RPC call - registered_at: 0, + registered_at: timestamp.into(), chain_event_id: event_row.id, custody_address: parsed_log.to.to_fixed_bytes(), recovery_address: parsed_log.recovery.to_fixed_bytes(), @@ -155,6 +159,7 @@ impl Contract { store: &Store, log: &Log, chain_id: u32, + timestamp: i64, ) -> Result<(), Box> { let parsed_log: Transfer = parse_log(log.clone()).unwrap(); @@ -170,7 +175,7 @@ impl Contract { chain_id, block_number: log.block_number.unwrap().as_u32(), block_hash: log.block_hash.unwrap().to_fixed_bytes().to_vec(), - block_timestamp: 0, + block_timestamp: timestamp as u64, transaction_hash: log.transaction_hash.unwrap().as_bytes().to_vec(), log_index: log.log_index.unwrap().as_u32(), fid: parsed_log.id.as_u64(), @@ -213,6 +218,7 @@ impl Contract { store: &Store, log: &Log, chain_id: u32, + timestamp: i64, ) -> Result<(), Box> { let parsed_log: Recover = parse_log(log.clone())?; @@ -228,7 +234,7 @@ impl Contract { chain_id, block_number: log.block_number.unwrap().as_u32(), block_hash: log.block_hash.unwrap().to_fixed_bytes().to_vec(), - block_timestamp: 0, + block_timestamp: timestamp as u64, transaction_hash: log.transaction_hash.unwrap().as_bytes().to_vec(), log_index: log.log_index.unwrap().as_u32(), fid: parsed_log.id.as_u64(), @@ -272,6 +278,7 @@ impl Contract { store: &Store, log: &Log, chain_id: u32, + timestamp: i64, ) -> Result<(), Box> { let parsed_log: ChangeRecoveryAddress = parse_log(log.clone())?; @@ -287,7 +294,7 @@ impl Contract { chain_id, block_number: log.block_number.unwrap().as_u32(), block_hash: log.block_hash.unwrap().to_fixed_bytes().to_vec(), - block_timestamp: 0, + block_timestamp: timestamp as u64, transaction_hash: log.transaction_hash.unwrap().as_bytes().to_vec(), log_index: log.log_index.unwrap().as_u32(), fid: parsed_log.id.as_u64(), @@ -386,7 +393,7 @@ mod tests { let logs = id_registry.get_register_logs(0, 100000000).await.unwrap(); id_registry - .persist_register_log(&store, &logs[0], 10u32) + .persist_register_log(&store, &logs[0], 10u32, 0i64) .await .unwrap(); diff --git a/lib/eth/src/indexer.rs b/lib/eth/src/indexer.rs index 34bbe3c..9a56e72 100644 --- a/lib/eth/src/indexer.rs +++ b/lib/eth/src/indexer.rs @@ -1,10 +1,12 @@ use crate::id_registry; use crate::key_registry; use crate::storage_registry; +use ethers::types::H256; use ethers::{ providers::{JsonRpcClient, Middleware, Provider}, types::BlockNumber, }; +use std::collections::HashMap; use std::error::Error; use teleport_storage::{db, Store}; use tokio; @@ -20,6 +22,7 @@ pub struct Indexer { id_registry: id_registry::Contract, key_registry: key_registry::Contract, storage_registry: storage_registry::Contract, + block_timestamp_cache: HashMap, } impl Indexer { @@ -47,6 +50,7 @@ impl Indexer { storage_reg_address, format!("{}/StorageRegistry.json", abi_dir), )?; + let block_timestamp_cache = HashMap::new(); Ok(Indexer { store, @@ -55,6 +59,7 @@ impl Indexer { key_registry, storage_registry, chain_id, + block_timestamp_cache, }) } @@ -75,33 +80,53 @@ impl Indexer { Ok(latest_block.number.unwrap().as_u64()) } - async fn sync_register_logs(&self, start: u64, end: u64) -> Result<(), Box> { + pub async fn get_block_timestamp(&mut self, block_hash: H256) -> Result> { + if let Some(timestamp) = self.block_timestamp_cache.get(&block_hash) { + return Ok(*timestamp); + } + + let block = self.provider.get_block(block_hash).await?.unwrap(); + let timestamp = block.timestamp.as_u32().into(); + self.block_timestamp_cache.insert(block_hash, timestamp); + Ok(timestamp) + } + + async fn sync_register_logs(&mut self, start: u64, end: u64) -> Result<(), Box> { let register_logs = self.id_registry.get_register_logs(start, end).await?; for log in register_logs { + let block_hash = log.block_hash.unwrap(); + let timestamp = self.get_block_timestamp(block_hash).await?; + self.id_registry - .persist_register_log(&self.store, &log, self.chain_id) + .persist_register_log(&self.store, &log, self.chain_id, timestamp) .await? } Ok(()) } - async fn sync_transfer_logs(&self, start: u64, end: u64) -> Result<(), Box> { + async fn sync_transfer_logs(&mut self, start: u64, end: u64) -> Result<(), Box> { let transfer_logs = self.id_registry.get_transfer_logs(start, end).await?; for log in transfer_logs { + let block_hash = log.block_hash.unwrap(); + let timestamp = self.get_block_timestamp(block_hash).await?; + self.id_registry - .persist_transfer_log(&self.store, &log, self.chain_id) + .persist_transfer_log(&self.store, &log, self.chain_id, timestamp) .await?; } Ok(()) } - async fn sync_recovery_logs(&self, start: u64, end: u64) -> Result<(), Box> { + async fn sync_recovery_logs(&mut self, start: u64, end: u64) -> Result<(), Box> { let recovery_logs = self.id_registry.get_recovery_logs(start, end).await?; for log in recovery_logs { + let block_hash = log.block_hash.unwrap(); + let timestamp = self.get_block_timestamp(block_hash).await?; + self.id_registry - .persist_recovery_log(&self.store, &log, self.chain_id) + .persist_recovery_log(&self.store, &log, self.chain_id, timestamp) .await? } @@ -109,7 +134,7 @@ impl Indexer { } async fn sync_change_recovery_address_logs( - &self, + &mut self, start: u64, end: u64, ) -> Result<(), Box> { @@ -118,77 +143,102 @@ impl Indexer { .get_change_recovery_address_logs(start, end) .await?; for log in change_recovery_address_logs { + let block_hash = log.block_hash.unwrap(); + let timestamp = self.get_block_timestamp(block_hash).await?; + self.id_registry - .persist_change_recovery_address_log(&self.store, &log, self.chain_id) + .persist_change_recovery_address_log(&self.store, &log, self.chain_id, timestamp) .await? } Ok(()) } - async fn sync_add_logs(&self, start: u64, end: u64) -> Result<(), Box> { + async fn sync_add_logs(&mut self, start: u64, end: u64) -> Result<(), Box> { let add_logs = self.key_registry.get_add_logs(start, end).await?; for log in add_logs { + let block_hash = log.block_hash.unwrap(); + let timestamp = self.get_block_timestamp(block_hash).await?; + self.key_registry - .persist_add_log(&self.store, &log, self.chain_id) + .persist_add_log(&self.store, &log, self.chain_id, timestamp) .await? } Ok(()) } - async fn sync_remove_logs(&self, start: u64, end: u64) -> Result<(), Box> { + async fn sync_remove_logs(&mut self, start: u64, end: u64) -> Result<(), Box> { let remove_logs = self.key_registry.get_remove_logs(start, end).await?; for log in remove_logs { + let block_hash = log.block_hash.unwrap(); + let timestamp = self.get_block_timestamp(block_hash).await?; + self.key_registry - .persist_remove_log(&self.store, &log, self.chain_id) + .persist_remove_log(&self.store, &log, self.chain_id, timestamp) .await? } Ok(()) } - async fn sync_admin_reset_logs(&self, start: u64, end: u64) -> Result<(), Box> { + async fn sync_admin_reset_logs(&mut self, start: u64, end: u64) -> Result<(), Box> { let admin_reset_logs = self.key_registry.get_admin_reset_logs(start, end).await?; for log in admin_reset_logs { + let block_hash = log.block_hash.unwrap(); + let timestamp = self.get_block_timestamp(block_hash).await?; + self.key_registry - .persist_admin_reset_log(&self.store, &log, self.chain_id) + .persist_admin_reset_log(&self.store, &log, self.chain_id, timestamp) .await? } Ok(()) } - async fn sync_migrated_logs(&self, start: u64, end: u64) -> Result<(), Box> { + async fn sync_migrated_logs(&mut self, start: u64, end: u64) -> Result<(), Box> { let migrated_logs = self.key_registry.get_migrated_logs(start, end).await?; for log in migrated_logs { + let block_hash = log.block_hash.unwrap(); + let timestamp = self.get_block_timestamp(block_hash).await?; + self.key_registry - .persist_migrated_log(&self.store, &log, self.chain_id) + .persist_migrated_log(&self.store, &log, self.chain_id, timestamp) .await? } Ok(()) } - async fn sync_rent_logs(&self, start: u64, end: u64) -> Result<(), Box> { + async fn sync_rent_logs(&mut self, start: u64, end: u64) -> Result<(), Box> { let rent_logs = self.storage_registry.get_rent_logs(start, end).await?; for log in rent_logs { + let block_hash = log.block_hash.unwrap(); + let timestamp = self.get_block_timestamp(block_hash).await?; + self.storage_registry - .persist_rent_log(&self.store, &log, self.chain_id) + .persist_rent_log(&self.store, &log, self.chain_id, timestamp) .await? } Ok(()) } - async fn sync_set_max_units_logs(&self, start: u64, end: u64) -> Result<(), Box> { + async fn sync_set_max_units_logs( + &mut self, + start: u64, + end: u64, + ) -> Result<(), Box> { let set_max_units_logs = self .storage_registry .get_set_max_units_logs(start, end) .await?; for log in set_max_units_logs { + let block_hash = log.block_hash.unwrap(); + let timestamp = self.get_block_timestamp(block_hash).await?; + self.storage_registry - .persist_set_max_units_log(&self.store, &log, self.chain_id) + .persist_set_max_units_log(&self.store, &log, self.chain_id, timestamp) .await? } @@ -196,7 +246,7 @@ impl Indexer { } async fn sync_deprecation_timestamp_logs( - &self, + &mut self, start: u64, end: u64, ) -> Result<(), Box> { @@ -205,15 +255,18 @@ impl Indexer { .get_deprecation_timestamp_logs(start, end) .await?; for log in deprecation_timestamp_logs { + let block_hash = log.block_hash.unwrap(); + let timestamp = self.get_block_timestamp(block_hash).await?; + self.storage_registry - .persist_deprecation_timestamp_log(&self.store, &log, self.chain_id) + .persist_deprecation_timestamp_log(&self.store, &log, self.chain_id, timestamp) .await? } Ok(()) } - pub async fn subscribe(&self, start_block: u64) -> Result<(), Box> { + pub async fn subscribe(&mut self, start_block: u64) -> Result<(), Box> { let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(360)); let mut current_block = start_block; loop { @@ -224,7 +277,7 @@ impl Indexer { } } - pub async fn sync(&self, start_block: u64, end_block: u64) -> Result<(), Box> { + pub async fn sync(&mut self, start_block: u64, end_block: u64) -> Result<(), Box> { let mut current_block = start_block; log::info!( @@ -239,6 +292,9 @@ impl Indexer { (current_block - start_block) as f64 / (end_block - start_block) as f64; log::info!("events sync progress = {:.2}%", percent_complete * 100.0); + // Clear block timestamp cache to avoid overloading it with useless data + self.block_timestamp_cache.clear(); + let start = current_block; let end = current_block + 2000; diff --git a/lib/eth/src/key_registry.rs b/lib/eth/src/key_registry.rs index 8c873f3..53b12fb 100644 --- a/lib/eth/src/key_registry.rs +++ b/lib/eth/src/key_registry.rs @@ -132,6 +132,7 @@ impl Contract { store: &Store, log: &Log, chain_id: u32, + timestamp: i64, ) -> Result<(), Box> { let fid = U256::from_big_endian(log.topics[1].as_bytes()).as_u64(); let key_type = U256::from_big_endian(log.topics[2].as_bytes()).as_u32(); @@ -166,7 +167,7 @@ impl Contract { chain_id, block_number: log.block_number.unwrap().as_u32(), block_hash: log.block_hash.unwrap().to_fixed_bytes().to_vec(), - block_timestamp: 0, + block_timestamp: timestamp as u64, transaction_hash: log.transaction_hash.unwrap().as_bytes().to_vec(), log_index: log.log_index.unwrap().as_u32(), fid, @@ -225,6 +226,7 @@ impl Contract { store: &Store, log: &Log, chain_id: u32, + timestamp: i64, ) -> Result<(), Box> { let fid = U256::from_big_endian(log.topics[1].as_bytes()); let key_hash = Address::from(log.topics[2]); @@ -256,7 +258,7 @@ impl Contract { chain_id, block_number: log.block_number.unwrap().as_u32(), block_hash: log.block_hash.unwrap().to_fixed_bytes().to_vec(), - block_timestamp: 0, + block_timestamp: timestamp as u64, transaction_hash: log.transaction_hash.unwrap().as_bytes().to_vec(), log_index: log.log_index.unwrap().as_u32(), fid: fid.as_u64(), @@ -298,6 +300,7 @@ impl Contract { store: &Store, log: &Log, chain_id: u32, + timestamp: i64, ) -> Result<(), Box> { let fid = U256::from_big_endian(log.topics[1].as_bytes()); let key_hash = Address::from(log.topics[2]); @@ -327,7 +330,7 @@ impl Contract { chain_id, block_number: log.block_number.unwrap().as_u32(), block_hash: log.block_hash.unwrap().to_fixed_bytes().to_vec(), - block_timestamp: 0, + block_timestamp: timestamp as u64, transaction_hash: log.transaction_hash.unwrap().as_bytes().to_vec(), log_index: log.log_index.unwrap().as_u32(), fid: fid.as_u64(), @@ -366,6 +369,7 @@ impl Contract { store: &Store, log: &Log, chain_id: u32, + timestamp: i64, ) -> Result<(), Box> { let parsed_log: Migrated = parse_log(log.clone()).unwrap(); let body = SignerMigratedEventBody { @@ -379,7 +383,7 @@ impl Contract { chain_id, block_number: log.block_number.unwrap().as_u32(), block_hash: log.block_hash.unwrap().to_fixed_bytes().to_vec(), - block_timestamp: 0, + block_timestamp: timestamp as u64, transaction_hash: log.transaction_hash.unwrap().as_bytes().to_vec(), log_index: log.log_index.unwrap().as_u32(), fid: 0, diff --git a/lib/eth/src/storage_registry.rs b/lib/eth/src/storage_registry.rs index c0809c5..98c7bac 100644 --- a/lib/eth/src/storage_registry.rs +++ b/lib/eth/src/storage_registry.rs @@ -80,6 +80,7 @@ impl Contract { store: &Store, log: &Log, chain_id: u32, + timestamp: i64, ) -> Result<(), Box> { let parsed_log: Rent = parse_log(log.clone()).unwrap(); let units = parsed_log.units.as_u32(); @@ -98,7 +99,7 @@ impl Contract { chain_id, block_number: log.block_number.unwrap().as_u32(), block_hash: log.block_hash.unwrap().to_fixed_bytes().to_vec(), - block_timestamp: 0, + block_timestamp: timestamp as u64, transaction_hash: log.transaction_hash.unwrap().as_bytes().to_vec(), log_index: log.log_index.unwrap().as_u32(), fid, @@ -140,6 +141,7 @@ impl Contract { _store: &Store, log: &Log, _chain_id: u32, + _timestamp: i64, ) -> Result<(), Box> { let parsed_log: SetMaxUnits = parse_log(log.clone()).unwrap(); let _old_max = parsed_log.oldMax.as_u32(); @@ -172,6 +174,7 @@ impl Contract { _store: &Store, log: &Log, _chain_id: u32, + _timestamp: i64, ) -> Result<(), Box> { let parsed_log: SetDeprecationTimestamp = parse_log(log.clone()).unwrap(); let _old_timestamp = parsed_log.oldTimestamp.as_u32(); diff --git a/lib/hub/src/main.rs b/lib/hub/src/main.rs index 71c788c..713cbf2 100644 --- a/lib/hub/src/main.rs +++ b/lib/hub/src/main.rs @@ -70,7 +70,7 @@ async fn main() { let provider = Provider::::try_from(config.optimism_l2_rpc_url).unwrap(); - let indexer = Indexer::new( + let mut indexer = Indexer::new( store.clone(), provider, config.chain_id,