Skip to content

Commit

Permalink
Decouple network subnets from das-core
Browse files Browse the repository at this point in the history
  • Loading branch information
hangleang committed Jan 7, 2025
1 parent 1b44b10 commit 65df433
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 108 deletions.
160 changes: 81 additions & 79 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions eip_7594/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ workspace = true
anyhow = { workspace = true }
c-kzg = { workspace = true }
helper_functions = { workspace = true }
itertools = { workspace = true }
kzg = { workspace = true }
num-traits = { workspace = true }
ssz = { workspace = true}
Expand Down
62 changes: 42 additions & 20 deletions eip_7594/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::collections::HashSet;

use anyhow::{ensure, Result};
use c_kzg::{
Blob as CKzgBlob, Bytes48, Cell as CKzgCell, KzgProof as CKzgProof, CELLS_PER_EXT_BLOB,
};
use helper_functions::{misc, predicates::is_valid_merkle_branch};
use itertools::Itertools as _;
use kzg as _;
use num_traits::One as _;
use sha2::{Digest as _, Sha256};
Expand All @@ -18,7 +21,7 @@ use types::{
containers::{DataColumnSidecar, MatrixEntry},
primitives::{Cell, ColumnIndex, CustodyIndex},
},
phase0::primitives::NodeId,
phase0::primitives::{NodeId, SubnetId},
preset::Preset,
traits::SignedBeaconBlock as _,
};
Expand All @@ -36,10 +39,10 @@ mod tests;
use prometheus_metrics::METRICS;

pub fn get_custody_groups(
node_id: NodeId,
raw_node_id: [u8; 32],
custody_group_count: u64,
config: &Config,
) -> Result<impl Iterator<Item = CustodyIndex>> {
) -> Result<HashSet<CustodyIndex>> {
let number_of_custody_groups = config.number_of_custody_groups;
ensure!(
custody_group_count <= number_of_custody_groups,
Expand All @@ -49,12 +52,12 @@ pub fn get_custody_groups(
},
);

let mut custody_groups = vec![];
let mut current_id = node_id;
let mut current_id = NodeId::from_be_bytes(raw_node_id);

let mut custody_groups = HashSet::new();
while (custody_groups.len() as u64) < custody_group_count {
let mut hasher = Sha256::new();
let mut bytes: [u8; 32] = [0; 32];
let mut bytes = [0u8; 32];

current_id.into_raw().to_little_endian(&mut bytes);

Expand All @@ -66,21 +69,20 @@ pub fn get_custody_groups(
];

let output_prefix_u64 = u64::from_le_bytes(output_prefix);
let custody_group = output_prefix_u64 % number_of_custody_groups;

if !custody_groups.contains(&custody_group) {
custody_groups.push(custody_group);
}
let custody_group = output_prefix_u64
.checked_rem(number_of_custody_groups)
.expect("number of custody groups must not be zero");
custody_groups.insert(custody_group);

if current_id == Uint256::MAX {
// > Overflow prevention
current_id = Uint256::ZERO;
} else {
current_id = current_id + Uint256::one();
}

current_id = current_id + Uint256::one();
}

custody_groups.sort_unstable();
Ok(custody_groups.into_iter())
Ok(custody_groups)
}

