Merge pull request #51 from HerodotusDev/refact/datalake
rework
rkdud007 authored Mar 29, 2024
2 parents 3fa853e + b64c882 commit fbf1c61
Showing 38 changed files with 1,666 additions and 1,908 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions cli/Cargo.toml
@@ -27,6 +27,7 @@ tokio = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
hdp-provider = { workspace = true }
hdp-primitives = { workspace = true }

clap = { version = "4.4.4", features = ["derive"] }
dotenv = "0.15.0"
50 changes: 23 additions & 27 deletions cli/src/main.rs
@@ -1,4 +1,8 @@
use anyhow::{bail, Result};
use hdp_primitives::datalake::{
block_sampled::BlockSampledDatalake, envelope::DatalakeEnvelope,
transactions::TransactionsDatalake,
};
use std::{sync::Arc, vec};
use tracing_subscriber::FmtSubscriber;

@@ -9,7 +13,6 @@ use hdp_core::{
tasks_encoder,
},
config::Config,
datalake::Datalake,
evaluator::evaluator,
task::ComputationalTask,
};
@@ -130,7 +133,7 @@

struct DecodeMultipleResult {
tasks: Vec<ComputationalTask>,
datalakes: Vec<Datalake>,
datalakes: Vec<DatalakeEnvelope>,
}

struct EncodeMultipleResult {
@@ -155,7 +158,7 @@ async fn handle_decode_multiple(datalakes: String, tasks: String) -> Result<Deco

async fn handle_encode_multiple(
tasks: Vec<ComputationalTask>,
datalakes: Vec<Datalake>,
datalakes: Vec<DatalakeEnvelope>,
) -> Result<EncodeMultipleResult> {
let encoded_datalakes = datalakes_encoder(datalakes)?;
info!("Encoded datalakes: {}", encoded_datalakes);
@@ -185,7 +188,7 @@ async fn handle_run(

match evaluator(
decoded_result.tasks,
Some(decoded_result.datalakes),
decoded_result.datalakes,
Arc::new(RwLock::new(provider)),
)
.await
@@ -239,14 +242,13 @@ async fn main() -> Result<()> {
sampled_property,
increment,
} => {
let block_sampled_datalake =
hdp_core::datalake::block_sampled::BlockSampledDatalake::new(
block_range_start,
block_range_end,
sampled_property,
increment,
);
Datalake::BlockSampled(block_sampled_datalake)
let block_sampled_datalake = BlockSampledDatalake::new(
block_range_start,
block_range_end,
sampled_property,
increment,
)?;
DatalakeEnvelope::BlockSampled(block_sampled_datalake)
}
DataLakeCommands::Transactions {
address,
@@ -255,28 +257,22 @@
sampled_property,
increment,
} => {
let transactions_datalake =
hdp_core::datalake::transactions::TransactionsDatalake::new(
address,
from_nonce,
to_nonce,
sampled_property,
increment,
)?;
Datalake::Transactions(transactions_datalake)
let transactions_datalake = TransactionsDatalake::new(
address,
from_nonce,
to_nonce,
sampled_property,
increment,
)?;
DatalakeEnvelope::Transactions(transactions_datalake)
}
};

let encoded_result = handle_encode_multiple(
vec![ComputationalTask::new(
None,
aggregate_fn_id,
aggregate_fn_ctx,
)],
vec![ComputationalTask::new(aggregate_fn_id, aggregate_fn_ctx)],
vec![datalake],
)
.await?;

// if allow_run is true, then run the evaluator
if allow_run {
handle_run(
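Taken together, the cli/src/main.rs changes swap the old `hdp_core::datalake::Datalake` for the typed `DatalakeEnvelope` from `hdp_primitives`, and datalake construction now validates its inputs. A minimal sketch of the new construction path, assuming the argument types implied by this diff (`u64` ranges and increment, `String` sampled property; the property value itself is illustrative):

```rust
use anyhow::Result;
use hdp_primitives::datalake::{
    block_sampled::BlockSampledDatalake, envelope::DatalakeEnvelope,
};

fn build_example_datalake() -> Result<DatalakeEnvelope> {
    // Post-refactor, `new` returns a Result (note the `?` in the diff above):
    // it can now reject inputs such as an unparsable sampled property.
    let block_sampled = BlockSampledDatalake::new(
        4_952_100,                             // block_range_start
        4_952_110,                             // block_range_end
        "header.base_fee_per_gas".to_string(), // sampled_property (illustrative)
        1,                                     // increment
    )?;
    Ok(DatalakeEnvelope::BlockSampled(block_sampled))
}
```

`ComputationalTask::new` likewise drops its leading `None` argument in this PR, so building a task is now just `ComputationalTask::new(aggregate_fn_id, aggregate_fn_ctx)`.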
File renamed without changes.
File renamed without changes.
File renamed without changes.
133 changes: 58 additions & 75 deletions crates/core/src/codec.rs

Large diffs are not rendered by default.
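crates/core/src/codec.rs is where the type rename reaches the wire format: the encoder/decoder pair now moves `Vec<DatalakeEnvelope>` instead of `Vec<Datalake>`. Since the diff body isn't rendered, the following is only a shape sketch inferred from the call sites in cli/src/main.rs; the module path, the decoder name, and the return types are assumptions:

```rust
// Sketch of the post-refactor codec surface. Only the names
// `datalakes_encoder` and `tasks_encoder` are visible at the
// cli/src/main.rs call sites in this diff; everything else is assumed.
use anyhow::Result;
use hdp_primitives::datalake::envelope::DatalakeEnvelope;

// Before: took Vec<Datalake>; after: takes the typed envelopes, so the
// codec no longer parses collection strings itself.
pub fn datalakes_encoder(datalakes: Vec<DatalakeEnvelope>) -> Result<String> {
    unimplemented!("sketch only; see crates/core/src/codec.rs")
}

// Hypothetical counterpart used by handle_decode_multiple in main.rs.
pub fn datalakes_decoder(encoded: String) -> Result<Vec<DatalakeEnvelope>> {
    unimplemented!("sketch only; see crates/core/src/codec.rs")
}
```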

115 changes: 44 additions & 71 deletions crates/core/src/compiler/block_sampled.rs
@@ -1,55 +1,44 @@
use hex::FromHex;
use std::{str::FromStr, sync::Arc};

use crate::datalake::base::DatalakeResult;
use alloy_primitives::{hex, keccak256};
use anyhow::{bail, Result};
use hdp_primitives::{
block::{
account::{decode_account_field, AccountField},
header::{decode_header_field, HeaderField},
},
format::{Account, Header, HeaderProof, MPTProof, Storage},
use hdp_primitives::datalake::{
block_sampled::{BlockSampledCollection, BlockSampledDatalake},
DatalakeField,
};
use std::sync::Arc;

use alloy_primitives::keccak256;
use anyhow::Result;

use hdp_primitives::datalake::block_sampled::types::{
Account, Header, HeaderProof, MPTProof, Storage,
};
use hdp_provider::evm::AbstractProvider;
use tokio::sync::RwLock;

use super::CompiledDatalake;

pub async fn compile_block_sampled_datalake(
block_range_start: u64,
block_range_end: u64,
sampled_property: &str,
increment: u64,
datalake: BlockSampledDatalake,
provider: &Arc<RwLock<AbstractProvider>>,
) -> Result<DatalakeResult> {
) -> Result<CompiledDatalake> {
let mut abstract_provider = provider.write().await;
let property_parts: Vec<&str> = sampled_property.split('.').collect();
let collection = property_parts[0];

let mut aggregation_set: Vec<String> = Vec::new();
// we don't need range
let target_block_range: Vec<u64> = (block_range_start..=block_range_end)
.step_by(increment as usize)
.collect();

let full_header_and_proof_result = abstract_provider
.get_sequencial_full_header_with_proof(block_range_start, block_range_end)
.get_sequencial_full_header_with_proof(datalake.block_range_start, datalake.block_range_end)
.await?;
let mmr_meta = full_header_and_proof_result.1;
let mut headers: Vec<Header> = vec![];
let mut accounts: Vec<Account> = vec![];
let mut storages: Vec<Storage> = vec![];

match collection {
"header" => {
let property = property_parts[1];

for block in target_block_range {
match datalake.sampled_property {
BlockSampledCollection::Header(property) => {
for block in datalake.block_range_start..=datalake.block_range_end {
if block % datalake.increment != 0 {
continue;
}
let fetched_block = full_header_and_proof_result.0.get(&block).unwrap().clone();

let value = decode_header_field(
&fetched_block.0,
HeaderField::from_str(&property.to_uppercase()).unwrap(),
);
let value = property.decode_field_from_rlp(&fetched_block.0);

headers.push(Header {
rlp: fetched_block.0,
@@ -62,34 +51,27 @@ pub async fn compile_block_sampled_datalake(
aggregation_set.push(value);
}
}
"account" => {
let address = property_parts[1];
let property = property_parts[2];

BlockSampledCollection::Account(address, property) => {
let accounts_and_proofs_result = abstract_provider
.get_range_account_with_proof(
block_range_start,
block_range_end,
increment,
datalake.block_range_start,
datalake.block_range_end,
datalake.increment,
address.to_string(),
)
.await?;

let mut account_proofs: Vec<MPTProof> = vec![];
// let mut encoded_account = "".to_string();

for i in block_range_start..=block_range_end {
if i % increment != 0 {
for block in datalake.block_range_start..=datalake.block_range_end {
if block % datalake.increment != 0 {
continue;
}
let fetched_block = full_header_and_proof_result.0.get(&i).unwrap().clone();
let acc = accounts_and_proofs_result.get(&i).unwrap().clone();
let fetched_block = full_header_and_proof_result.0.get(&block).unwrap().clone();
let acc = accounts_and_proofs_result.get(&block).unwrap().clone();
// encoded_account = acc.0.clone();

let value = decode_account_field(
&acc.0,
AccountField::from_str(&property.to_uppercase()).unwrap(),
);
let value = property.decode_field_from_rlp(&acc.0);

headers.push(Header {
rlp: fetched_block.0,
@@ -100,33 +82,27 @@
});

let account_proof = MPTProof {
block_number: i,
block_number: block,
proof: acc.1,
};

account_proofs.push(account_proof);

aggregation_set.push(value);
}

let address_bytes = Vec::from_hex(address).expect("Invalid hex string");
let account_key = keccak256(address_bytes);

let account_key = keccak256(address);
accounts.push(Account {
address: address.to_string(),
account_key: account_key.to_string(),
proofs: account_proofs,
});
}
"storage" => {
let address = property_parts[1];
let slot = property_parts[2];

BlockSampledCollection::Storage(address, slot) => {
let storages_and_proofs_result = abstract_provider
.get_range_storage_with_proof(
block_range_start,
block_range_end,
increment,
datalake.block_range_start,
datalake.block_range_end,
datalake.increment,
address.to_string(),
slot.to_string(),
)
@@ -135,8 +111,8 @@ pub async fn compile_block_sampled_datalake(
let mut storage_proofs: Vec<MPTProof> = vec![];
let mut account_proofs: Vec<MPTProof> = vec![];

for i in block_range_start..=block_range_end {
if i % increment != 0 {
for i in datalake.block_range_start..=datalake.block_range_end {
if i % datalake.increment != 0 {
continue;
}
let fetched_block = full_header_and_proof_result.0.get(&i).unwrap().clone();
@@ -162,11 +138,9 @@

aggregation_set.push(acc_and_storage.2);
}
let slot_bytes = Vec::from_hex(slot).expect("Invalid hex string");
let storage_key = keccak256(slot_bytes).to_string();

let address_bytes = Vec::from_hex(address).expect("Invalid hex string");
let account_key = keccak256(address_bytes);
let storage_key = keccak256(slot).to_string();
let account_key = keccak256(address);

storages.push(Storage {
address: address.to_string(),
Expand All @@ -180,11 +154,10 @@ pub async fn compile_block_sampled_datalake(
proofs: account_proofs,
});
}
_ => bail!("Unknown collection type"),
}

Ok(DatalakeResult {
compiled_results: aggregation_set,
Ok(CompiledDatalake {
values: aggregation_set,
headers,
accounts,
storages,
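The net effect in this compiler: `compile_block_sampled_datalake` now receives the whole `BlockSampledDatalake`, matches on the typed `BlockSampledCollection` instead of splitting `sampled_property` on `'.'`, decodes fields through `decode_field_from_rlp` (via the `DatalakeField` import at the top of the file), and feeds the already-typed address and slot straight into `keccak256` with no `Vec::from_hex` round-trip. A self-contained mirror of that dispatch pattern, with deliberately simplified `String` payloads (the real variants carry typed addresses and field enums):

```rust
// Minimal mirror of the typed dispatch this PR introduces. The real enum
// lives in hdp_primitives::datalake::block_sampled with richer payloads.
enum BlockSampledCollection {
    Header(String),          // sampled header field, e.g. "base_fee_per_gas"
    Account(String, String), // (address, account field)
    Storage(String, String), // (address, storage slot)
}

// Exhaustive match: the old `_ => bail!("Unknown collection type")` arm
// disappears because an invalid collection can no longer be represented.
fn needs_account_proof(c: &BlockSampledCollection) -> bool {
    match c {
        BlockSampledCollection::Header(_) => false,
        BlockSampledCollection::Account(..) | BlockSampledCollection::Storage(..) => true,
    }
}

fn main() {
    let sampled = BlockSampledCollection::Account(
        "0x7f2c...36d4".to_string(), // illustrative, truncated address
        "nonce".to_string(),
    );
    assert!(needs_account_proof(&sampled));
    println!("account sampling requires an MPT account proof");
}
```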
