Skip to content

Commit

Permalink
feat: wip evaluator
Browse files Browse the repository at this point in the history
  • Loading branch information
rkdud007 committed Feb 2, 2024
1 parent 2d4e2c1 commit 73f0ec6
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 19 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ common = { version = "0.1.0", path = "crates/common" }
types = { version = "0.1.0", path = "crates/types" }
decoder = { version = "0.1.0", path = "crates/decoder" }
fetcher = { version = "0.1.0", path = "crates/fetcher" }
evaluator = { version = "0.1.0", path = "crates/evaluator" }
tokio = { version = "1", features = ["full"] }
alloy-dyn-abi = "0.6.2"
alloy-primitives = "0.6.2"
Expand Down
1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition = "2021"
clap = { version = "4.4.4", features = ["derive"] }
decoder = { workspace = true }
types = { workspace = true }
evaluator = { workspace = true }

tokio.workspace = true

Expand Down
23 changes: 5 additions & 18 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use clap::Parser;
use decoder::args_decoder::{datalake_decoder, tasks_decoder};
use types::{datalake::base::Derivable, Datalake};
use evaluator::evaluator;

#[derive(Debug, Parser)]
struct Cli {
Expand All @@ -22,27 +22,14 @@ struct Cli {

// CLI entry point: parses the arguments, decodes the tasks and datalakes, and prints the outcome.
// NOTE(review): this is a rendered diff hunk whose +/- markers were lost, so lines removed by the
// commit and lines added by it appear interleaved; the block does not compile as shown.
fn main() {
let args = Cli::parse();
// Presumably the pre-commit (removed) bindings: mutable because the loop below mutates both.
let mut tasks = tasks_decoder(args.tasks).unwrap();
let mut datalakes = datalake_decoder(args.datalakes).unwrap();
// Presumably the post-commit (added) bindings: immutable, later moved into `evaluator`.
let tasks = tasks_decoder(args.tasks).unwrap();
let datalakes = datalake_decoder(args.datalakes).unwrap();

// Abort when the two decoded lists differ in length.
if tasks.len() != datalakes.len() {
panic!("Tasks and datalakes must have the same length");
}

// Presumably pre-commit (removed): derive each datalake, attach it to the task at the same
// index, and compile it inline.
for (datalake_idx, datalake) in datalakes.iter_mut().enumerate() {
let task = &mut tasks[datalake_idx];

task.datalake = match datalake {
Datalake::BlockSampled(block_datalake) => Some(block_datalake.derive()),
Datalake::DynamicLayout(dynamic_layout_datalake) => {
Some(dynamic_layout_datalake.derive())
}
_ => None,
};

// `unwrap` panics here when the match above fell through to `None`.
task.datalake.as_mut().unwrap().compile();
}
// Presumably pre-commit (removed) debug output.
println!("tasks: {:?}", tasks);
println!("datalakes: {:?}", datalakes);
// Presumably post-commit (added): hand both lists to `evaluator` and print its result map.
let res = evaluator(tasks, Some(datalakes)).unwrap();
println!("tasks: {:?}", res.result);
println!("rpc_url: {:?}", args.rpc_url);
}
2 changes: 2 additions & 0 deletions crates/evaluator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ version = "0.1.0"
edition = "2021"

[dependencies]
types = { workspace = true }
anyhow = { workspace = true }
13 changes: 13 additions & 0 deletions crates/evaluator/src/aggregation_functions/avg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use types::datalake::base::DataPoint;

/// Computes the integer average of a list of numeric data points.
///
/// The values are summed and divided by the number of data points using
/// integer division, so any fractional part of the mean is discarded.
/// An empty slice yields `0` instead of panicking on a division by zero.
///
/// # Panics
///
/// Panics with "String value found" if any data point is a [`DataPoint::Str`].
pub fn avg(values: &[DataPoint]) -> usize {
    // Guard the division below: dividing by a length of zero would panic.
    if values.is_empty() {
        return 0;
    }

    let sum: usize = values
        .iter()
        .map(|value| match value {
            DataPoint::Str(_) => panic!("String value found"),
            DataPoint::Int(int) => *int,
        })
        .sum();

    sum / values.len()
}
30 changes: 30 additions & 0 deletions crates/evaluator/src/aggregation_functions/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use types::datalake::base::DataPoint;

pub mod avg;

/// Resolves a textual function id into its [`AggregationFunction`] variant.
///
/// Only the exact, case-sensitive id `"AVG"` is recognised.
///
/// # Panics
///
/// Panics with "Unknown aggregation function" for any other id.
pub fn get_aggregation_function_type(function_id: String) -> AggregationFunction {
    if function_id == "AVG" {
        return AggregationFunction::Avg;
    }
    panic!("Unknown aggregation function")
}

/// Aggregation function types
///
/// The derives make the enum printable with `{:?}`, freely copyable, and
/// comparable / hashable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AggregationFunction {
    /// Average of the data points (selected by the function id `"AVG"`).
    Avg,
}

impl AggregationFunction {
    /// Returns the numeric index assigned to this aggregation function.
    ///
    /// `Avg` maps to `0`.
    pub fn get_index(&self) -> usize {
        match *self {
            Self::Avg => 0,
        }
    }

