Skip to content

Commit

Permalink
fix(consensus): initiate block validation in spawned task (#2876)
Browse files Browse the repository at this point in the history
This results in less blocking of the consensus crate.
Also moved this code to another function since validate_current_round_proposal was both large and
complex.
  • Loading branch information
matan-starkware authored Dec 24, 2024
1 parent a5bafd2 commit 3723b1a
Showing 1 changed file with 114 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -393,90 +393,34 @@ impl SequencerConsensusContext {
height: BlockNumber,
proposer: ValidatorId,
timeout: Duration,
mut content_receiver: mpsc::Receiver<ProposalPart>,
content_receiver: mpsc::Receiver<ProposalPart>,
fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>,
) {
info!("Validating proposal with timeout: {timeout:?}");
let cancel_token = CancellationToken::new();
let cancel_token_clone = cancel_token.clone();
let batcher = Arc::clone(&self.batcher);
let valid_proposals = Arc::clone(&self.valid_proposals);
let chain_id = self.chain_id.clone();
let proposal_id = ProposalId(self.proposal_id);
self.proposal_id += 1;

let chrono_timeout =
chrono::Duration::from_std(timeout).expect("Can't convert timeout to chrono::Duration");
let now = chrono::Utc::now();
let input = ValidateBlockInput {
proposal_id,
deadline: now + chrono_timeout,
// TODO(Matan 3/11/2024): Add the real value of the retrospective block hash.
retrospective_block_hash: Some(BlockHashAndNumber {
number: BlockNumber::default(),
hash: BlockHash::default(),
}),
// TODO(Dan, Matan): Fill block info.
block_info: BlockInfo {
block_number: height,
gas_prices: TEMPORARY_GAS_PRICES,
block_timestamp: BlockTimestamp(
now.timestamp().try_into().expect("Failed to convert timestamp"),
),
use_kzg_da: true,
sequencer_address: proposer,
},
};
debug!("Initiating validate proposal: input={input:?}");
batcher.validate_block(input).await.expect("Failed to initiate proposal validation");

let token = CancellationToken::new();
let token_clone = token.clone();
let chain_id = self.chain_id.clone();
let mut content = Vec::new();

let handle = tokio::spawn(async move {
let (built_block, received_fin) = loop {
tokio::select! {
_ = token_clone.cancelled() => {
warn!("Proposal interrupted: {:?}", proposal_id);
batcher_abort_proposal(batcher.as_ref(), proposal_id).await;
return;
}
_ = tokio::time::sleep(timeout) => {
warn!("Validation timed out");
batcher_abort_proposal(batcher.as_ref(), proposal_id).await;
return;
}
proposal_part = content_receiver.next() => {
match handle_proposal_part(
proposal_id,
batcher.as_ref(),
proposal_part,
&mut content,
chain_id.clone()
).await {
HandledProposalPart::Finished(built_block, received_fin) => {
break (built_block, received_fin);
}
HandledProposalPart::Continue => {continue;}
HandledProposalPart::Failed(fail_reason) => {
warn!("Failed to handle proposal part: {proposal_id:?}, {fail_reason}");
batcher_abort_proposal(batcher.as_ref(), proposal_id).await;
return;
}
}
}
}
};
// Update valid_proposals before sending fin to avoid a race condition
// with `get_proposal` being called before `valid_proposals` is updated.
// TODO(Matan): Consider validating the ProposalFin signature here.
let mut valid_proposals = valid_proposals.lock().unwrap();
valid_proposals.entry(height).or_default().insert(built_block, (content, proposal_id));
if fin_sender.send((built_block, received_fin)).is_err() {
// Consensus may exit early (e.g. sync).
warn!("Failed to send proposal content ids");
}
validate_proposal(
chain_id,
proposal_id,
batcher.as_ref(),
height,
proposer,
timeout,
valid_proposals,
content_receiver,
fin_sender,
cancel_token_clone,
)
.await
});
self.active_proposal = Some((token, handle));
self.active_proposal = Some((cancel_token, handle));
}

async fn interrupt_active_proposal(&mut self) {
Expand Down Expand Up @@ -580,6 +524,102 @@ async fn stream_build_proposal(
}
}

// TODO(Arni): Remove the clippy when switch to ProposalInit.
#[allow(clippy::too_many_arguments)]
async fn validate_proposal(
chain_id: ChainId,
proposal_id: ProposalId,
batcher: &dyn BatcherClient,
height: BlockNumber,
proposer: ValidatorId,
timeout: Duration,
valid_proposals: Arc<Mutex<HeightToIdToContent>>,
mut content_receiver: mpsc::Receiver<ProposalPart>,
fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>,
cancel_token: CancellationToken,
) {
initiate_validation(batcher, proposal_id, height, proposer, timeout).await;

let mut content = Vec::new();
let (built_block, received_fin) = loop {
tokio::select! {
_ = cancel_token.cancelled() => {
warn!("Proposal interrupted: {:?}", proposal_id);
batcher_abort_proposal(batcher, proposal_id).await;
return;
}
_ = tokio::time::sleep(timeout) => {
warn!("Validation timed out");
batcher_abort_proposal(batcher, proposal_id).await;
return;
}
proposal_part = content_receiver.next() => {
match handle_proposal_part(
proposal_id,
batcher,
proposal_part,
&mut content,
chain_id.clone()
).await {
HandledProposalPart::Finished(built_block, received_fin) => {
break (built_block, received_fin);
}
HandledProposalPart::Continue => {continue;}
HandledProposalPart::Failed(fail_reason) => {
warn!("Failed to handle proposal part: {proposal_id:?}, {fail_reason}");
batcher_abort_proposal(batcher, proposal_id).await;
return;
}
}
}
}
};

// Update valid_proposals before sending fin to avoid a race condition
// with `get_proposal` being called before `valid_proposals` is updated.
// TODO(Matan): Consider validating the ProposalFin signature here.
let mut valid_proposals = valid_proposals.lock().unwrap();
valid_proposals.entry(height).or_default().insert(built_block, (content, proposal_id));
if fin_sender.send((built_block, received_fin)).is_err() {
// Consensus may exit early (e.g. sync).
warn!("Failed to send proposal content ids");
}
}

async fn initiate_validation(
batcher: &dyn BatcherClient,
proposal_id: ProposalId,
height: BlockNumber,
proposer: ValidatorId,
timeout: Duration,
) {
// Initiate the validation.
let chrono_timeout =
chrono::Duration::from_std(timeout).expect("Can't convert timeout to chrono::Duration");
let now = chrono::Utc::now();
let input = ValidateBlockInput {
proposal_id,
deadline: now + chrono_timeout,
// TODO(Matan 3/11/2024): Add the real value of the retrospective block hash.
retrospective_block_hash: Some(BlockHashAndNumber {
number: BlockNumber::default(),
hash: BlockHash::default(),
}),
// TODO(Dan, Matan): Fill block info.
block_info: BlockInfo {
block_number: height,
gas_prices: TEMPORARY_GAS_PRICES,
block_timestamp: BlockTimestamp(
now.timestamp().try_into().expect("Failed to convert timestamp"),
),
use_kzg_da: true,
sequencer_address: proposer,
},
};
debug!("Initiating validate proposal: input={input:?}");
batcher.validate_block(input).await.expect("Failed to initiate proposal validation");
}

// Handles receiving a proposal from another node without blocking consensus:
// 1. Receives the proposal part from the network.
// 2. Pass this to the batcher.
Expand Down

0 comments on commit 3723b1a

Please sign in to comment.