add block timestamp cache to indexers #10

Merged (1 commit) on Jan 29, 2024
27 changes: 17 additions & 10 deletions lib/eth/src/id_registry.rs
@@ -1,4 +1,5 @@
use crate::utils::read_abi;
use core::time;
use ethers::{
contract::{parse_log, Contract as EthContract, ContractInstance, EthEvent},
core::utils::keccak256,
@@ -7,11 +8,15 @@ use ethers::{
};
use std::error::Error;
use std::sync::Arc;
use teleport_common::protobufs::generated::{
on_chain_event, IdRegisterEventBody, IdRegisterEventType, OnChainEvent, OnChainEventType,
use teleport_common::{
protobufs::generated::{
on_chain_event, IdRegisterEventBody, IdRegisterEventType, OnChainEvent, OnChainEventType,
},
time::to_farcaster_time,
};
use teleport_storage::db::{self};
use teleport_storage::Store;
use uuid::timestamp;

#[derive(Debug, Clone, EthEvent)]
struct Register {
@@ -93,6 +98,7 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
store: &Store,
log: &Log,
chain_id: u32,
timestamp: i64,
) -> Result<(), Box<dyn Error>> {
let parsed_log: Register = parse_log(log.clone())?;

@@ -108,7 +114,7 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
chain_id,
block_number: log.block_number.unwrap().as_u32(),
block_hash: log.block_hash.unwrap().to_fixed_bytes().to_vec(),
block_timestamp: 0,
block_timestamp: timestamp as u64,
transaction_hash: log.transaction_hash.unwrap().as_bytes().to_vec(),
log_index: log.log_index.unwrap().as_u32(),
fid: parsed_log.id.as_u64(),
@@ -122,9 +128,7 @@ impl<T: JsonRpcClient + Clone> Contract<T> {

let fid_row = db::FidRow {
fid: parsed_log.id.as_u64() as i64,
// TODO: there is no efficient way to get the timestamp from the block
// without fetching the block itself in another RPC call
registered_at: 0,
registered_at: timestamp.into(),
chain_event_id: event_row.id,
custody_address: parsed_log.to.to_fixed_bytes(),
recovery_address: parsed_log.recovery.to_fixed_bytes(),
@@ -155,6 +159,7 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
store: &Store,
log: &Log,
chain_id: u32,
timestamp: i64,
) -> Result<(), Box<dyn Error>> {
let parsed_log: Transfer = parse_log(log.clone()).unwrap();

@@ -170,7 +175,7 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
chain_id,
block_number: log.block_number.unwrap().as_u32(),
block_hash: log.block_hash.unwrap().to_fixed_bytes().to_vec(),
block_timestamp: 0,
block_timestamp: timestamp as u64,
transaction_hash: log.transaction_hash.unwrap().as_bytes().to_vec(),
log_index: log.log_index.unwrap().as_u32(),
fid: parsed_log.id.as_u64(),
@@ -213,6 +218,7 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
store: &Store,
log: &Log,
chain_id: u32,
timestamp: i64,
) -> Result<(), Box<dyn Error>> {
let parsed_log: Recover = parse_log(log.clone())?;

@@ -228,7 +234,7 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
chain_id,
block_number: log.block_number.unwrap().as_u32(),
block_hash: log.block_hash.unwrap().to_fixed_bytes().to_vec(),
block_timestamp: 0,
block_timestamp: timestamp as u64,
transaction_hash: log.transaction_hash.unwrap().as_bytes().to_vec(),
log_index: log.log_index.unwrap().as_u32(),
fid: parsed_log.id.as_u64(),
@@ -272,6 +278,7 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
store: &Store,
log: &Log,
chain_id: u32,
timestamp: i64,
) -> Result<(), Box<dyn Error>> {
let parsed_log: ChangeRecoveryAddress = parse_log(log.clone())?;

@@ -287,7 +294,7 @@ impl<T: JsonRpcClient + Clone> Contract<T> {
chain_id,
block_number: log.block_number.unwrap().as_u32(),
block_hash: log.block_hash.unwrap().to_fixed_bytes().to_vec(),
block_timestamp: 0,
block_timestamp: timestamp as u64,
transaction_hash: log.transaction_hash.unwrap().as_bytes().to_vec(),
log_index: log.log_index.unwrap().as_u32(),
fid: parsed_log.id.as_u64(),
@@ -386,7 +393,7 @@ mod tests {

let logs = id_registry.get_register_logs(0, 100000000).await.unwrap();
id_registry
.persist_register_log(&store, &logs[0], 10u32)
.persist_register_log(&store, &logs[0], 10u32, 0i64)
.await
.unwrap();

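The net effect in this file: every persist_* helper now takes the block timestamp as an explicit i64 argument and writes it into block_timestamp and registered_at, replacing the hard-coded 0 and the old TODO about needing an extra RPC call per event. A minimal sketch of the new call shape from a caller's point of view (the indexer, store, log, and chain_id bindings here are hypothetical, for illustration only):

// Hypothetical caller: fetch the timestamp once (cached by block hash),
// then pass it into the persist call instead of the old hard-coded 0.
let block_hash = log.block_hash.unwrap();
let timestamp = indexer.get_block_timestamp(block_hash).await?;
id_registry
    .persist_register_log(&store, &log, chain_id, timestamp)
    .await?;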
104 changes: 80 additions & 24 deletions lib/eth/src/indexer.rs
@@ -1,10 +1,12 @@
use crate::id_registry;
use crate::key_registry;
use crate::storage_registry;
use ethers::types::H256;
use ethers::{
providers::{JsonRpcClient, Middleware, Provider},
types::BlockNumber,
};
use std::collections::HashMap;
use std::error::Error;
use teleport_storage::{db, Store};
use tokio;
@@ -20,6 +22,7 @@ pub struct Indexer<T> {
id_registry: id_registry::Contract<T>,
key_registry: key_registry::Contract<T>,
storage_registry: storage_registry::Contract<T>,
block_timestamp_cache: HashMap<H256, i64>,
}

impl<T: JsonRpcClient + Clone> Indexer<T> {
@@ -47,6 +50,7 @@ impl<T: JsonRpcClient + Clone> Indexer<T> {
storage_reg_address,
format!("{}/StorageRegistry.json", abi_dir),
)?;
let block_timestamp_cache = HashMap::new();

Ok(Indexer {
store,
@@ -55,6 +59,7 @@ impl<T: JsonRpcClient + Clone> Indexer<T> {
key_registry,
storage_registry,
chain_id,
block_timestamp_cache,
})
}

@@ -75,41 +80,61 @@ impl<T: JsonRpcClient + Clone> Indexer<T> {
Ok(latest_block.number.unwrap().as_u64())
}

async fn sync_register_logs(&self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
pub async fn get_block_timestamp(&mut self, block_hash: H256) -> Result<i64, Box<dyn Error>> {
if let Some(timestamp) = self.block_timestamp_cache.get(&block_hash) {
return Ok(*timestamp);
}

let block = self.provider.get_block(block_hash).await?.unwrap();
let timestamp = block.timestamp.as_u32().into();
self.block_timestamp_cache.insert(block_hash, timestamp);
Ok(timestamp)
}

async fn sync_register_logs(&mut self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
let register_logs = self.id_registry.get_register_logs(start, end).await?;
for log in register_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;

self.id_registry
.persist_register_log(&self.store, &log, self.chain_id)
.persist_register_log(&self.store, &log, self.chain_id, timestamp)
.await?
}

Ok(())
}

async fn sync_transfer_logs(&self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
async fn sync_transfer_logs(&mut self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
let transfer_logs = self.id_registry.get_transfer_logs(start, end).await?;
for log in transfer_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;

self.id_registry
.persist_transfer_log(&self.store, &log, self.chain_id)
.persist_transfer_log(&self.store, &log, self.chain_id, timestamp)
.await?;
}

Ok(())
}

async fn sync_recovery_logs(&self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
async fn sync_recovery_logs(&mut self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
let recovery_logs = self.id_registry.get_recovery_logs(start, end).await?;
for log in recovery_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;

self.id_registry
.persist_recovery_log(&self.store, &log, self.chain_id)
.persist_recovery_log(&self.store, &log, self.chain_id, timestamp)
.await?
}

Ok(())
}

async fn sync_change_recovery_address_logs(
&self,
&mut self,
start: u64,
end: u64,
) -> Result<(), Box<dyn Error>> {
@@ -118,85 +143,110 @@ impl<T: JsonRpcClient + Clone> Indexer<T> {
.get_change_recovery_address_logs(start, end)
.await?;
for log in change_recovery_address_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;

self.id_registry
.persist_change_recovery_address_log(&self.store, &log, self.chain_id)
.persist_change_recovery_address_log(&self.store, &log, self.chain_id, timestamp)
.await?
}

Ok(())
}

async fn sync_add_logs(&self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
async fn sync_add_logs(&mut self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
let add_logs = self.key_registry.get_add_logs(start, end).await?;
for log in add_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;

self.key_registry
.persist_add_log(&self.store, &log, self.chain_id)
.persist_add_log(&self.store, &log, self.chain_id, timestamp)
.await?
}

Ok(())
}

async fn sync_remove_logs(&self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
async fn sync_remove_logs(&mut self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
let remove_logs = self.key_registry.get_remove_logs(start, end).await?;
for log in remove_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;

self.key_registry
.persist_remove_log(&self.store, &log, self.chain_id)
.persist_remove_log(&self.store, &log, self.chain_id, timestamp)
.await?
}

Ok(())
}

async fn sync_admin_reset_logs(&self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
async fn sync_admin_reset_logs(&mut self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
let admin_reset_logs = self.key_registry.get_admin_reset_logs(start, end).await?;
for log in admin_reset_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;

self.key_registry
.persist_admin_reset_log(&self.store, &log, self.chain_id)
.persist_admin_reset_log(&self.store, &log, self.chain_id, timestamp)
.await?
}

Ok(())
}

async fn sync_migrated_logs(&self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
async fn sync_migrated_logs(&mut self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
let migrated_logs = self.key_registry.get_migrated_logs(start, end).await?;
for log in migrated_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;

self.key_registry
.persist_migrated_log(&self.store, &log, self.chain_id)
.persist_migrated_log(&self.store, &log, self.chain_id, timestamp)
.await?
}

Ok(())
}

async fn sync_rent_logs(&self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
async fn sync_rent_logs(&mut self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
let rent_logs = self.storage_registry.get_rent_logs(start, end).await?;
for log in rent_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;

self.storage_registry
.persist_rent_log(&self.store, &log, self.chain_id)
.persist_rent_log(&self.store, &log, self.chain_id, timestamp)
.await?
}

Ok(())
}

async fn sync_set_max_units_logs(&self, start: u64, end: u64) -> Result<(), Box<dyn Error>> {
async fn sync_set_max_units_logs(
&mut self,
start: u64,
end: u64,
) -> Result<(), Box<dyn Error>> {
let set_max_units_logs = self
.storage_registry
.get_set_max_units_logs(start, end)
.await?;
for log in set_max_units_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;

self.storage_registry
.persist_set_max_units_log(&self.store, &log, self.chain_id)
.persist_set_max_units_log(&self.store, &log, self.chain_id, timestamp)
.await?
}

Ok(())
}

async fn sync_deprecation_timestamp_logs(
&self,
&mut self,
start: u64,
end: u64,
) -> Result<(), Box<dyn Error>> {
@@ -205,15 +255,18 @@ impl<T: JsonRpcClient + Clone> Indexer<T> {
.get_deprecation_timestamp_logs(start, end)
.await?;
for log in deprecation_timestamp_logs {
let block_hash = log.block_hash.unwrap();
let timestamp = self.get_block_timestamp(block_hash).await?;

self.storage_registry
.persist_deprecation_timestamp_log(&self.store, &log, self.chain_id)
.persist_deprecation_timestamp_log(&self.store, &log, self.chain_id, timestamp)
.await?
}

Ok(())
}

pub async fn subscribe(&self, start_block: u64) -> Result<(), Box<dyn Error>> {
pub async fn subscribe(&mut self, start_block: u64) -> Result<(), Box<dyn Error>> {
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(360));
let mut current_block = start_block;
loop {
@@ -224,7 +277,7 @@ impl<T: JsonRpcClient + Clone> Indexer<T> {
}
}

pub async fn sync(&self, start_block: u64, end_block: u64) -> Result<(), Box<dyn Error>> {
pub async fn sync(&mut self, start_block: u64, end_block: u64) -> Result<(), Box<dyn Error>> {
let mut current_block = start_block;

log::info!(
@@ -239,6 +292,9 @@ impl<T: JsonRpcClient + Clone> Indexer<T> {
(current_block - start_block) as f64 / (end_block - start_block) as f64;
log::info!("events sync progress = {:.2}%", percent_complete * 100.0);

// Clear block timestamp cache to avoid overloading it with useless data
self.block_timestamp_cache.clear();

let start = current_block;
let end = current_block + 2000;

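Taken together, the cache is a plain HashMap<H256, i64> keyed by block hash: get_block_timestamp checks the map before issuing the get_block RPC call, and sync clears the map at the top of each 2000-block batch so it only ever holds one batch's worth of entries. Below is a self-contained sketch of the same memoization pattern with the RPC call abstracted into a closure; the names and the BlockHash alias are illustrative, not part of the PR.

use std::collections::HashMap;

// Stand-in for ethers' H256 so the sketch compiles on its own.
type BlockHash = [u8; 32];

struct TimestampCache {
    inner: HashMap<BlockHash, i64>,
}

impl TimestampCache {
    fn new() -> Self {
        Self { inner: HashMap::new() }
    }

    // Return the cached timestamp, or fetch it once and remember it.
    fn get_or_fetch(&mut self, hash: BlockHash, fetch: impl FnOnce(BlockHash) -> i64) -> i64 {
        if let Some(ts) = self.inner.get(&hash) {
            return *ts;
        }
        let ts = fetch(hash);
        self.inner.insert(hash, ts);
        ts
    }

    // Cleared once per batch so the map stays small.
    fn clear(&mut self) {
        self.inner.clear();
    }
}

fn main() {
    let mut cache = TimestampCache::new();
    let hash = [0u8; 32];
    // First call "fetches"; the second must hit the cache.
    let t1 = cache.get_or_fetch(hash, |_| 1_706_000_000);
    let t2 = cache.get_or_fetch(hash, |_| panic!("should not fetch twice"));
    assert_eq!(t1, t2);
    cache.clear();
}

In synchronous code the entry API (entry(hash).or_insert_with(...)) would be an equivalent one-liner; the explicit get-then-insert shape mirrors the PR's code, which has to await the RPC future between the lookup and the insert.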