    /// Applies this aggregation function to `values` and returns the result.
    ///
    /// `Avg` delegates to [`avg::avg`].
    pub fn operation(&self, values: &Vec<DataPoint>) -> usize {
        match *self {
            Self::Avg => avg::avg(values),
        }
    }
}
59 changes: 59 additions & 0 deletions crates/evaluator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1,60 @@
use aggregation_functions::get_aggregation_function_type;
use anyhow::{bail, Result};
use std::collections::HashMap;

pub mod aggregation_functions;

use types::{datalake::base::Derivable, task::ComputationalTask, Datalake};

/// Outcome of evaluating a batch of computational tasks.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EvaluationResult {
    /// Aggregated value per task, keyed by the task's string representation.
    pub result: HashMap<String, usize>,
}

impl EvaluationResult {
    /// Creates an empty result set (equivalent to [`Default::default`]).
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the commitment over the results.
    ///
    /// Currently a placeholder: always returns the literal string
    /// `"merkle_commit"`, regardless of the stored results.
    pub fn merkle_commit(&self) -> String {
        "merkle_commit".to_string()
    }
}

/// Evaluates every computational task and collects the aggregated results.
///
/// When `datalake_for_tasks` is provided, the datalake at position `i` is
/// derived and attached to the task at position `i` before evaluation. Each
/// task's datalake is then compiled into data points, which are passed to the
/// aggregation function selected by the task's `aggregate_fn_id`. Every result
/// is stored under the task's string representation.
///
/// # Errors
///
/// Returns an error if
/// - more datalakes than tasks are supplied,
/// - a supplied datalake is neither `BlockSampled` nor `DynamicLayout`, or
/// - a task has no datalake by the time it is evaluated.
///
/// # Panics
///
/// Panics if a task's aggregation function id is unknown
/// (see [`get_aggregation_function_type`]).
pub fn evaluator(
    mut compute_expressions: Vec<ComputationalTask>,
    datalake_for_tasks: Option<Vec<Datalake>>,
) -> Result<EvaluationResult> {
    let mut results = EvaluationResult::new();

    // If optional datalakes are provided, assign each one to the task at the same position.
    if let Some(datalakes) = datalake_for_tasks {
        // A surplus datalake has no task to attach to; report it instead of
        // indexing past the end of the task list.
        if datalakes.len() > compute_expressions.len() {
            bail!(
                "Received {} datalakes for only {} tasks",
                datalakes.len(),
                compute_expressions.len()
            );
        }

        for (task, datalake) in compute_expressions.iter_mut().zip(datalakes.iter()) {
            task.datalake = match datalake {
                Datalake::BlockSampled(block_datalake) => Some(block_datalake.derive()),
                Datalake::DynamicLayout(dynamic_layout_datalake) => {
                    Some(dynamic_layout_datalake.derive())
                }
                _ => bail!("Unknown datalake type"),
            };
        }
    }

    // Evaluate the compute expressions
    for compute_expression in compute_expressions {
        let computation_task_id = compute_expression.to_string();

        // A task without a datalake cannot be compiled; surface that as an
        // error rather than unwrapping a `None`.
        let datapoints = match compute_expression.datalake {
            Some(mut datalake) => datalake.compile(),
            None => bail!("Task {} has no datalake to evaluate", computation_task_id),
        };

        let aggregation_fn = get_aggregation_function_type(compute_expression.aggregate_fn_id);
        let result = aggregation_fn.operation(&datapoints);
        results.result.insert(computation_task_id, result);
    }

    Ok(results)
}
3 changes: 2 additions & 1 deletion crates/types/src/datalake/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ impl DatalakeBase {
// self.identifier = format!("{}{}", self.identifier, other.identifier);
// }

// Rebuilds `self.datapoints` from the compilation pipeline.
// NOTE(review): two signatures are shown because this is a diff hunk without +/- markers; the
// first is presumably the removed pre-commit signature, the second the added one that returns
// the compiled data points.
pub fn compile(&mut self) {
pub fn compile(&mut self) -> Vec<DataPoint> {
// Start from an empty buffer so repeated calls do not accumulate earlier data points.
self.datapoints.clear();
for compiler in &self.compilation_pipeline {
// `unwrap` panics if a compiler call does not produce a value.
self.datapoints.extend(compiler().unwrap());
}
// Return a copy; the compiled data points also remain stored on `self`.
self.datapoints.clone()
}
}

Expand Down

0 comments on commit 73f0ec6

Please sign in to comment.