Skip to content

Commit

Permalink
Pure state sync refactoring (part-1) (#6249)
Browse files Browse the repository at this point in the history
This pure refactoring of state sync is preparing for
#4. As the rough plan
in
#4 (comment),
there will be two PRs for the state sync refactoring.

This first PR focuses on isolating the function
`process_state_key_values()` as the central point for storing received
state data in memory. This function will later be adapted to forward the
state data directly to the DB layer for persistent sync. A follow-up PR
will handle the encapsulation of `StateSyncMetadata` to support this
persistent storage.

Although there are many commits in this PR, each commit is small and
intentionally incremental to facilitate a smoother review, please review
them commit by commit. Each commit should represent an equivalent
rewrite of the existing logic, with one exception
bb447b2,
which has a slight deviation from the original but is correct IMHO.
Please give this commit special attention during the review.
  • Loading branch information
liuchengxu authored Nov 18, 2024
1 parent 95be9c1 commit 06a68be
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 87 deletions.
10 changes: 10 additions & 0 deletions prdoc/pr_6249.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
title: Pure state sync refactoring (part-1)

doc:
- audience: Node Dev
description: |
The pure refactoring of state sync is preparing for https://github.com/paritytech/polkadot-sdk/issues/4. This is the first part, focusing on isolating the function `process_state_key_values()` as the central point for storing received state data in memory. This function will later be adapted to forward the state data directly to the DB layer to resolve the OOM issue and support persistent state sync.

crates:
- name: sc-network-sync
bump: none
165 changes: 78 additions & 87 deletions substrate/client/network/sync/src/strategy/state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
//! State sync support.
use crate::{
schema::v1::{StateEntry, StateRequest, StateResponse},
schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse},
LOG_TARGET,
};
use codec::{Decode, Encode};
use log::debug;
use sc_client_api::{CompactProof, ProofProvider};
use sc_client_api::{CompactProof, KeyValueStates, ProofProvider};
use sc_consensus::ImportedState;
use smallvec::SmallVec;
use sp_core::storage::well_known_keys;
Expand Down Expand Up @@ -132,6 +132,80 @@ where
skip_proof,
}
}

fn process_state_key_values(
&mut self,
state_root: Vec<u8>,
key_values: impl IntoIterator<Item = (Vec<u8>, Vec<u8>)>,
) {
let is_top = state_root.is_empty();

let entry = self.state.entry(state_root).or_default();

if entry.0.len() > 0 && entry.1.len() > 1 {
// Already imported child_trie with same root.
// Warning this will not work with parallel download.
return;
}

let mut child_storage_roots = Vec::new();

for (key, value) in key_values {
// Skip all child key root (will be recalculated on import)
if is_top && well_known_keys::is_child_storage_key(key.as_slice()) {
child_storage_roots.push((value, key));
} else {
self.imported_bytes += key.len() as u64;
entry.0.push((key, value));
}
}

for (root, storage_key) in child_storage_roots {
self.state.entry(root).or_default().1.push(storage_key);
}
}

fn process_state_verified(&mut self, values: KeyValueStates) {
for values in values.0 {
self.process_state_key_values(values.state_root, values.key_values);
}
}

fn process_state_unverified(&mut self, response: StateResponse) -> bool {
let mut complete = true;
// if the trie is a child trie and one of its parent trie is empty,
// the parent cursor stays valid.
// Empty parent trie content only happens when all the response content
// is part of a single child trie.
if self.last_key.len() == 2 && response.entries[0].entries.is_empty() {
// Do not remove the parent trie position.
self.last_key.pop();
} else {
self.last_key.clear();
}
for state in response.entries {
debug!(
target: LOG_TARGET,
"Importing state from {:?} to {:?}",
state.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
state.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
);

if !state.complete {
if let Some(e) = state.entries.last() {
self.last_key.push(e.key.clone());
}
complete = false;
}

let KeyValueStateEntry { state_root, entries, complete: _ } = state;
self.process_state_key_values(
state_root,
entries.into_iter().map(|StateEntry { key, value }| (key, value)),
);
}
complete
}
}

impl<B, Client> StateSyncProvider<B> for StateSync<B, Client>
Expand Down Expand Up @@ -181,94 +255,11 @@ where
debug!(target: LOG_TARGET, "Error updating key cursor, depth: {}", completed);
};

for values in values.0 {
let key_values = if values.state_root.is_empty() {
// Read child trie roots.
values
.key_values
.into_iter()
.filter(|key_value| {
if well_known_keys::is_child_storage_key(key_value.0.as_slice()) {
self.state
.entry(key_value.1.clone())
.or_default()
.1
.push(key_value.0.clone());
false
} else {
true
}
})
.collect()
} else {
values.key_values
};
let entry = self.state.entry(values.state_root).or_default();
if entry.0.len() > 0 && entry.1.len() > 1 {
// Already imported child_trie with same root.
// Warning this will not work with parallel download.
} else if entry.0.is_empty() {
for (key, _value) in key_values.iter() {
self.imported_bytes += key.len() as u64;
}

entry.0 = key_values;
} else {
for (key, value) in key_values {
self.imported_bytes += key.len() as u64;
entry.0.push((key, value))
}
}
}
self.process_state_verified(values);
self.imported_bytes += proof_size;
complete
} else {
let mut complete = true;
// if the trie is a child trie and one of its parent trie is empty,
// the parent cursor stays valid.
// Empty parent trie content only happens when all the response content
// is part of a single child trie.
if self.last_key.len() == 2 && response.entries[0].entries.is_empty() {
// Do not remove the parent trie position.
self.last_key.pop();
} else {
self.last_key.clear();
}
for state in response.entries {
debug!(
target: LOG_TARGET,
"Importing state from {:?} to {:?}",
state.entries.last().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
state.entries.first().map(|e| sp_core::hexdisplay::HexDisplay::from(&e.key)),
);

if !state.complete {
if let Some(e) = state.entries.last() {
self.last_key.push(e.key.clone());
}
complete = false;
}
let is_top = state.state_root.is_empty();
let entry = self.state.entry(state.state_root).or_default();
if entry.0.len() > 0 && entry.1.len() > 1 {
// Already imported child trie with same root.
} else {
let mut child_roots = Vec::new();
for StateEntry { key, value } in state.entries {
// Skip all child key root (will be recalculated on import).
if is_top && well_known_keys::is_child_storage_key(key.as_slice()) {
child_roots.push((value, key));
} else {
self.imported_bytes += key.len() as u64;
entry.0.push((key, value))
}
}
for (root, storage_key) in child_roots {
self.state.entry(root).or_default().1.push(storage_key);
}
}
}
complete
self.process_state_unverified(response)
};
if complete {
self.complete = true;
Expand Down

0 comments on commit 06a68be

Please sign in to comment.