diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 7a9450eaff..6dd27cc4d9 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -393,90 +393,34 @@ impl SequencerConsensusContext { height: BlockNumber, proposer: ValidatorId, timeout: Duration, - mut content_receiver: mpsc::Receiver, + content_receiver: mpsc::Receiver, fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>, ) { info!("Validating proposal with timeout: {timeout:?}"); + let cancel_token = CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); let batcher = Arc::clone(&self.batcher); let valid_proposals = Arc::clone(&self.valid_proposals); + let chain_id = self.chain_id.clone(); let proposal_id = ProposalId(self.proposal_id); self.proposal_id += 1; - let chrono_timeout = - chrono::Duration::from_std(timeout).expect("Can't convert timeout to chrono::Duration"); - let now = chrono::Utc::now(); - let input = ValidateBlockInput { - proposal_id, - deadline: now + chrono_timeout, - // TODO(Matan 3/11/2024): Add the real value of the retrospective block hash. - retrospective_block_hash: Some(BlockHashAndNumber { - number: BlockNumber::default(), - hash: BlockHash::default(), - }), - // TODO(Dan, Matan): Fill block info. - block_info: BlockInfo { - block_number: height, - gas_prices: TEMPORARY_GAS_PRICES, - block_timestamp: BlockTimestamp( - now.timestamp().try_into().expect("Failed to convert timestamp"), - ), - use_kzg_da: true, - sequencer_address: proposer, - }, - }; - debug!("Initiating validate proposal: input={input:?}"); - batcher.validate_block(input).await.expect("Failed to initiate proposal validation"); - - let token = CancellationToken::new(); - let token_clone = token.clone(); - let chain_id = self.chain_id.clone(); - let mut content = Vec::new(); - let handle = tokio::spawn(async move { - let (built_block, received_fin) = loop { - tokio::select! { - _ = token_clone.cancelled() => { - warn!("Proposal interrupted: {:?}", proposal_id); - batcher_abort_proposal(batcher.as_ref(), proposal_id).await; - return; - } - _ = tokio::time::sleep(timeout) => { - warn!("Validation timed out"); - batcher_abort_proposal(batcher.as_ref(), proposal_id).await; - return; - } - proposal_part = content_receiver.next() => { - match handle_proposal_part( - proposal_id, - batcher.as_ref(), - proposal_part, - &mut content, - chain_id.clone() - ).await { - HandledProposalPart::Finished(built_block, received_fin) => { - break (built_block, received_fin); - } - HandledProposalPart::Continue => {continue;} - HandledProposalPart::Failed(fail_reason) => { - warn!("Failed to handle proposal part: {proposal_id:?}, {fail_reason}"); - batcher_abort_proposal(batcher.as_ref(), proposal_id).await; - return; - } - } - } - } - }; - // Update valid_proposals before sending fin to avoid a race condition - // with `get_proposal` being called before `valid_proposals` is updated. - // TODO(Matan): Consider validating the ProposalFin signature here. - let mut valid_proposals = valid_proposals.lock().unwrap(); - valid_proposals.entry(height).or_default().insert(built_block, (content, proposal_id)); - if fin_sender.send((built_block, received_fin)).is_err() { - // Consensus may exit early (e.g. sync). - warn!("Failed to send proposal content ids"); - } + validate_proposal( + chain_id, + proposal_id, + batcher.as_ref(), + height, + proposer, + timeout, + valid_proposals, + content_receiver, + fin_sender, + cancel_token_clone, + ) + .await }); - self.active_proposal = Some((token, handle)); + self.active_proposal = Some((cancel_token, handle)); } async fn interrupt_active_proposal(&mut self) { @@ -580,6 +524,102 @@ async fn stream_build_proposal( } } +// TODO(Arni): Remove the clippy when switch to ProposalInit. +#[allow(clippy::too_many_arguments)] +async fn validate_proposal( + chain_id: ChainId, + proposal_id: ProposalId, + batcher: &dyn BatcherClient, + height: BlockNumber, + proposer: ValidatorId, + timeout: Duration, + valid_proposals: Arc>, + mut content_receiver: mpsc::Receiver, + fin_sender: oneshot::Sender<(ProposalContentId, ProposalFin)>, + cancel_token: CancellationToken, +) { + initiate_validation(batcher, proposal_id, height, proposer, timeout).await; + + let mut content = Vec::new(); + let (built_block, received_fin) = loop { + tokio::select! { + _ = cancel_token.cancelled() => { + warn!("Proposal interrupted: {:?}", proposal_id); + batcher_abort_proposal(batcher, proposal_id).await; + return; + } + _ = tokio::time::sleep(timeout) => { + warn!("Validation timed out"); + batcher_abort_proposal(batcher, proposal_id).await; + return; + } + proposal_part = content_receiver.next() => { + match handle_proposal_part( + proposal_id, + batcher, + proposal_part, + &mut content, + chain_id.clone() + ).await { + HandledProposalPart::Finished(built_block, received_fin) => { + break (built_block, received_fin); + } + HandledProposalPart::Continue => {continue;} + HandledProposalPart::Failed(fail_reason) => { + warn!("Failed to handle proposal part: {proposal_id:?}, {fail_reason}"); + batcher_abort_proposal(batcher, proposal_id).await; + return; + } + } + } + } + }; + + // Update valid_proposals before sending fin to avoid a race condition + // with `get_proposal` being called before `valid_proposals` is updated. + // TODO(Matan): Consider validating the ProposalFin signature here. + let mut valid_proposals = valid_proposals.lock().unwrap(); + valid_proposals.entry(height).or_default().insert(built_block, (content, proposal_id)); + if fin_sender.send((built_block, received_fin)).is_err() { + // Consensus may exit early (e.g. sync). + warn!("Failed to send proposal content ids"); + } +} + +async fn initiate_validation( + batcher: &dyn BatcherClient, + proposal_id: ProposalId, + height: BlockNumber, + proposer: ValidatorId, + timeout: Duration, +) { + // Initiate the validation. + let chrono_timeout = + chrono::Duration::from_std(timeout).expect("Can't convert timeout to chrono::Duration"); + let now = chrono::Utc::now(); + let input = ValidateBlockInput { + proposal_id, + deadline: now + chrono_timeout, + // TODO(Matan 3/11/2024): Add the real value of the retrospective block hash. + retrospective_block_hash: Some(BlockHashAndNumber { + number: BlockNumber::default(), + hash: BlockHash::default(), + }), + // TODO(Dan, Matan): Fill block info. + block_info: BlockInfo { + block_number: height, + gas_prices: TEMPORARY_GAS_PRICES, + block_timestamp: BlockTimestamp( + now.timestamp().try_into().expect("Failed to convert timestamp"), + ), + use_kzg_da: true, + sequencer_address: proposer, + }, + }; + debug!("Initiating validate proposal: input={input:?}"); + batcher.validate_block(input).await.expect("Failed to initiate proposal validation"); +} + // Handles receiving a proposal from another node without blocking consensus: // 1. Receives the proposal part from the network. // 2. Pass this to the batcher.