Skip to content

Commit

Permalink
Fix order of resending messages after restart (paritytech#6729)
Browse files Browse the repository at this point in the history
The way we build the messages we need to send to approval-distribution
can result in a situation where is we have multiple assignments covered
by a coalesced approval, the messages are sent in this order:

ASSIGNMENT1, APPROVAL, ASSIGNMENT2, because we iterate over each
candidate and add to the queue of messages both the assignment and the
approval for that candidate, and when the approval reaches the
approval-distribution subsystem it won't be imported and gossiped
because one of the assignment for it is not known.

So in a network where a lot of nodes are restarting at the same time we
could end up in a situation where a set of the nodes correctly received
the assignments and approvals before the restart and approve their
blocks and don't trigger their assignments. The other set of nodes
should receive the assignments and approvals after the restart, but
because the approvals never get broacasted anymore because of this bug,
the only way they could approve is if other nodes start broadcasting
their assignments.

I think this bug contribute to the reason the network did not recovered
on `25-11-25 15:55:40` after the restarts.

Tested this scenario with a `zombienet` where `nodes` are finalising
blocks because of aggression and all nodes are restarted at once and
confirmed the network lags and doesn't recover before and it does after
the fix

---------

Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
  • Loading branch information
alexggh authored and dudo50 committed Jan 4, 2025
1 parent 4ebdf79 commit 7a08623
Show file tree
Hide file tree
Showing 3 changed files with 338 additions and 3 deletions.
12 changes: 9 additions & 3 deletions polkadot/node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1582,8 +1582,9 @@ async fn handle_actions<
session_info_provider,
)
.await?;

approval_voting_sender.send_messages(messages.into_iter()).await;
for message in messages.into_iter() {
approval_voting_sender.send_unbounded_message(message);
}
let next_actions: Vec<Action> =
next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect();

Expand Down Expand Up @@ -1668,6 +1669,7 @@ async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApi

let mut approval_meta = Vec::with_capacity(all_blocks.len());
let mut messages = Vec::new();
let mut approvals = Vec::new();
let mut actions = Vec::new();

messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value.
Expand Down Expand Up @@ -1839,7 +1841,7 @@ async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApi
if signatures_queued
.insert(approval_sig.signed_candidates_indices.clone())
{
messages.push(ApprovalDistributionMessage::DistributeApproval(
approvals.push(ApprovalDistributionMessage::DistributeApproval(
IndirectSignedApprovalVoteV2 {
block_hash,
candidate_indices: approval_sig.signed_candidates_indices,
Expand All @@ -1864,6 +1866,10 @@ async fn distribution_messages_for_activation<Sender: SubsystemSender<RuntimeApi
}

messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta);
// Approvals are appended at the end, to make sure all assignments are sent
// before the approvals, otherwise if they arrive ahead in approval-distribution
// they will be ignored.
messages.extend(approvals.into_iter());
Ok((messages, actions))
}

Expand Down
314 changes: 314 additions & 0 deletions polkadot/node/core/approval-voting/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4459,6 +4459,114 @@ async fn setup_overseer_with_two_blocks_each_with_one_assignment_triggered(
assert!(our_assignment.triggered());
}

// Builds a chain with a fork where both relay blocks include the same candidate.
async fn build_chain_with_block_with_two_candidates(
block_hash1: Hash,
slot: Slot,
sync_oracle_handle: TestSyncOracleHandle,
candidate_receipt: Vec<CandidateReceipt>,
) -> (ChainBuilder, SessionInfo) {
let validators = vec![
Sr25519Keyring::Alice,
Sr25519Keyring::Bob,
Sr25519Keyring::Charlie,
Sr25519Keyring::Dave,
Sr25519Keyring::Eve,
];
let session_info = SessionInfo {
validator_groups: IndexedVec::<GroupIndex, Vec<ValidatorIndex>>::from(vec![
vec![ValidatorIndex(0), ValidatorIndex(1)],
vec![ValidatorIndex(2)],
vec![ValidatorIndex(3), ValidatorIndex(4)],
]),
..session_info(&validators)
};

let candidates = Some(
candidate_receipt
.iter()
.enumerate()
.map(|(i, receipt)| (receipt.clone(), CoreIndex(i as u32), GroupIndex(i as u32)))
.collect(),
);
let mut chain_builder = ChainBuilder::new();

chain_builder
.major_syncing(sync_oracle_handle.is_major_syncing.clone())
.add_block(
block_hash1,
ChainBuilder::GENESIS_HASH,
1,
BlockConfig {
slot,
candidates: candidates.clone(),
session_info: Some(session_info.clone()),
end_syncing: true,
},
);
(chain_builder, session_info)
}

async fn setup_overseer_with_blocks_with_two_assignments_triggered(
virtual_overseer: &mut VirtualOverseer,
store: TestStore,
clock: &Arc<MockClock>,
sync_oracle_handle: TestSyncOracleHandle,
) {
assert_matches!(
overseer_recv(virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => {
rx.send(Ok(0)).unwrap();
}
);

let block_hash = Hash::repeat_byte(0x01);
let candidate_commitments = CandidateCommitments::default();
let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash);
candidate_receipt.commitments_hash = candidate_commitments.hash();
let candidate_hash = candidate_receipt.hash();

let mut candidate_commitments2 = CandidateCommitments::default();
candidate_commitments2.processed_downward_messages = 3;
let mut candidate_receipt2 = dummy_candidate_receipt_v2(block_hash);
candidate_receipt2.commitments_hash = candidate_commitments2.hash();
let candidate_hash2 = candidate_receipt2.hash();

let slot = Slot::from(1);
let (chain_builder, _session_info) = build_chain_with_block_with_two_candidates(
block_hash,
slot,
sync_oracle_handle,
vec![candidate_receipt, candidate_receipt2],
)
.await;
chain_builder.build(virtual_overseer).await;

assert!(!clock.inner.lock().current_wakeup_is(1));
clock.inner.lock().wakeup_all(1);

assert!(clock.inner.lock().current_wakeup_is(slot_to_tick(slot)));
clock.inner.lock().wakeup_all(slot_to_tick(slot));

futures_timer::Delay::new(Duration::from_millis(200)).await;

clock.inner.lock().wakeup_all(slot_to_tick(slot + 2));

assert_eq!(clock.inner.lock().wakeups.len(), 0);

futures_timer::Delay::new(Duration::from_millis(200)).await;

let candidate_entry = store.load_candidate_entry(&candidate_hash).unwrap().unwrap();
let our_assignment =
candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap();
assert!(our_assignment.triggered());

let candidate_entry = store.load_candidate_entry(&candidate_hash2).unwrap().unwrap();
let our_assignment =
candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap();
assert!(our_assignment.triggered());
}

// Tests that for candidates that we did not approve yet, for which we triggered the assignment and
// the approval work we restart the work to approve it.
#[test]
Expand Down Expand Up @@ -4920,6 +5028,212 @@ fn subsystem_sends_pending_approvals_on_approval_restart() {
});
}

// Test that after restart approvals are sent after all assignments have been distributed.
#[test]
fn subsystem_sends_assignment_approval_in_correct_order_on_approval_restart() {
let assignment_criteria = Box::new(MockAssignmentCriteria(
|| {
let mut assignments = HashMap::new();

let _ = assignments.insert(
CoreIndex(0),
approval_db::v2::OurAssignment {
cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFModuloCompact {
core_bitfield: vec![CoreIndex(0), CoreIndex(2)].try_into().unwrap(),
}),
tranche: 0,
validator_index: ValidatorIndex(0),
triggered: false,
}
.into(),
);

let _ = assignments.insert(
CoreIndex(1),
approval_db::v2::OurAssignment {
cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFDelay {
core_index: CoreIndex(1),
}),
tranche: 0,
validator_index: ValidatorIndex(0),
triggered: false,
}
.into(),
);
assignments
},
|_| Ok(0),
));
let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build();
let store = config.backend();
let store_clone = config.backend();

