Skip to content

Commit

Permalink
Merge pull request #14 from HerodotusDev/evaluator
Browse files Browse the repository at this point in the history
feat: wip evaluator
  • Loading branch information
rkdud007 authored Feb 4, 2024
2 parents 2d4e2c1 + 7f74d6c commit 6dcd2db
Show file tree
Hide file tree
Showing 31 changed files with 535 additions and 296 deletions.
21 changes: 8 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
resolver = "2"
members = [
"cli",
"crates/types",
"crates/decoder",
"crates/evaluator",
"crates/event-watcher",
Expand All @@ -12,9 +11,9 @@ members = [

[workspace.dependencies]
common = { version = "0.1.0", path = "crates/common" }
types = { version = "0.1.0", path = "crates/types" }
decoder = { version = "0.1.0", path = "crates/decoder" }
fetcher = { version = "0.1.0", path = "crates/fetcher" }
evaluator = { version = "0.1.0", path = "crates/evaluator" }
tokio = { version = "1", features = ["full"] }
alloy-dyn-abi = "0.6.2"
alloy-primitives = "0.6.2"
Expand Down
3 changes: 2 additions & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ edition = "2021"
[dependencies]
clap = { version = "4.4.4", features = ["derive"] }
decoder = { workspace = true }
types = { workspace = true }
common = { workspace = true }
evaluator = { workspace = true }

tokio.workspace = true

Expand Down
25 changes: 8 additions & 17 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use clap::Parser;
use decoder::args_decoder::{datalake_decoder, tasks_decoder};
use types::{datalake::base::Derivable, Datalake};
use evaluator::evaluator;

#[derive(Debug, Parser)]
struct Cli {
Expand All @@ -22,27 +22,18 @@ struct Cli {

/// CLI entry point: decodes the serialized task and datalake batches supplied
/// as arguments, sanity-checks that they pair up one-to-one, and hands them to
/// the evaluator.
///
/// Panics if decoding fails or if the two batches differ in length — this is a
/// CLI front-end, so aborting with a message is the intended failure mode.
fn main() {
    let args = Cli::parse();
    let tasks = tasks_decoder(args.tasks).unwrap();

    let datalakes = datalake_decoder(args.datalakes).unwrap();

    // Each task is evaluated against the datalake at the same index, so the
    // batches must pair up exactly.
    if tasks.len() != datalakes.len() {
        panic!("Tasks and datalakes must have the same length");
    }

    println!("tasks: {:?}\n", tasks);
    println!("datalakes: {:?}\n", datalakes);

    let res = evaluator(tasks, Some(datalakes)).unwrap();
    println!("res: {:?}", res.result);
    // NOTE(review): rpc_url is parsed but only printed here — presumably wired
    // into fetching in a later change; confirm before relying on it.
    println!("rpc_url: {:?}", args.rpc_url);
}
2 changes: 2 additions & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,7 @@ edition = "2021"
[dependencies]
alloy-primitives = { workspace = true }
alloy-rlp = { workspace = true }
alloy-dyn-abi = { workspace = true }
anyhow = { workspace = true }
reth-primitives = { workspace = true }
fetcher = { workspace = true }
14 changes: 9 additions & 5 deletions crates/common/src/block/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::str::FromStr;
use alloy_primitives::{hex, FixedBytes, U256};
use alloy_rlp::{Decodable, Encodable as _, RlpDecodable, RlpEncodable};

use crate::datalake::base::DataPoint;

#[derive(Debug)]
pub enum AccountField {
Nonce,
Expand Down Expand Up @@ -90,12 +92,14 @@ impl AccountField {
}
}

pub fn decode_account_field(account_rlp: &str, field: AccountField) -> String {
pub fn decode_account_field(account_rlp: &str, field: AccountField) -> DataPoint {
let decoded = <Account>::decode(&mut hex::decode(account_rlp).unwrap().as_slice()).unwrap();
match field {
AccountField::Nonce => decoded.nonce.to_string(),
AccountField::Balance => decoded.balance.to_string(),
AccountField::StorageRoot => decoded.storage_root.to_string(),
AccountField::CodeHash => decoded.code_hash.to_string(),
AccountField::Nonce => DataPoint::Int(u64::from_str(&decoded.nonce.to_string()).unwrap()),
AccountField::Balance => {
DataPoint::Int(u64::from_str(&decoded.balance.to_string()).unwrap())
}
AccountField::StorageRoot => DataPoint::Str(decoded.storage_root.to_string()),
AccountField::CodeHash => DataPoint::Str(decoded.code_hash.to_string()),
}
}
62 changes: 41 additions & 21 deletions crates/common/src/block/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use alloy_primitives::hex;
use alloy_rlp::Decodable;
use reth_primitives::Header;

use crate::datalake::base::DataPoint;

#[derive(Debug)]
pub enum HeaderField {
ParentHash,
Expand Down Expand Up @@ -136,30 +138,48 @@ impl FromStr for HeaderField {
}
}

pub fn decode_header_field(header_rlp: &str, field: HeaderField) -> String {
pub fn decode_header_field(header_rlp: &str, field: HeaderField) -> DataPoint {
let decoded =
<Header as Decodable>::decode(&mut hex::decode(header_rlp).unwrap().as_slice()).unwrap();

match field {
HeaderField::ParentHash => decoded.parent_hash.to_string(),
HeaderField::OmmerHash => decoded.ommers_hash.to_string(),
HeaderField::Beneficiary => decoded.beneficiary.to_string(),
HeaderField::StateRoot => decoded.state_root.to_string(),
HeaderField::TransactionsRoot => decoded.transactions_root.to_string(),
HeaderField::ReceiptsRoot => decoded.receipts_root.to_string(),
HeaderField::LogsBloom => decoded.logs_bloom.to_string(),
HeaderField::Difficulty => decoded.difficulty.to_string(),
HeaderField::Number => decoded.number.to_string(),
HeaderField::GasLimit => decoded.gas_limit.to_string(),
HeaderField::GasUsed => decoded.gas_used.to_string(),
HeaderField::Timestamp => decoded.timestamp.to_string(),
HeaderField::ExtraData => decoded.extra_data.to_string(),
HeaderField::MixHash => decoded.mix_hash.to_string(),
HeaderField::Nonce => decoded.nonce.to_string(),
HeaderField::BaseFeePerGas => decoded.base_fee_per_gas.unwrap().to_string(),
HeaderField::WithdrawalsRoot => decoded.withdrawals_root.unwrap().to_string(),
HeaderField::BlobGasUsed => decoded.blob_gas_used.unwrap().to_string(),
HeaderField::ExcessBlobGas => decoded.excess_blob_gas.unwrap().to_string(),
HeaderField::ParentBeaconBlockRoot => decoded.parent_beacon_block_root.unwrap().to_string(),
HeaderField::ParentHash => DataPoint::Str(decoded.parent_hash.to_string()),
HeaderField::OmmerHash => DataPoint::Str(decoded.ommers_hash.to_string()),
HeaderField::Beneficiary => DataPoint::Str(decoded.beneficiary.to_string()),
HeaderField::StateRoot => DataPoint::Str(decoded.state_root.to_string()),
HeaderField::TransactionsRoot => DataPoint::Str(decoded.transactions_root.to_string()),
HeaderField::ReceiptsRoot => DataPoint::Str(decoded.receipts_root.to_string()),
HeaderField::LogsBloom => DataPoint::Str(decoded.logs_bloom.to_string()),
HeaderField::Difficulty => {
DataPoint::Int(u64::from_str(&decoded.difficulty.to_string()).unwrap())
}
HeaderField::Number => DataPoint::Int(u64::from_str(&decoded.number.to_string()).unwrap()),
HeaderField::GasLimit => {
DataPoint::Int(u64::from_str(&decoded.gas_limit.to_string()).unwrap())
}
HeaderField::GasUsed => {
DataPoint::Int(u64::from_str(&decoded.gas_used.to_string()).unwrap())
}
HeaderField::Timestamp => {
DataPoint::Int(u64::from_str(&decoded.timestamp.to_string()).unwrap())
}
HeaderField::ExtraData => DataPoint::Str(decoded.extra_data.to_string()),
HeaderField::MixHash => DataPoint::Str(decoded.mix_hash.to_string()),
HeaderField::Nonce => DataPoint::Int(u64::from_str(&decoded.nonce.to_string()).unwrap()),
HeaderField::BaseFeePerGas => {
DataPoint::Int(u64::from_str(&decoded.base_fee_per_gas.unwrap().to_string()).unwrap())
}
HeaderField::WithdrawalsRoot => {
DataPoint::Str(decoded.withdrawals_root.unwrap().to_string())
}
HeaderField::BlobGasUsed => {
DataPoint::Int(u64::from_str(&decoded.blob_gas_used.unwrap().to_string()).unwrap())
}
HeaderField::ExcessBlobGas => {
DataPoint::Int(u64::from_str(&decoded.excess_blob_gas.unwrap().to_string()).unwrap())
}
HeaderField::ParentBeaconBlockRoot => {
DataPoint::Str(decoded.parent_beacon_block_root.unwrap().to_string())
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::str::FromStr;

use anyhow::Result;
use common::block::{
use crate::block::{
account::{decode_account_field, AccountField},
header::{decode_header_field, HeaderField},
};
use anyhow::Result;
use fetcher::{
example_data::{get_example_accounts, get_example_headers, get_example_storages},
memoizer::Memoizer,
Expand Down Expand Up @@ -44,7 +44,7 @@ pub fn compile_block_sampled_datalake(
HeaderField::from_str(&property.to_uppercase()).unwrap(),
);

aggregation_set.push(DataPoint::Str(value));
aggregation_set.push(value);
}
}
"account" => {
Expand All @@ -61,7 +61,7 @@ pub fn compile_block_sampled_datalake(
AccountField::from_str(&property.to_uppercase()).unwrap(),
);

aggregation_set.push(DataPoint::Str(value));
aggregation_set.push(value);
}
}
"storage" => {
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ use std::fmt;
type DataCompiler = dyn Fn() -> Result<Vec<DataPoint>>;

/// DataPoint is a type that can be used to store data in a Datalake
// PartialOrd (not Ord) because Float(f64) is only partially ordered, and
// cross-variant comparisons have no meaningful total order.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
pub enum DataPoint {
    // Unsigned 64-bit scalar (block numbers, gas values, nonces, ...).
    Int(u64),
    // Floating-point result, e.g. from averaging aggregate functions.
    Float(f64),
    // Hash / address / arbitrary textual value.
    Str(String),
}

Expand Down Expand Up @@ -46,11 +47,12 @@ impl DatalakeBase {
// self.identifier = format!("{}{}", self.identifier, other.identifier);
// }

pub fn compile(&mut self) {
pub fn compile(&mut self) -> Vec<DataPoint> {
self.datapoints.clear();
for compiler in &self.compilation_pipeline {
self.datapoints.extend(compiler().unwrap());
}
self.datapoints.clone()
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use std::str::FromStr;

use crate::{
block::{account::AccountField, header::HeaderField, Collection},
utils::bytes_to_hex_string,
};
use alloy_dyn_abi::{DynSolType, DynSolValue};
use alloy_primitives::{
hex::{self, FromHex},
keccak256, Address, U256,
};
use anyhow::{bail, Result};
use common::{
block::{account::AccountField, header::HeaderField, Collection},
utils::bytes_to_hex_string,
};

use crate::compiler::block_sampled::compile_block_sampled_datalake;

Expand Down
File renamed without changes.
8 changes: 4 additions & 4 deletions crates/types/src/lib.rs → crates/common/src/datalake/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use datalake::{block_sampled::BlockSampledDatalake, dynamic_layout::DynamicLayoutDatalake};
use self::{block_sampled::BlockSampledDatalake, dynamic_layout::DynamicLayoutDatalake};

pub mod compiler;
pub mod datalake;
pub mod task;
pub mod base;
pub mod block_sampled;
pub mod dynamic_layout;

#[derive(Debug, Clone, PartialEq)]
pub enum Datalake {
Expand Down
3 changes: 3 additions & 0 deletions crates/common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
pub mod block;
pub mod compiler;
pub mod datalake;
pub mod task;
pub mod utils;
16 changes: 9 additions & 7 deletions crates/types/src/task.rs → crates/common/src/task.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use alloy_dyn_abi::{DynSolType, DynSolValue};
use alloy_primitives::{keccak256, U256};
use anyhow::Result;
use common::utils::bytes32_to_utf8_str;

use crate::datalake::base::DatalakeBase;
use crate::{datalake::base::DatalakeBase, utils::bytes32_to_utf8_str};

/// ComputationalTask represents a task for certain datalake with a specified aggregate function
#[derive(Debug)]
Expand Down Expand Up @@ -47,12 +46,15 @@ impl ToString for ComputationalTask {
fn to_string(&self) -> String {
let datalake = self.datalake.as_ref().ok_or("Datalake is None").unwrap();

let identifier = u64::from_str_radix(&datalake.identifier, 16)
.expect("Failed to parse identifier as a hexadecimal number");
let identifier_value = DynSolValue::Uint(U256::from(identifier), 256);
let datalake_identifier =
U256::from_str_radix(&datalake.identifier[2..], 16).expect("Invalid hex string");
let identifier_value = DynSolValue::Uint(datalake_identifier, 256);
let aggregate_fn_id_value = DynSolValue::String(self.aggregate_fn_id.clone());
let aggregate_fn_ctx_value =
DynSolValue::Bytes(self.aggregate_fn_ctx.clone().unwrap().into_bytes());
let aggregate_fn_ctx_value = match &self.aggregate_fn_ctx {
None => DynSolValue::Bytes("".to_string().into_bytes()),
Some(ctx) => DynSolValue::Bytes(ctx.clone().into_bytes()),
};

let header_tuple_value = DynSolValue::Tuple(vec![
identifier_value,
aggregate_fn_id_value,
Expand Down
1 change: 0 additions & 1 deletion crates/decoder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,3 @@ alloy-dyn-abi = { workspace = true }
alloy-primitives = { workspace = true }
anyhow = { workspace = true }
common = { workspace = true }
types = { workspace = true }
4 changes: 2 additions & 2 deletions crates/decoder/src/args_decoder.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use alloy_dyn_abi::DynSolType;
use alloy_primitives::hex::FromHex;
use anyhow::{bail, Ok, Result};
use common::datalake::Datalake;
use common::utils::{bytes_to_hex_string, last_byte_to_u8};
use types::{
use common::{
datalake::{block_sampled::BlockSampledDatalake, dynamic_layout::DynamicLayoutDatalake},
task::ComputationalTask,
Datalake,
};

pub fn tasks_decoder(serialized_tasks_batch: String) -> Result<Vec<ComputationalTask>> {
Expand Down
Loading

0 comments on commit 6dcd2db

Please sign in to comment.