From eb2c7efc19182376cb5e95fc546bab53cc524a04 Mon Sep 17 00:00:00 2001 From: Luis Herasme Date: Fri, 26 Jul 2024 12:07:43 -0400 Subject: [PATCH] feat: Load "event_handler" config dynamically at runtime --- ghost-crab-macros/src/lib.rs | 77 ++----------------------------- ghost-crab/src/event_handler.rs | 15 ++---- ghost-crab/src/indexer/indexer.rs | 64 +++++++++++++++++++------ 3 files changed, 60 insertions(+), 96 deletions(-) diff --git a/ghost-crab-macros/src/lib.rs b/ghost-crab-macros/src/lib.rs index bbf2916..6a44e63 100644 --- a/ghost-crab-macros/src/lib.rs +++ b/ghost-crab-macros/src/lib.rs @@ -1,5 +1,5 @@ extern crate proc_macro; -use ghost_crab_common::config::{self, ExecutionMode}; +use ghost_crab_common::config; use proc_macro::TokenStream; use proc_macro2::{Ident, Literal}; use quote::{format_ident, quote}; @@ -103,51 +103,12 @@ fn create_handler(metadata: TokenStream, input: TokenStream, is_template: bool) let (name, event_name) = get_source_and_event(metadata); let config = config::load().unwrap(); - let abi; - let network; - let execution_mode; - let address; - let start_block; - - if is_template { + let abi = if is_template { let source = config.templates.get(&name).expect("Source not found."); - - abi = source.abi.clone(); - network = source.network.clone(); - execution_mode = source.execution_mode.clone().unwrap_or(ExecutionMode::Parallel); - address = quote! { - Address::ZERO - }; - start_block = Literal::u64_suffixed(0); + Literal::string(&source.abi) } else { let source = config.data_sources.get(&name).expect("Source not found."); - - abi = source.abi.clone(); - network = source.network.clone(); - execution_mode = source.execution_mode.clone().unwrap_or(ExecutionMode::Parallel); - - let address_literal = Literal::string(&source.address[2..]); - - address = quote! { - address!(#address_literal) - }; - start_block = Literal::u64_suffixed(source.start_block); - }; - - let network_config = config.networks.get(&network).expect("RPC url not found for network"); - let rpc_url = Literal::string(&network_config.rpc_url); - let requests_per_second = Literal::u64_suffixed(network_config.requests_per_second); - - let abi = Literal::string(&abi); - let network = Literal::string(&network); - - let execution_mode = match execution_mode { - ExecutionMode::Parallel => quote! { - ExecutionMode::Parallel - }, - ExecutionMode::Serial => quote! { - ExecutionMode::Serial - }, + Literal::string(&source.abi) }; let parsed = parse_macro_input!(input as ItemFn); @@ -187,38 +148,10 @@ fn create_handler(metadata: TokenStream, input: TokenStream, is_template: bool) #fn_body } - fn start_block(&self) -> u64 { - #start_block - } - - fn get_source(&self) -> String { + fn name(&self) -> String { String::from(#data_source) } - fn is_template(&self) -> bool { - #is_template - } - - fn address(&self) -> Address { - #address - } - - fn network(&self) -> String { - String::from(#network) - } - - fn rpc_url(&self) -> String { - String::from(#rpc_url) - } - - fn rate_limit(&self) -> u64 { - #requests_per_second - } - - fn execution_mode(&self) -> ExecutionMode { - #execution_mode - } - fn event_signature(&self) -> String { #contract_name::#event_name::SIGNATURE.to_string() } diff --git a/ghost-crab/src/event_handler.rs b/ghost-crab/src/event_handler.rs index 71d8429..075da6a 100644 --- a/ghost-crab/src/event_handler.rs +++ b/ghost-crab/src/event_handler.rs @@ -38,14 +38,7 @@ pub type EventHandlerInstance = Arc>; #[async_trait] pub trait EventHandler { async fn handle(&self, params: EventContext); - fn get_source(&self) -> String; - fn is_template(&self) -> bool; - fn start_block(&self) -> u64; - fn address(&self) -> Address; - fn network(&self) -> String; - fn rpc_url(&self) -> String; - fn rate_limit(&self) -> u64; - fn execution_mode(&self) -> ExecutionMode; + fn name(&self) -> String; fn event_signature(&self) -> String; } @@ -57,12 +50,12 @@ pub struct ProcessEventsInput { pub handler: EventHandlerInstance, pub templates: TemplateManager, pub provider: CacheProvider, + pub execution_mode: ExecutionMode, } pub async fn process_events( - ProcessEventsInput { start_block, step, address, handler, templates, provider }: ProcessEventsInput, + ProcessEventsInput { start_block, execution_mode, step, address, handler, templates, provider }: ProcessEventsInput, ) -> Result<(), TransportError> { - let execution_mode = handler.execution_mode(); let event_signature = handler.event_signature(); let mut current_block = start_block; @@ -82,7 +75,7 @@ pub async fn process_events( continue; } - let source = handler.get_source(); + let source = handler.name(); println!("[{}] Processing logs from {} to {}", source, current_block, end_block); diff --git a/ghost-crab/src/indexer/indexer.rs b/ghost-crab/src/indexer/indexer.rs index b327ca6..d4b9912 100644 --- a/ghost-crab/src/indexer/indexer.rs +++ b/ghost-crab/src/indexer/indexer.rs @@ -33,24 +33,42 @@ impl Indexer { }) } - pub async fn load_event_handler(&mut self, handler: EventHandlerInstance) { - if handler.is_template() { - return; - } + pub async fn load_event_handler( + &mut self, + handler: EventHandlerInstance, + ) -> Result<(), AddHandlerError> { + let event_config = self + .config + .data_sources + .remove(&handler.name()) + .ok_or(AddHandlerError::NotFound(handler.name()))?; + + let network = self + .config + .networks + .get(&event_config.network) + .ok_or(AddHandlerError::NetworkNotFound(event_config.network.clone()))?; let provider = self .rpc_manager - .get_or_create(handler.network(), handler.rpc_url(), handler.rate_limit()) + .get_or_create( + event_config.network, + network.rpc_url.clone(), + network.requests_per_second, + ) .await; self.handlers.push(ProcessEventsInput { - start_block: handler.start_block(), - address: handler.address(), + start_block: event_config.start_block, + address: event_config.address.parse().unwrap(), step: 10_000, handler, templates: self.templates.clone(), provider, + execution_mode: event_config.execution_mode.unwrap_or(config::ExecutionMode::Parallel), }); + + Ok(()) } pub async fn load_block_handler( @@ -88,7 +106,7 @@ impl Indexer { Ok(()) } - pub async fn start(mut self) { + pub async fn start(mut self) -> Result<(), AddHandlerError> { for block_handler in self.block_handlers { tokio::spawn(async move { if let Err(error) = process_blocks(block_handler).await { @@ -107,11 +125,26 @@ impl Indexer { // For dynamic sources (Templates) while let Some(template) = self.rx.recv().await { - let network = template.handler.network(); - let rpc_url = template.handler.rpc_url(); - let rate_limit = template.handler.rate_limit(); - - let provider = self.rpc_manager.get_or_create(network, rpc_url, rate_limit).await; + let template_config = self + .config + .templates + .get(&template.handler.name()) + .ok_or(AddHandlerError::NotFound(template.handler.name()))?; + + let network = self + .config + .networks + .get(&template_config.network) + .ok_or(AddHandlerError::NetworkNotFound(template_config.network.clone()))?; + + let provider = self + .rpc_manager + .get_or_create( + template_config.network.clone(), + network.rpc_url.clone(), + network.requests_per_second, + ) + .await; let handler = ProcessEventsInput { start_block: template.start_block, @@ -120,6 +153,9 @@ impl Indexer { handler: template.handler, templates: self.templates.clone(), provider, + execution_mode: template_config + .execution_mode + .unwrap_or(config::ExecutionMode::Parallel), }; tokio::spawn(async move { @@ -128,5 +164,7 @@ impl Indexer { } }); } + + Ok(()) } }