test_harness(config, |test_harness| async move {
let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;

setup_overseer_with_blocks_with_two_assignments_triggered(
&mut virtual_overseer,
store,
&clock,
sync_oracle_handle,
)
.await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_,
)) => {
}
);

recover_available_data(&mut virtual_overseer).await;
fetch_validation_code(&mut virtual_overseer).await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_
)) => {
}
);

recover_available_data(&mut virtual_overseer).await;
fetch_validation_code(&mut virtual_overseer).await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive {
exec_kind,
response_sender,
..
}) if exec_kind == PvfExecKind::Approval => {
response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
.unwrap();
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive {
exec_kind,
response_sender,
..
}) if exec_kind == PvfExecKind::Approval => {
response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default())))
.unwrap();
}
);

// Configure a big coalesce number, so that the signature is cached instead of being sent to
// approval-distribution.
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
let _ = sender.send(Ok(ApprovalVotingParams {
max_approval_coalesce_count: 2,
}));
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => {
let _ = sender.send(Ok(ApprovalVotingParams {
max_approval_coalesce_count: 2,
}));
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_))
);

// Assert that there are no more messages being sent by the subsystem
assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());

virtual_overseer
});

let config = HarnessConfigBuilder::default().backend(store_clone).major_syncing(true).build();
// On restart we should first distribute all assignments covering a coalesced approval.
test_harness(config, |test_harness| async move {
let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => {
rx.send(Ok(0)).unwrap();
}
);

let block_hash = Hash::repeat_byte(0x01);
let candidate_commitments = CandidateCommitments::default();
let mut candidate_receipt = dummy_candidate_receipt_v2(block_hash);
candidate_receipt.commitments_hash = candidate_commitments.hash();

let mut candidate_commitments2 = CandidateCommitments::default();
candidate_commitments2.processed_downward_messages = 3;
let mut candidate_receipt2 = dummy_candidate_receipt_v2(block_hash);
candidate_receipt2.commitments_hash = candidate_commitments2.hash();

let slot = Slot::from(1);

clock.inner.lock().set_tick(slot_to_tick(slot + 2));
let (chain_builder, _session_info) = build_chain_with_block_with_two_candidates(
block_hash,
slot,
sync_oracle_handle,
vec![candidate_receipt.into(), candidate_receipt2.into()],
)
.await;
chain_builder.build(&mut virtual_overseer).await;

futures_timer::Delay::new(Duration::from_millis(2000)).await;

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NewBlocks(
_,
)) => {
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_,
)) => {
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment(
_,
_,
)) => {
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(approval)) => {
assert_eq!(approval.candidate_indices.count_ones(), 2);
}
);

// Assert that there are no more messages being sent by the subsystem
assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none());

virtual_overseer
});
}

// Test we correctly update the timer when we mark the beginning of gathering assignments.
#[test]
fn test_gathering_assignments_statements() {
Expand Down
15 changes: 15 additions & 0 deletions prdoc/pr_6729.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Fix order of resending messages after restart

doc:
- audience: Node Dev
description: |
At restart when dealing with a coalesced approval we might end up in a situation where we sent to
approval-distribution the approval before all assignments covering it, in that case, the approval
is ignored and never distribute, which will lead to no-shows.

crates:
- name: polkadot-node-core-approval-voting
bump: minor

0 comments on commit 7a08623

Please sign in to comment.