Merge pull request #143 from HerodotusDev/feat/input-format
refactor: input data format for multi-chain support
rkdud007 authored Sep 13, 2024
2 parents eeeeb96 + cc02d15 commit a00269c
Showing 18 changed files with 481 additions and 195 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -171,7 +171,7 @@ _Note: Fields marked with "-" are not applicable for the specified aggregate fun
```sh
# CI check
cargo make run-ci-flow
just run-ci-flow
```
### Local Run
71 changes: 48 additions & 23 deletions hdp/src/preprocessor/compile/datalake.rs
@@ -10,30 +10,46 @@ impl Compilable for DatalakeCompute {
&self,
compile_config: &CompilerConfig,
) -> Result<CompilationResult, CompileError> {
// Log the target datalake task being processed
info!("target task: {:#?}", self);
// ========== datalake ==============

// ========== Fetch Provider Configuration ==============
// Retrieve the provider configuration for the specific chain ID of the datalake
let chain_id = self.datalake.get_chain_id();
let target_provider_config = compile_config
.provider_config
.get(&self.datalake.get_chain_id())
.get(&chain_id)
.expect("target task's chain had not been configured.");

// Create a new provider instance from the configuration
let provider = new_provider_from_config(target_provider_config);

// ========== Fetch Proofs ==============
// Fetch the proofs from the provider for the given datalake task
let compiled_block_sampled = provider.fetch_proofs(self).await?;
debug!("values to aggregate : {:#?}", compiled_block_sampled.values);

// ========== compute ==============
let aggregation_fn = &self.compute.aggregate_fn_id;
let fn_context = &self.compute.aggregate_fn_ctx;
let aggregated_result =
aggregation_fn.operation(&compiled_block_sampled.values, Some(fn_context.clone()))?;

Ok(CompilationResult::new(
// ========== Compute Aggregated Result ==============
// Get the aggregation function and its context from the datalake compute
let aggregation_function = &self.compute.aggregate_fn_id;
let function_context = &self.compute.aggregate_fn_ctx;

// Compute the aggregated result using the fetched values and context
let aggregated_result = aggregation_function.operation(
&compiled_block_sampled.values,
Some(function_context.clone()),
)?;

// ========== Return Compilation Result ==============
// Return the compilation result, which is specific to a single chain context
Ok(CompilationResult::from_single_chain(
chain_id.to_numeric_id(),
vec![aggregated_result],
compiled_block_sampled.headers,
compiled_block_sampled.mmr_with_headers,
compiled_block_sampled.accounts,
compiled_block_sampled.storages,
compiled_block_sampled.transactions,
compiled_block_sampled.transaction_receipts,
compiled_block_sampled.mmr_metas,
))
}
}
@@ -130,14 +146,18 @@ mod tests {
.compile(&compiler_config)
.await
.unwrap();
assert_eq!(results.headers.len(), 16);
assert_eq!(results.accounts.len(), 2);
assert_eq!(results.storages.len(), 1);
// assert_eq!(results.mmr_with_headers[0].headers.len(), 16);
let account_proofs = results.accounts.iter().next().unwrap();
assert_eq!(account_proofs.1.len(), 2);
let storage_proofs = results.storages.iter().next().unwrap();
assert_eq!(storage_proofs.1.len(), 1);
let storage_proofs = storage_proofs.1.iter().next().unwrap();
assert_eq!(storage_proofs.proofs.len(), 6);
assert_eq!(results.transactions.len(), 0);
assert_eq!(results.transaction_receipts.len(), 0);
assert_eq!(results.mmr_metas.len(), 1);
let tx_proofs = results.transactions.iter().next().unwrap();
assert_eq!(tx_proofs.1.len(), 0);
let tx_receipt_proofs = results.transaction_receipts.iter().next().unwrap();
assert_eq!(tx_receipt_proofs.1.len(), 0);
// assert_eq!(results.mmr_metas.len(), 1);
}

#[tokio::test]
@@ -181,11 +201,16 @@ mod tests {
.compile(&compiler_config)
.await
.unwrap();
assert_eq!(results.headers.len(), 2);
assert_eq!(results.accounts.len(), 0);
assert_eq!(results.storages.len(), 0);
assert_eq!(results.transactions.len(), 10);
assert_eq!(results.transaction_receipts.len(), 11);
assert_eq!(results.mmr_metas.len(), 1);

// assert_eq!(results.headers.len(), 2);
let accounts_proofs = results.accounts.iter().next().unwrap();
assert_eq!(accounts_proofs.1.len(), 0);
let storages_proofs = results.storages.iter().next().unwrap();
assert_eq!(storages_proofs.1.len(), 0);
let tx_proofs = results.transactions.iter().next().unwrap();
assert_eq!(tx_proofs.1.len(), 10);
let tx_receipt_proofs = results.transaction_receipts.iter().next().unwrap();
assert_eq!(tx_receipt_proofs.1.len(), 11);
// assert_eq!(results.mmr_metas.len(), 1);
}
}
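
For context on the change above: the compile path now resolves its provider from a per-chain configuration map before fetching proofs. The following is a minimal sketch of that keyed lookup, outside the diff, using a simplified `u128` key and a hypothetical `ProviderConfig` stand-in rather than the crate's actual config types:

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the per-chain provider configuration.
#[derive(Debug, Clone)]
struct ProviderConfig {
    rpc_url: String,
}

// Mirrors the lookup in `DatalakeCompute::compile`: the config map is keyed
// by chain id, and an unconfigured chain is treated as a hard error.
fn provider_config_for_chain(
    provider_config: &HashMap<u128, ProviderConfig>,
    chain_id: u128,
) -> &ProviderConfig {
    provider_config
        .get(&chain_id)
        .expect("target task's chain had not been configured.")
}

fn main() {
    let mut configs = HashMap::new();
    // The chain id and URL below are illustrative values only.
    configs.insert(
        11155111,
        ProviderConfig {
            rpc_url: "https://example-rpc.invalid".to_string(),
        },
    );
    let cfg = provider_config_for_chain(&configs, 11155111);
    println!("selected provider: {}", cfg.rpc_url);
}
```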
149 changes: 122 additions & 27 deletions hdp/src/preprocessor/compile/mod.rs
@@ -1,13 +1,16 @@
use alloy::primitives::U256;

use config::CompilerConfig;
use std::hash::Hash;

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use thiserror::Error;

use crate::primitives::processed_types::block_proofs::{
convert_to_mmr_with_headers, mmr_with_header_vec_to_map, MMRWithHeader, ProcessedBlockProofs,
};
use crate::primitives::processed_types::{
account::ProcessedAccount, header::ProcessedHeader, mmr::MMRMeta, receipt::ProcessedReceipt,
storage::ProcessedStorage, transaction::ProcessedTransaction,
account::ProcessedAccount, receipt::ProcessedReceipt, storage::ProcessedStorage,
transaction::ProcessedTransaction,
};

use crate::provider::error::ProviderError;
@@ -54,49 +57,141 @@ pub trait Compilable {
pub struct CompilationResult {
/// results of tasks
pub task_results: Vec<U256>,
/// Headers related to the datalake
pub headers: HashSet<ProcessedHeader>,
/// mmr_with_headers related to the datalake
pub mmr_with_headers: HashMap<u128, HashSet<MMRWithHeader>>,
/// Accounts related to the datalake
pub accounts: HashSet<ProcessedAccount>,
pub accounts: HashMap<u128, HashSet<ProcessedAccount>>,
/// Storages related to the datalake
pub storages: HashSet<ProcessedStorage>,
pub storages: HashMap<u128, HashSet<ProcessedStorage>>,
/// Transactions related to the datalake
pub transactions: HashSet<ProcessedTransaction>,
pub transactions: HashMap<u128, HashSet<ProcessedTransaction>>,
/// Transaction receipts related to the datalake
pub transaction_receipts: HashSet<ProcessedReceipt>,
/// MMR meta data related to the headers
pub mmr_metas: HashSet<MMRMeta>,
pub transaction_receipts: HashMap<u128, HashSet<ProcessedReceipt>>,
}

impl CompilationResult {
pub fn new(
task_results: Vec<U256>,
headers: HashSet<ProcessedHeader>,
accounts: HashSet<ProcessedAccount>,
storages: HashSet<ProcessedStorage>,
transactions: HashSet<ProcessedTransaction>,
transaction_receipts: HashSet<ProcessedReceipt>,
mmr_metas: HashSet<MMRMeta>,
mmr_with_headers: HashMap<u128, HashSet<MMRWithHeader>>,
accounts: HashMap<u128, HashSet<ProcessedAccount>>,
storages: HashMap<u128, HashSet<ProcessedStorage>>,
transactions: HashMap<u128, HashSet<ProcessedTransaction>>,
transaction_receipts: HashMap<u128, HashSet<ProcessedReceipt>>,
) -> Self {
Self {
task_results,
headers,
mmr_with_headers,
accounts,
storages,
transactions,
transaction_receipts,
mmr_metas,
}
}

/// Build a compilation result scoped to a single chain, keyed by its numeric chain id
pub fn from_single_chain(
chain_id: u128,
task_results: Vec<U256>,
mmr_with_headers: HashSet<MMRWithHeader>,
accounts: HashSet<ProcessedAccount>,
storages: HashSet<ProcessedStorage>,
transactions: HashSet<ProcessedTransaction>,
transaction_receipts: HashSet<ProcessedReceipt>,
) -> Self {
Self {
task_results,
mmr_with_headers: HashMap::from_iter(vec![(chain_id, mmr_with_headers)]),
accounts: HashMap::from_iter(vec![(chain_id, accounts)]),
storages: HashMap::from_iter(vec![(chain_id, storages)]),
transactions: HashMap::from_iter(vec![(chain_id, transactions)]),
transaction_receipts: HashMap::from_iter(vec![(chain_id, transaction_receipts)]),
}
}

pub fn extend(&mut self, other: CompilationResult) {
self.headers.extend(other.headers);
self.accounts.extend(other.accounts);
self.storages.extend(other.storages);
self.transactions.extend(other.transactions);
self.transaction_receipts.extend(other.transaction_receipts);
self.task_results.extend(other.task_results);
self.mmr_metas.extend(other.mmr_metas);

// Merge mmr_with_headers
merge_header_mmr_maps(&mut self.mmr_with_headers, other.mmr_with_headers);

// Merge accounts
merge_hash_maps(&mut self.accounts, other.accounts);

// Merge storages
merge_hash_maps(&mut self.storages, other.storages);

// Merge transactions
merge_hash_maps(&mut self.transactions, other.transactions);

// Merge transaction_receipts
merge_hash_maps(&mut self.transaction_receipts, other.transaction_receipts);
}

pub fn to_processed_block_vec(self) -> Vec<ProcessedBlockProofs> {
let mut processed_block_vec = Vec::new();

for (chain_id, mmr_with_headers) in self.mmr_with_headers {
let accounts = self.accounts.get(&chain_id).cloned().unwrap_or_default();
let storages = self.storages.get(&chain_id).cloned().unwrap_or_default();
let transactions = self
.transactions
.get(&chain_id)
.cloned()
.unwrap_or_default();
let transaction_receipts = self
.transaction_receipts
.get(&chain_id)
.cloned()
.unwrap_or_default();

let processed_block = ProcessedBlockProofs {
chain_id,
mmr_with_headers: mmr_with_headers.into_iter().collect(),
accounts: accounts.into_iter().collect(),
storages: storages.into_iter().collect(),
transactions: transactions.into_iter().collect(),
transaction_receipts: transaction_receipts.into_iter().collect(),
};

processed_block_vec.push(processed_block);
}

processed_block_vec
}
}

// Helper function to merge HashMaps with HashSet values
fn merge_hash_maps<T>(base: &mut HashMap<u128, HashSet<T>>, other: HashMap<u128, HashSet<T>>)
where
T: Eq + Hash + Clone,
{
for (key, value) in other {
base.entry(key).or_default().extend(value);
}
}

// TODO too complicated. refactor with method in MMRWithHeader
fn merge_header_mmr_maps(
base: &mut HashMap<u128, HashSet<MMRWithHeader>>,
other: HashMap<u128, HashSet<MMRWithHeader>>,
) {
for (key, other_headers) in other {
base.entry(key)
.and_modify(|base_headers| {
// Merge using the extend method from MMRWithHeader
let mut new_headers =
mmr_with_header_vec_to_map(base_headers.iter().cloned().collect::<Vec<_>>());
for item in other_headers.clone() {
new_headers
.entry(item.mmr_meta)
.and_modify(|existing_headers| {
existing_headers.extend(item.headers.iter().cloned());
})
.or_insert_with(|| item.headers.into_iter().collect());
}
let new_headers_vec = convert_to_mmr_with_headers(new_headers);
*base_headers = HashSet::from_iter(new_headers_vec);
})
.or_insert(other_headers);
}
}
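
As a rough usage sketch (not part of the diff): a per-chain result can be built with `from_single_chain`, merged with `extend`, and flattened into one `ProcessedBlockProofs` per chain with `to_processed_block_vec`. The import path and the numeric chain ids below are assumptions for illustration; the empty sets stand in for fetched proofs:

```rust
use std::collections::HashSet;

use alloy::primitives::U256;
// Assumed module path; adjust to wherever CompilationResult is exported.
use hdp::preprocessor::compile::CompilationResult;

fn main() {
    // One result per chain; all proof sets left empty for brevity.
    let mainnet = CompilationResult::from_single_chain(
        1,
        vec![U256::from(1)],
        HashSet::new(), // mmr_with_headers
        HashSet::new(), // accounts
        HashSet::new(), // storages
        HashSet::new(), // transactions
        HashSet::new(), // transaction_receipts
    );
    let mut merged = CompilationResult::from_single_chain(
        11155111,
        vec![U256::from(2)],
        HashSet::new(),
        HashSet::new(),
        HashSet::new(),
        HashSet::new(),
        HashSet::new(),
    );

    // `extend` unions the per-chain maps, so both chain ids survive the merge.
    merged.extend(mainnet);

    // One ProcessedBlockProofs entry per chain id present in mmr_with_headers.
    let blocks = merged.to_processed_block_vec();
    assert_eq!(blocks.len(), 2);
}
```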