Skip to content

Commit

Permalink
refactor: The RpcManager is no longer a singleton
Browse files Browse the repository at this point in the history
  • Loading branch information
luis-herasme committed Jul 20, 2024
1 parent 705a7ce commit 4014d57
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 59 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion ghost-crab/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,4 @@ ghost-crab-macros = { path = "../ghost-crab-macros", version = "0.1.3" }
ghost-crab-common = { path = "../ghost-crab-common" }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
once_cell = "1.19.0"
rocksdb = "0.22.0"
10 changes: 3 additions & 7 deletions ghost-crab/src/block_handler.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::cache::manager::RPC_MANAGER;
use crate::indexer::TemplateManager;
use crate::latest_block_manager::LatestBlockManager;
use alloy::providers::Provider;
Expand Down Expand Up @@ -30,22 +29,19 @@ pub trait BlockHandler {
fn execution_mode(&self) -> ExecutionMode;
}

pub struct BlockConfig {
pub struct ProcessBlocksInput {
pub handler: BlockHandlerInstance,
pub templates: TemplateManager,
pub provider: RootProvider<Http<Client>>,
}

pub async fn process_logs_block(
BlockConfig { handler, templates }: BlockConfig,
ProcessBlocksInput { handler, templates, provider }: ProcessBlocksInput,
) -> Result<(), TransportError> {
let step = handler.step();
let network = handler.network();
let rpc_url = handler.rpc_url();
let start_block = handler.start_block();
let execution_mode = handler.execution_mode();

let provider = RPC_MANAGER.lock().await.get_or_create(network, rpc_url).await;

let mut current_block = start_block;
let mut latest_block_manager =
LatestBlockManager::new(provider.clone(), Duration::from_secs(10));
Expand Down
6 changes: 0 additions & 6 deletions ghost-crab/src/cache/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,7 @@ use super::rpc_proxy::RpcWithCache;
use alloy::providers::ProviderBuilder;
use alloy::providers::RootProvider;
use alloy::transports::http::{Client, Http};
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;

pub static RPC_MANAGER: Lazy<Arc<Mutex<RPCManager>>> =
Lazy::new(|| Arc::new(Mutex::new(RPCManager::new())));

pub struct RPCManager {
current_port: u16,
Expand Down
9 changes: 0 additions & 9 deletions ghost-crab/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,3 @@ pub trait Handler {
fn execution_mode(&self) -> ExecutionMode;
fn event_signature(&self) -> String;
}

#[derive(Clone)]
pub struct HandlerConfig {
pub start_block: u64,
pub step: u64,
pub address: Address,
pub handler: HandleInstance,
pub templates: TemplateManager,
}
67 changes: 42 additions & 25 deletions ghost-crab/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,45 @@
use crate::block_handler::{process_logs_block, BlockConfig, BlockHandlerInstance};
use crate::handler::{HandleInstance, HandlerConfig};
use crate::process_logs::process_logs;
use crate::block_handler::{process_logs_block, BlockHandlerInstance, ProcessBlocksInput};
use crate::cache::manager::RPCManager;
use crate::handler::HandleInstance;
use crate::process_logs::{process_logs, ProcessEventsInput};
use alloy::primitives::Address;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, Receiver, Sender};

#[derive(Clone)]
pub struct TemplateManager {
tx: Sender<HandlerConfig>,
}

pub struct Template {
pub start_block: u64,
pub address: Address,
pub handler: HandleInstance,
}

#[derive(Clone)]
pub struct TemplateManager {
tx: Sender<Template>,
}

impl TemplateManager {
pub async fn start(&self, template: Template) -> Result<(), SendError<HandlerConfig>> {
self.tx
.send(HandlerConfig {
start_block: template.start_block,
address: template.address.clone(),
step: 10_000,
handler: template.handler,
templates: self.clone(),
})
.await
pub async fn start(&self, template: Template) -> Result<(), SendError<Template>> {
self.tx.send(template).await
}
}

pub struct Indexer {
handlers: Vec<HandlerConfig>,
block_handlers: Vec<BlockConfig>,
rx: Receiver<HandlerConfig>,
handlers: Vec<ProcessEventsInput>,
rx: Receiver<Template>,
block_handlers: Vec<ProcessBlocksInput>,
templates: TemplateManager,
rpc_manager: RPCManager,
}

impl Indexer {
pub fn new() -> Indexer {
let (tx, rx) = mpsc::channel::<HandlerConfig>(1);
let (tx, rx) = mpsc::channel::<Template>(1);

Indexer {
handlers: Vec::new(),
block_handlers: Vec::new(),
templates: TemplateManager { tx },
rpc_manager: RPCManager::new(),
rx,
}
}
Expand All @@ -54,17 +49,26 @@ impl Indexer {
return;
}

self.handlers.push(HandlerConfig {
let provider = self.rpc_manager.get_or_create(handler.network(), handler.rpc_url()).await;

self.handlers.push(ProcessEventsInput {
start_block: handler.start_block(),
address: handler.address(),
step: 10_000,
handler,
templates: self.templates.clone(),
provider,
});
}

pub async fn load_block_handler(&mut self, handler: BlockHandlerInstance) {
self.block_handlers.push(BlockConfig { handler, templates: self.templates.clone() });
let provider = self.rpc_manager.get_or_create(handler.network(), handler.rpc_url()).await;

self.block_handlers.push(ProcessBlocksInput {
handler,
templates: self.templates.clone(),
provider,
});
}

pub async fn start(mut self) {
Expand All @@ -85,7 +89,20 @@ impl Indexer {
}

// For dynamic sources (Templates)
while let Some(handler) = self.rx.recv().await {
while let Some(template) = self.rx.recv().await {
let network = template.handler.network();
let rpc_url = template.handler.rpc_url();
let provider = self.rpc_manager.get_or_create(network, rpc_url).await;

let handler = ProcessEventsInput {
start_block: template.start_block,
address: template.address,
step: 10_000,
handler: template.handler,
templates: self.templates.clone(),
provider,
};

tokio::spawn(async move {
if let Err(error) = process_logs(handler).await {
println!("Error processing logs for handler: {error}");
Expand Down
5 changes: 2 additions & 3 deletions ghost-crab/src/prelude.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub use crate::cache::manager::RPC_MANAGER;
pub use crate::handler::{Context, Handler, HandlerConfig};
pub use crate::handler::{Context, Handler};
pub use alloy;
pub use alloy::{
sol,
Expand All @@ -18,6 +17,6 @@ pub use crate::config;
pub use crate::indexer;
pub use crate::indexer::Template;
pub use crate::process_logs;
pub use alloy::primitives::Address;
pub use alloy::primitives::address;
pub use alloy::primitives::Address;
pub use alloy::providers::Provider;
23 changes: 16 additions & 7 deletions ghost-crab/src/process_logs.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
use crate::cache::manager::RPC_MANAGER;
use crate::handler::{Context, HandlerConfig};
use crate::handler::{Context, HandleInstance};
use crate::indexer::TemplateManager;
use crate::latest_block_manager::LatestBlockManager;
use alloy::providers::Provider;
use alloy::primitives::Address;
use alloy::providers::{Provider, RootProvider};
use alloy::rpc::types::eth::Filter;
use alloy::transports::http::{Client, Http};
use alloy::transports::TransportError;
use ghost_crab_common::config::ExecutionMode;
use std::time::Duration;

#[derive(Clone)]
pub struct ProcessEventsInput {
pub start_block: u64,
pub address: Address,
pub step: u64,
pub handler: HandleInstance,
pub templates: TemplateManager,
pub provider: RootProvider<Http<Client>>,
}

pub async fn process_logs(
HandlerConfig { start_block, step, address, handler, templates }: HandlerConfig,
ProcessEventsInput { start_block, step, address, handler, templates, provider }: ProcessEventsInput,
) -> Result<(), TransportError> {
let network = handler.network();
let rpc_url = handler.rpc_url();
let execution_mode = handler.execution_mode();
let event_signature = handler.event_signature();

let provider = RPC_MANAGER.lock().await.get_or_create(network, rpc_url).await;
let mut current_block = start_block;
let mut latest_block_manager =
LatestBlockManager::new(provider.clone(), Duration::from_secs(10));
Expand Down

0 comments on commit 4014d57

Please sign in to comment.