pub fn compute_columns_for_custody_group(
Expand Down Expand Up @@ -108,12 +110,32 @@ pub fn compute_columns_for_custody_group(
Ok(columns.into_iter())
}

pub fn compute_custody_requirement_groups(
node_id: NodeId,
pub fn compute_subnets_from_custody_group(
custody_group: CustodyIndex,
config: &Config,
) -> Result<impl Iterator<Item = SubnetId> + '_> {
let subnets = compute_columns_for_custody_group(custody_group, config)?
.map(|column_index| misc::compute_subnet_for_data_column_sidecar(config, column_index))
.unique();

Ok(subnets)
}

pub fn compute_subnets_for_node(
raw_node_id: [u8; 32],
custody_group_count: u64,
config: &Config,
) -> impl Iterator<Item = CustodyIndex> {
get_custody_groups(node_id, config.custody_requirement, config)
.expect("compute must be success with custody requirement")
) -> Result<HashSet<SubnetId>> {
let custody_groups = get_custody_groups(raw_node_id, custody_group_count, config)?;

let mut subnets = HashSet::new();
for custody_group in custody_groups {
let custody_group_subnets = compute_subnets_from_custody_group(custody_group, config)?;

subnets.extend(custody_group_subnets);
}

Ok(subnets)
}

/// Verify if the data column sidecar is valid.
Expand Down
9 changes: 7 additions & 2 deletions eip_7594/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,20 @@ fn run_case<P: Preset>(case: Case) {
} = case.yaml::<Meta>("meta");

let config = P::default_config().start_and_stay_in(Phase::Fulu);
let custody_groups = get_custody_groups(node_id, custody_group_count, &config)
let mut raw_node_id = [0u8; 32];
node_id.into_raw().to_big_endian(&mut raw_node_id);

let custody_groups = get_custody_groups(raw_node_id, custody_group_count, &config)
.expect("custody groups must be valid");

let custody_columns = custody_groups
let mut custody_columns = custody_groups
.into_iter()
.flat_map(|group| {
compute_columns_for_custody_group(group, &config)
.expect("there should not be any invalid custody group")
})
.collect::<Vec<_>>();
custody_columns.sort_unstable();

assert_eq!(custody_columns, result);
}
Expand Down
12 changes: 8 additions & 4 deletions p2p/src/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,14 @@ impl SyncManager {
let data_column_serve_range_slot =
misc::data_column_serve_range_slot::<P>(config, current_slot);
if data_column_serve_range_slot < max_slot {
match self
.map_peer_custody_columns(&self.network_globals.sampling_columns, None)
{
let columns = self
.network_globals()
.sampling_columns
.iter()
.copied()
.collect::<Vec<_>>();

match self.map_peer_custody_columns(&columns, None) {
Ok(peer_custody_columns_mapping) => {
for (peer_id, columns) in peer_custody_columns_mapping {
sync_batches.push(SyncBatch {
Expand All @@ -363,7 +368,6 @@ impl SyncManager {
),
);

let columns = self.network_globals().sampling_columns.clone();
sync_batches.push(SyncBatch {
target: SyncTarget::DataColumnSidecar(columns),
direction: SyncDirection::Forward,
Expand Down
4 changes: 2 additions & 2 deletions runtime/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,8 @@ pub async fn run_after_genesis<P: Preset>(

// TODO(peerdas-fulu): figure out the way to check sampling columns without storing the indices in sync_manager
if chain_config.is_eip7594_scheduled() {
let sampling_columns = &network.network_globals().sampling_columns;
controller.on_store_sampling_columns(sampling_columns);
let sampling_columns = network.network_globals().sampling_columns.iter().copied().collect::<Vec<_>>();
controller.on_store_sampling_columns(&sampling_columns);
}

let block_sync_service_channels = BlockSyncServiceChannels {
Expand Down
14 changes: 14 additions & 0 deletions types/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,20 @@ impl Config {
}
}

#[must_use]
pub const fn custody_group_count(&self, subscribe_all_data_column_subnets: bool) -> u64 {
if subscribe_all_data_column_subnets {
self.number_of_custody_groups
} else {
self.custody_requirement
}
}

#[must_use]
pub fn sampling_size(&self, custody_group_count: u64) -> u64 {
core::cmp::max(custody_group_count, self.samples_per_slot)
}

fn fork_slots<P: Preset>(&self) -> impl Iterator<Item = (Phase, Toption<Slot>)> + '_ {
enum_iterator::all().map(|phase| (phase, self.fork_slot::<P>(phase)))
}
Expand Down

0 comments on commit 65df433

Please sign in to comment.