diff --git a/essentials/src/chain_events.rs b/essentials/src/chain_events.rs index 9241c75c..f3f287ea 100644 --- a/essentials/src/chain_events.rs +++ b/essentials/src/chain_events.rs @@ -73,6 +73,8 @@ pub struct SubxtCandidateEvent { pub parachain_id: u32, /// The event type pub event_type: SubxtCandidateEventType, + /// Core index + pub core_idx: u32, } /// A helper structure to keep track of a dispute and it's relay parent @@ -126,6 +128,7 @@ pub async fn decode_chain_event( return Ok(ChainEvent::CandidateChanged(Box::new(create_candidate_event( decoded.0.commitments_hash, decoded.0.descriptor, + decoded.2 .0, SubxtCandidateEventType::Backed, )))) } @@ -135,6 +138,7 @@ pub async fn decode_chain_event( return Ok(ChainEvent::CandidateChanged(Box::new(create_candidate_event( decoded.0.commitments_hash, decoded.0.descriptor, + decoded.2 .0, SubxtCandidateEventType::Included, )))) } @@ -144,6 +148,7 @@ pub async fn decode_chain_event( return Ok(ChainEvent::CandidateChanged(Box::new(create_candidate_event( decoded.0.commitments_hash, decoded.0.descriptor, + decoded.2 .0, SubxtCandidateEventType::TimedOut, )))) } @@ -190,9 +195,10 @@ fn decode_to_specific_event( fn create_candidate_event( commitments_hash: ::Hash, candidate_descriptor: CandidateDescriptor<::Hash>, + core_idx: u32, event_type: SubxtCandidateEventType, ) -> SubxtCandidateEvent { let candidate_hash = BlakeTwo256::hash_of(&(&candidate_descriptor, commitments_hash)); let parachain_id = candidate_descriptor.para_id.0; - SubxtCandidateEvent { event_type, candidate_descriptor, parachain_id, candidate_hash } + SubxtCandidateEvent { event_type, candidate_descriptor, parachain_id, candidate_hash, core_idx } } diff --git a/essentials/src/collector/candidate_record.rs b/essentials/src/collector/candidate_record.rs index e82287a6..58bdd07d 100644 --- a/essentials/src/collector/candidate_record.rs +++ b/essentials/src/collector/candidate_record.rs @@ -35,7 +35,7 @@ pub struct CandidateInclusionRecord { /// Relay parent block number when a candidate was timed out pub timedout: Option, /// Observed core index - pub core_idx: Option, + pub core_idx: u32, /// Stated relay parent pub relay_parent: T, /// Stated relay parent number @@ -109,6 +109,11 @@ pub struct CandidateRecord { } impl CandidateRecord { + /// Returns if a candidate has been included + pub fn is_included(&self) -> bool { + self.candidate_inclusion.included.is_some() + } + /// Returns if a candidate has been disputed #[allow(dead_code)] pub fn is_disputed(&self) -> bool { @@ -144,4 +149,8 @@ impl CandidateRecord { pub fn parachain_id(&self) -> u32 { self.candidate_inclusion.parachain_id } + + pub fn core_idx(&self) -> u32 { + self.candidate_inclusion.core_idx + } } diff --git a/essentials/src/collector/mod.rs b/essentials/src/collector/mod.rs index 7d37f981..e678cc25 100644 --- a/essentials/src/collector/mod.rs +++ b/essentials/src/collector/mod.rs @@ -799,7 +799,7 @@ impl Collector { relay_parent_number: relay_parent.number, parachain_id: change_event.parachain_id, backed: relay_block_number, - core_idx: None, + core_idx: change_event.core_idx, timedout: None, included: None, }; diff --git a/parachain-tracer/src/parachain_block_info.rs b/parachain-tracer/src/parachain_block_info.rs index 9168db45..12537b64 100644 --- a/parachain-tracer/src/parachain_block_info.rs +++ b/parachain-tracer/src/parachain_block_info.rs @@ -22,12 +22,8 @@ use subxt::config::{substrate::BlakeTwo256, Hasher}; /// This is used for displaying CLI updates and also goes to Storage. #[derive(Encode, Decode, Debug, Default)] pub struct ParachainBlockInfo { - /// The candidate information as observed during backing - pub candidate: Option>, /// Candidate hash - pub candidate_hash: Option, - /// The current state. - state: ParachainBlockState, + pub candidate_hash: H256, /// The number of signed bitfields. pub bitfield_count: u32, /// The maximum expected number of availability bits that can be set. Corresponds to `max_validators`. @@ -35,26 +31,21 @@ pub struct ParachainBlockInfo { /// The current number of observed availability bits set to 1. pub current_availability_bits: u32, /// Parachain availability core assignment information. - pub assigned_core: Option, + pub assigned_core: u32, /// Core occupation status. pub core_occupied: bool, + /// The current state. + pub state: ParachainBlockState, } impl ParachainBlockInfo { - pub fn maybe_reset(&mut self) { - if self.is_included() { - self.state = ParachainBlockState::Idle; - self.candidate = None; - self.candidate_hash = None; - } + pub fn new(candidate_hash: H256, assigned_core: u32, bitfield_count: u32) -> Self { + Self { candidate_hash, assigned_core, bitfield_count, ..Default::default() } } - pub fn set_idle(&mut self) { - self.state = ParachainBlockState::Idle - } - - pub fn set_backed(&mut self) { - self.state = ParachainBlockState::Backed + pub fn candidate_hash(candidate: &BackedCandidate) -> H256 { + let commitments_hash = BlakeTwo256::hash_of(&candidate.candidate.commitments); + BlakeTwo256::hash_of(&(&candidate.candidate.descriptor, commitments_hash)) } pub fn set_pending(&mut self) { @@ -65,45 +56,24 @@ impl ParachainBlockInfo { self.state = ParachainBlockState::Included } - pub fn set_candidate(&mut self, candidate: BackedCandidate) { - let commitments_hash = BlakeTwo256::hash_of(&candidate.candidate.commitments); - let candidate_hash = BlakeTwo256::hash_of(&(&candidate.candidate.descriptor, commitments_hash)); - self.candidate_hash = Some(candidate_hash); - self.candidate = Some(candidate); - } - - pub fn is_idle(&self) -> bool { - self.state == ParachainBlockState::Idle - } - pub fn is_backed(&self) -> bool { self.state == ParachainBlockState::Backed } - pub fn is_pending(&self) -> bool { - self.state == ParachainBlockState::PendingAvailability - } - pub fn is_included(&self) -> bool { self.state == ParachainBlockState::Included } - pub fn is_data_available(&self) -> bool { - self.current_availability_bits > (self.max_availability_bits / 3) * 2 - } - pub fn is_bitfield_propagation_slow(&self) -> bool { - self.max_availability_bits > 0 && !self.is_idle() && self.bitfield_count <= (self.max_availability_bits / 3) * 2 + self.max_availability_bits > 0 && self.bitfield_count <= (self.max_availability_bits / 3) * 2 } } /// The state of parachain block. #[derive(Encode, Decode, Debug, Default, Clone, PartialEq, Eq)] -enum ParachainBlockState { - // Parachain block pipeline is idle. - #[default] - Idle, +pub enum ParachainBlockState { // A candidate is currently backed. + #[default] Backed, // A candidate is pending inclusion. PendingAvailability, @@ -115,60 +85,15 @@ enum ParachainBlockState { mod tests { use crate::test_utils::create_para_block_info; - #[test] - fn test_does_not_reset_state_if_not_included() { - let mut info = create_para_block_info(); - info.set_backed(); - - assert!(info.is_backed()); - assert!(info.candidate.is_some()); - assert!(info.candidate_hash.is_some()); - - info.maybe_reset(); - - assert!(info.is_backed()); - assert!(info.candidate.is_some()); - assert!(info.candidate_hash.is_some()); - } - - #[test] - fn test_resets_state_if_included() { - let mut info = create_para_block_info(); - info.set_included(); - - assert!(info.is_included()); - assert!(info.candidate.is_some()); - assert!(info.candidate_hash.is_some()); - - info.maybe_reset(); - - assert!(info.is_idle()); - assert!(info.candidate.is_none()); - assert!(info.candidate_hash.is_none()); - } - - #[test] - fn test_is_data_available() { - let mut info = create_para_block_info(); - assert!(!info.is_data_available()); - - info.max_availability_bits = 200; - info.current_availability_bits = 134; - assert!(info.is_data_available()); - } - #[test] fn test_is_bitfield_propagation_slow() { - let mut info = create_para_block_info(); + let mut info = create_para_block_info(100); assert!(!info.is_bitfield_propagation_slow()); info.max_availability_bits = 200; - assert!(!info.is_bitfield_propagation_slow()); - - info.bitfield_count = 100; - assert!(!info.is_bitfield_propagation_slow()); + assert!(info.is_bitfield_propagation_slow()); - info.set_backed(); + info.bitfield_count = 120; assert!(info.is_bitfield_propagation_slow()); } } diff --git a/parachain-tracer/src/test_utils.rs b/parachain-tracer/src/test_utils.rs index edf45af6..82c01dba 100644 --- a/parachain-tracer/src/test_utils.rs +++ b/parachain-tracer/src/test_utils.rs @@ -143,6 +143,7 @@ pub fn create_hrmp_channels() -> BTreeMap { pub fn create_candidate_record( para_id: u32, backed: u32, + included: Option, relay_parent: H256, relay_parent_number: u32, ) -> CandidateRecord { @@ -150,9 +151,9 @@ pub fn create_candidate_record( candidate_inclusion: CandidateInclusionRecord { parachain_id: para_id, backed, - included: None, + included, timedout: None, - core_idx: None, + core_idx: 0, relay_parent, relay_parent_number, }, @@ -161,35 +162,10 @@ pub fn create_candidate_record( } } -pub fn create_para_block_info() -> ParachainBlockInfo { - let mut info = ParachainBlockInfo::default(); - info.set_candidate(BackedCandidate { - candidate: CommittedCandidateReceipt { - descriptor: CandidateDescriptor { - para_id: Id(100), - relay_parent: Default::default(), - collator: collator_app::Public(Public([0; 32])), - persisted_validation_data_hash: Default::default(), - pov_hash: Default::default(), - erasure_root: Default::default(), - signature: collator_app::Signature(Signature([0; 64])), - para_head: Default::default(), - validation_code_hash: ValidationCodeHash(Default::default()), - }, - commitments: CandidateCommitments { - upward_messages: BoundedVec(Default::default()), - horizontal_messages: BoundedVec(Default::default()), - new_validation_code: Default::default(), - head_data: HeadData(Default::default()), - processed_downward_messages: Default::default(), - hrmp_watermark: Default::default(), - }, - }, - validity_votes: vec![], - validator_indices: DecodedBits::from_iter([true]), - }); - - info +pub fn create_para_block_info(para_id: u32) -> ParachainBlockInfo { + let candidate = create_backed_candidate(para_id); + let hash = ParachainBlockInfo::candidate_hash(&candidate); + ParachainBlockInfo::new(hash, 0, 0) } pub async fn storage_write( diff --git a/parachain-tracer/src/tracker.rs b/parachain-tracer/src/tracker.rs index c5109873..8fc2e824 100644 --- a/parachain-tracer/src/tracker.rs +++ b/parachain-tracer/src/tracker.rs @@ -17,12 +17,12 @@ //! This module tracks parachain blocks. use crate::{ message_queues_tracker::MessageQueuesTracker, - parachain_block_info::ParachainBlockInfo, + parachain_block_info::{ParachainBlockInfo, ParachainBlockState}, prometheus::PrometheusMetrics, stats::Stats, tracker_storage::TrackerStorage, types::{Block, BlockWithoutHash, DisputesTracker, ForkTracker, ParachainConsensusEvent, ParachainProgressUpdate}, - utils::{backed_candidate, extract_availability_bits_count, extract_inherent_fields, time_diff}, + utils::{backed_candidates_by_para_id, extract_availability_bits_count, extract_inherent_fields, time_diff}, }; use log::{error, info}; use polkadot_introspector_essentials::{ @@ -30,7 +30,7 @@ use polkadot_introspector_essentials::{ metadata::polkadot_primitives::{AvailabilityBitfield, BackedCandidate, DisputeStatementSet, ValidatorIndex}, types::{BlockNumber, CoreOccupied, OnDemandOrder, Timestamp, H256}, }; -use std::{default::Default, time::Duration}; +use std::{collections::HashMap, default::Default, time::Duration}; /// A subxt based parachain candidate tracker. pub struct SubxtTracker { @@ -40,7 +40,10 @@ pub struct SubxtTracker { /// A new session index. new_session: Option, /// Information about current parachain block we track. - current_candidate: ParachainBlockInfo, + /// `None` is for skipped slots. + candidates: HashMap>>, + /// Core assignments for current para_id + cores: HashMap>, /// Current relay chain block. current_relay_block: Option, /// Previous relay chain block. @@ -80,7 +83,8 @@ impl SubxtTracker { pub fn new(para_id: u32) -> Self { Self { para_id, - current_candidate: Default::default(), + candidates: HashMap::new(), + cores: HashMap::new(), new_session: None, current_relay_block: None, previous_relay_block: None, @@ -116,17 +120,15 @@ impl SubxtTracker { self.set_relay_block(block_hash, block_number, storage).await?; self.set_forks(block_hash, block_number); - - self.set_current_candidate(backed_candidates, bitfields.len(), block_number); - self.set_core_assignment(block_hash, storage).await?; + self.set_cores(block_hash, storage).await; + self.set_included_candidates(storage).await; + self.set_backed_candidates(backed_candidates, bitfields.len(), block_number, storage) + .await; + self.set_core_assignment(block_hash, storage).await; self.set_disputes(&disputes[..], storage).await; - self.set_hrmp_channels(block_hash, storage).await?; self.set_on_demand_order(block_hash, storage).await; - - if self.has_backed_candidate() { - self.set_availability(block_hash, bitfields, storage).await?; - } + self.set_pending_availability(block_hash, bitfields, storage).await?; } else { error!("Failed to get inherent data for {:?}", block_hash); } @@ -151,7 +153,13 @@ impl SubxtTracker { para_id: self.para_id, is_fork: self.is_fork(), finality_lag: self.finality_lag, - core_occupied: self.current_candidate.core_occupied, + core_occupied: self + .candidates + .iter() + .map(|(core_idx, v)| { + (*core_idx, v.last().map_or(false, |v| v.as_ref().map_or(false, |v| v.core_occupied))) + }) + .collect(), ..Default::default() }; @@ -174,14 +182,18 @@ impl SubxtTracker { /// Resets state pub fn maybe_reset_state(&mut self) { self.first_run = false; - if self.current_candidate.is_backed() { - self.on_demand_order_at = None; + for &core in self.cores.keys() { + if self.is_current_candidate_backed(core) { + self.on_demand_order_at = None; + } } self.new_session = None; self.on_demand_order = None; self.is_on_demand_scheduled_in_current_block = false; self.disputes.clear(); - self.current_candidate.maybe_reset(); + self.candidates + .values_mut() + .for_each(|v| v.retain(|v| v.is_some() && v.as_ref().map_or(false, |v| !v.is_included()))); } async fn set_hrmp_channels(&mut self, block_hash: H256, storage: &TrackerStorage) -> color_eyre::Result<()> { @@ -232,38 +244,70 @@ impl SubxtTracker { }); } - fn set_current_candidate( + async fn set_cores(&mut self, block_hash: H256, storage: &TrackerStorage) { + let assignments = storage.core_assignments(block_hash).await.expect("saved in the collector"); + self.cores = assignments.into_iter().filter(|(_, ids)| ids.contains(&self.para_id)).collect(); + } + + async fn set_included_candidates(&mut self, storage: &TrackerStorage) { + let mut last_included = None; + for candidate in self.all_candidates_mut() { + let Some(stored_candidate) = storage.candidate(candidate.candidate_hash).await else { continue }; + if stored_candidate.is_included() { + candidate.set_included(); + last_included = Some(candidate.candidate_hash); + } + } + if last_included.is_some() { + self.relay_forks.last_mut().expect("must have relay fork").included_candidate = last_included; + self.previous_included_at = self.last_included_at; + self.last_included_at = self.current_relay_block.map(|v| v.into()); + } + } + + async fn set_backed_candidates( &mut self, backed_candidates: Vec>, bitfields_count: usize, block_number: BlockNumber, + storage: &TrackerStorage, ) { - self.current_candidate.bitfield_count = bitfields_count as u32; - if let Some(candidate) = backed_candidate(backed_candidates, self.para_id) { - self.current_candidate.set_backed(); - self.current_candidate.set_candidate(candidate); + let candidate_hashes = backed_candidates_by_para_id(backed_candidates, self.para_id) + .map(|v| ParachainBlockInfo::candidate_hash(&v)); + let mut used_cores = vec![]; + for candidate_hash in candidate_hashes { + let Some(core) = self.candidate_core(candidate_hash, storage).await else { continue }; + let candidate = ParachainBlockInfo::new(candidate_hash, core, bitfields_count as u32); + if let Some(current_fork) = self.relay_forks.last_mut() { + current_fork.backed_candidate = Some(candidate.candidate_hash); + } self.last_backed_at_block_number = Some(block_number); + self.candidates.entry(core).or_default().push(Some(candidate)); - if let Some(current_fork) = self.relay_forks.last_mut() { - current_fork.backed_candidate = self.current_candidate.candidate_hash; + used_cores.push(core) + } + + let idle_cores = self.cores.keys().filter(|v| !used_cores.contains(v)).cloned(); + for core in idle_cores { + if !self.has_backed_candidate(core) { + self.candidates.entry(core).or_default().push(None); } - } else if !self.has_backed_candidate() { - self.current_candidate.set_idle(); } } - async fn set_core_assignment(&mut self, block_hash: H256, storage: &TrackerStorage) -> color_eyre::Result<()> { - let assignments = storage.core_assignments(block_hash).await.expect("saved in the collector"); - if let Some((&core, scheduled_ids)) = assignments.iter().find(|(_, ids)| ids.contains(&self.para_id)) { - self.current_candidate.assigned_core = Some(core); - self.current_candidate.core_occupied = matches!( - storage.occupied_cores(block_hash).await.expect("saved in the collector")[core as usize], - CoreOccupied::Paras - ); + async fn set_core_assignment(&mut self, block_hash: H256, storage: &TrackerStorage) { + for (core, scheduled_ids) in self.cores.clone() { self.is_on_demand_scheduled_in_current_block = self.on_demand_order.is_some() && scheduled_ids[0] == self.para_id; + + let Some(candidate) = self.current_candidate_mut(core) else { continue }; + if candidate.is_backed() { + candidate.core_occupied = matches!( + storage.occupied_cores(block_hash).await.expect("saved in the collector")[core as usize], + CoreOccupied::Paras + ); + } } - Ok(()) } async fn set_disputes(&mut self, disputes: &[DisputeStatementSet], storage: &TrackerStorage) { @@ -293,28 +337,21 @@ impl SubxtTracker { } } - async fn set_availability( + async fn set_pending_availability( &mut self, block_hash: H256, bitfields: Vec, storage: &TrackerStorage, ) -> color_eyre::Result<()> { - if self.current_candidate.is_backed() { - // We only process availability if our parachain is assigned to an availability core. - if let Some(core) = self.current_candidate.assigned_core { - self.current_candidate.current_availability_bits = extract_availability_bits_count(bitfields, core); - self.current_candidate.max_availability_bits = - self.validators_indices(block_hash, storage).await?.len() as u32; - - if self.current_candidate.is_data_available() { - self.current_candidate.set_included(); - self.relay_forks.last_mut().expect("must have relay fork").included_candidate = - self.current_candidate.candidate_hash; - self.previous_included_at = self.last_included_at; - self.last_included_at = self.current_relay_block.map(|v| v.into()); - } else if !self.is_just_backed() { - self.current_candidate.set_pending(); - } + let core_ids: Vec = self.cores.keys().cloned().collect(); + for core in core_ids { + if self.is_current_candidate_backed(core) && !self.is_just_backed() { + let max_bits = self.validators_indices(block_hash, storage).await?.len() as u32; + let candidate = self.current_candidate_mut(core).expect("Checked above"); + + candidate.current_availability_bits = extract_availability_bits_count(&bitfields, core); + candidate.max_availability_bits = max_bits; + candidate.set_pending(); } } @@ -369,16 +406,20 @@ impl SubxtTracker { if self.is_on_demand_scheduled_in_current_block { metrics.handle_on_demand_delay(delay, self.para_id, "scheduled"); } - if self.current_candidate.is_backed() { - metrics.handle_on_demand_delay(delay, self.para_id, "backed"); + for &core in self.cores.keys() { + if self.is_current_candidate_backed(core) { + metrics.handle_on_demand_delay(delay, self.para_id, "backed"); + } } } if let Some(delay_sec) = self.on_demand_delay_sec() { if self.is_on_demand_scheduled_in_current_block { metrics.handle_on_demand_delay_sec(delay_sec, self.para_id, "scheduled"); } - if self.current_candidate.is_backed() { - metrics.handle_on_demand_delay_sec(delay_sec, self.para_id, "backed"); + for &core in self.cores.keys() { + if self.is_current_candidate_backed(core) { + metrics.handle_on_demand_delay_sec(delay_sec, self.para_id, "backed"); + } } } } @@ -390,8 +431,10 @@ impl SubxtTracker { } fn notify_core_assignment(&self, progress: &mut ParachainProgressUpdate) { - if let Some(assigned_core) = self.current_candidate.assigned_core { - progress.events.push(ParachainConsensusEvent::CoreAssigned(assigned_core)); + for &core in self.cores.keys() { + if self.current_candidate(core).is_some() { + progress.events.push(ParachainConsensusEvent::CoreAssigned(core)); + } } } @@ -401,19 +444,17 @@ impl SubxtTracker { stats: &mut impl Stats, metrics: &impl PrometheusMetrics, ) { - if self.current_candidate.is_bitfield_propagation_slow() { - progress.events.push(ParachainConsensusEvent::SlowBitfieldPropagation( - self.current_candidate.bitfield_count, - self.current_candidate.max_availability_bits, - )) + for &core in self.cores.keys() { + let Some(candidate) = self.current_candidate(core) else { continue }; + if candidate.is_bitfield_propagation_slow() { + progress.events.push(ParachainConsensusEvent::SlowBitfieldPropagation( + candidate.bitfield_count, + candidate.max_availability_bits, + )) + } + stats.on_bitfields(candidate.bitfield_count, candidate.is_bitfield_propagation_slow()); + metrics.on_bitfields(candidate.bitfield_count, candidate.is_bitfield_propagation_slow(), self.para_id); } - stats - .on_bitfields(self.current_candidate.bitfield_count, self.current_candidate.is_bitfield_propagation_slow()); - metrics.on_bitfields( - self.current_candidate.bitfield_count, - self.current_candidate.is_bitfield_propagation_slow(), - self.para_id, - ); } async fn notify_candidate_state( @@ -423,34 +464,38 @@ impl SubxtTracker { metrics: &impl PrometheusMetrics, storage: &TrackerStorage, ) { - if self.current_candidate.is_idle() { - progress.events.push(ParachainConsensusEvent::SkippedSlot); - stats.on_skipped_slot(progress); - metrics.on_skipped_slot(progress); - } - - if self.current_candidate.is_backed() { - if let Some(candidate_hash) = self.current_candidate.candidate_hash { - progress.events.push(ParachainConsensusEvent::Backed(candidate_hash)); - stats.on_backed(); - metrics.on_backed(self.para_id); + for candidate in self.all_candidates_and_skipped_slots() { + // Process skipped slots first + if candidate.is_none() { + progress.events.push(ParachainConsensusEvent::SkippedSlot); + stats.on_skipped_slot(progress); + metrics.on_skipped_slot(progress); } - } - - if self.current_candidate.is_pending() || self.current_candidate.is_included() { - progress.bitfield_health.max_bitfield_count = self.current_candidate.max_availability_bits; - progress.bitfield_health.available_count = self.current_candidate.current_availability_bits; - progress.bitfield_health.bitfield_count = self.current_candidate.bitfield_count; - if self.current_candidate.is_data_available() { - if let Some(candidate_hash) = self.current_candidate.candidate_hash { + let Some(candidate) = candidate else { continue }; + match candidate.state { + ParachainBlockState::Backed => { + progress.events.push(ParachainConsensusEvent::Backed(candidate.candidate_hash)); + stats.on_backed(); + metrics.on_backed(self.para_id); + }, + ParachainBlockState::PendingAvailability => + if self.is_slow_availability(candidate.assigned_core) { + progress.events.push(ParachainConsensusEvent::SlowAvailability( + candidate.current_availability_bits, + candidate.max_availability_bits, + )); + stats.on_slow_availability(); + metrics.on_slow_availability(self.para_id); + }, + ParachainBlockState::Included => { progress.events.push(ParachainConsensusEvent::Included( - candidate_hash, - self.current_candidate.current_availability_bits, - self.current_candidate.max_availability_bits, + candidate.candidate_hash, + candidate.current_availability_bits, + candidate.max_availability_bits, )); - let backed_in = self.candidate_backed_in(candidate_hash, storage).await; + let backed_in = self.candidate_backed_in(candidate.candidate_hash, storage).await; let relay_block = self.current_relay_block.expect("Checked by caller; qed"); stats.on_included(relay_block.num, self.previous_included_at.map(|v| v.num), backed_in); metrics.on_included( @@ -460,14 +505,7 @@ impl SubxtTracker { time_diff(Some(relay_block.ts), self.previous_included_at.map(|v| v.ts)), self.para_id, ); - } - } else if self.is_slow_availability() { - progress.events.push(ParachainConsensusEvent::SlowAvailability( - self.current_candidate.current_availability_bits, - self.current_candidate.max_availability_bits, - )); - stats.on_slow_availability(); - metrics.on_slow_availability(self.para_id); + }, } } } @@ -478,6 +516,25 @@ impl SubxtTracker { Duration::from_millis(cur_ts).saturating_sub(Duration::from_millis(base_ts)) } + fn current_candidate(&self, core: u32) -> Option<&ParachainBlockInfo> { + self.candidates.get(&core).and_then(|v| v.last()).and_then(|v| v.as_ref()) + } + + fn current_candidate_mut(&mut self, core: u32) -> Option<&mut ParachainBlockInfo> { + self.candidates + .get_mut(&core) + .and_then(|v| v.last_mut()) + .and_then(|v| v.as_mut()) + } + + fn all_candidates_and_skipped_slots(&self) -> impl Iterator> { + self.candidates.values().flatten().map(|v| v.as_ref()) + } + + fn all_candidates_mut(&mut self) -> impl Iterator { + self.candidates.values_mut().flatten().filter_map(|v| v.as_mut()) + } + fn is_fork(&self) -> bool { match (self.current_relay_block, self.previous_relay_block) { (Some(a), Some(b)) => a.num == b.num, @@ -497,25 +554,30 @@ impl SubxtTracker { time_diff(self.current_relay_block.map(|v| v.ts), self.on_demand_order_at.map(|v| v.ts)) } - fn has_backed_candidate(&self) -> bool { - self.current_candidate.candidate.is_some() || + fn has_backed_candidate(&self, core: u32) -> bool { + self.candidates + .get(&core) + .map_or(false, |v| v.iter().any(|candidate| candidate.is_some())) || self.relay_forks .iter() .any(|fork| fork.backed_candidate.is_some() || fork.included_candidate.is_some()) } + fn is_current_candidate_backed(&self, core: u32) -> bool { + self.current_candidate(core).map_or(false, |v| v.is_backed()) + } + fn is_just_backed(&self) -> bool { self.last_backed_at_block_number.is_some() && self.last_backed_at_block_number == self.current_relay_block.map(|v| v.num) } - fn is_slow_availability(&self) -> bool { - self.current_candidate.core_occupied && - self.last_backed_at_block_number != self.current_relay_block.map(|v| v.num) + fn is_slow_availability(&self, core: u32) -> bool { + self.current_candidate(core).map_or(false, |v| v.core_occupied) && !self.is_just_backed() } async fn validators_indices( - &mut self, + &self, block_hash: H256, storage: &TrackerStorage, ) -> color_eyre::Result> { @@ -535,6 +597,10 @@ impl SubxtTracker { .saturating_sub(v.candidate_inclusion.relay_parent_number) }) } + + async fn candidate_core(&self, candidate_hash: H256, storage: &TrackerStorage) -> Option { + storage.candidate(candidate_hash).await.map(|v| v.core_idx()) + } } #[cfg(test)] @@ -554,16 +620,21 @@ mod test_inject_new_session { #[cfg(test)] mod test_maybe_reset_state { + use crate::test_utils::create_para_block_info; + use super::*; #[tokio::test] async fn test_resets_state_if_not_backed() { let mut tracker = SubxtTracker::new(100); - tracker.current_candidate.set_included(); + let mut candidate = create_para_block_info(100); + candidate.set_included(); + tracker.candidates.entry(0).or_default().push(Some(candidate)); tracker.new_session = Some(42); tracker.on_demand_order = Some(OnDemandOrder::default()); tracker.is_on_demand_scheduled_in_current_block = true; tracker.disputes = vec![DisputesTracker::default()]; + tracker.cores.entry(0).or_default().push(100); tracker.maybe_reset_state(); @@ -571,19 +642,24 @@ mod test_maybe_reset_state { assert!(tracker.on_demand_order.is_none()); assert!(!tracker.is_on_demand_scheduled_in_current_block); assert!(tracker.disputes.is_empty()); - assert!(tracker.current_candidate.is_idle()); + for (_, candidates) in tracker.candidates { + assert!(candidates.is_empty()); + } } #[tokio::test] async fn test_resets_state_if_backed() { let mut tracker = SubxtTracker::new(100); - tracker.current_candidate.set_backed(); + let candidate = create_para_block_info(100); + tracker.candidates.entry(0).or_default().push(Some(candidate)); tracker.new_session = Some(42); tracker.on_demand_order = Some(OnDemandOrder::default()); tracker.on_demand_order_at = Some(BlockWithoutHash::default()); tracker.is_on_demand_scheduled_in_current_block = true; tracker.disputes = vec![DisputesTracker::default()]; + tracker.cores.entry(0).or_default().push(100); + assert!(tracker.is_current_candidate_backed(0)); tracker.maybe_reset_state(); assert!(tracker.on_demand_order_at.is_none()); @@ -596,11 +672,12 @@ mod test_maybe_reset_state { #[cfg(test)] mod test_inject_block { - use std::collections::BTreeMap; - use super::*; - use crate::test_utils::{create_inherent_data, create_storage, storage_write}; + use crate::test_utils::{ + create_candidate_record, create_inherent_data, create_para_block_info, create_storage, storage_write, + }; use polkadot_introspector_essentials::collector::CollectorPrefixType; + use std::collections::BTreeMap; #[tokio::test] async fn test_changes_nothing_if_there_is_no_inherent_data() { @@ -611,7 +688,7 @@ mod test_inject_block { tracker.inject_block(hash, 0, &tracker_storage).await.unwrap(); assert!(tracker.new_session.is_none()); - assert!(tracker.current_candidate.candidate.is_none()); + assert!(tracker.candidates.is_empty()); assert!(tracker.current_relay_block.is_none()); assert!(tracker.previous_relay_block.is_none()); assert!(tracker.last_non_fork_relay_block_ts.is_none()); @@ -636,7 +713,15 @@ mod test_inject_block { let tracker_storage = TrackerStorage::new(100, storage.clone()); // Inject a block - storage_write(CollectorPrefixType::CoreAssignments, first_hash, BTreeMap::>::default(), &storage) + storage_write( + CollectorPrefixType::CoreAssignments, + first_hash, + BTreeMap::>::from([(0, vec![100])]), + &storage, + ) + .await + .unwrap(); + storage_write(CollectorPrefixType::BackingGroups, first_hash, Vec::>::default(), &storage) .await .unwrap(); storage_write(CollectorPrefixType::InherentData, first_hash, create_inherent_data(100), &storage) @@ -657,11 +742,14 @@ mod test_inject_block { storage_write( CollectorPrefixType::CoreAssignments, second_hash, - BTreeMap::>::default(), + BTreeMap::>::from([(0, vec![100])]), &storage, ) .await .unwrap(); + storage_write(CollectorPrefixType::BackingGroups, second_hash, Vec::>::default(), &storage) + .await + .unwrap(); storage_write(CollectorPrefixType::InherentData, second_hash, create_inherent_data(100), &storage) .await .unwrap(); @@ -680,6 +768,101 @@ mod test_inject_block { assert_eq!(tracker.last_non_fork_relay_block_ts, Some(1)); assert_eq!(tracker.finality_lag, Some(2)); } + + #[tokio::test] + async fn test_sets_backed_candidates() { + let storage = create_storage(); + let tracker_storage = TrackerStorage::new(100, storage.clone()); + let mut tracker = SubxtTracker::new(100); + + let block_hash = H256::random(); + let inherent_data = create_inherent_data(100); + let backed_candidate = inherent_data.backed_candidates.first().unwrap(); + let candidate_hash = ParachainBlockInfo::candidate_hash(backed_candidate); + + // Inject a block + storage_write( + CollectorPrefixType::CoreAssignments, + block_hash, + BTreeMap::>::from([(0, vec![100])]), + &storage, + ) + .await + .unwrap(); + storage_write(CollectorPrefixType::OccupiedCores, block_hash, vec![CoreOccupied::Paras], &storage) + .await + .unwrap(); + storage_write(CollectorPrefixType::BackingGroups, block_hash, Vec::>::default(), &storage) + .await + .unwrap(); + storage_write(CollectorPrefixType::InherentData, block_hash, inherent_data, &storage) + .await + .unwrap(); + storage_write(CollectorPrefixType::Timestamp, block_hash, 1_u64, &storage) + .await + .unwrap(); + storage_write( + CollectorPrefixType::Candidate(100), + candidate_hash, + create_candidate_record(100, 42, None, H256::random(), 40), + &storage, + ) + .await + .unwrap(); + + tracker.inject_block(block_hash, 42, &tracker_storage).await.unwrap(); + + let candidate = tracker.candidates.get(&0).unwrap().first().unwrap().as_ref().unwrap(); + assert!(candidate.candidate_hash == candidate_hash); + assert!(candidate.is_backed()); + } + + #[tokio::test] + async fn test_sets_included_candidates() { + let storage = create_storage(); + let tracker_storage = TrackerStorage::new(100, storage.clone()); + let mut tracker = SubxtTracker::new(100); + + let block_hash = H256::random(); + let candidate = create_para_block_info(100); + let candidate_hash = candidate.candidate_hash; + tracker.candidates.entry(0).or_default().push(Some(candidate)); + + // Inject a block + storage_write( + CollectorPrefixType::CoreAssignments, + block_hash, + BTreeMap::>::from([(0, vec![100])]), + &storage, + ) + .await + .unwrap(); + storage_write(CollectorPrefixType::OccupiedCores, block_hash, vec![CoreOccupied::Free], &storage) + .await + .unwrap(); + storage_write(CollectorPrefixType::BackingGroups, block_hash, Vec::>::default(), &storage) + .await + .unwrap(); + storage_write(CollectorPrefixType::InherentData, block_hash, create_inherent_data(100), &storage) + .await + .unwrap(); + storage_write(CollectorPrefixType::Timestamp, block_hash, 1_u64, &storage) + .await + .unwrap(); + storage_write( + CollectorPrefixType::Candidate(100), + candidate_hash, + create_candidate_record(100, 41, Some(42), H256::random(), 40), + &storage, + ) + .await + .unwrap(); + + tracker.inject_block(block_hash, 42, &tracker_storage).await.unwrap(); + + let candidate = tracker.candidates.get(&0).unwrap().first().unwrap().as_ref().unwrap(); + assert!(candidate.is_included()); + } } #[cfg(test)] @@ -688,7 +871,9 @@ mod test_progress { use crate::{ prometheus::{Metrics, MockPrometheusMetrics}, stats::{MockStats, ParachainStats}, - test_utils::{create_candidate_record, create_hrmp_channels, create_storage, storage_write}, + test_utils::{ + create_candidate_record, create_hrmp_channels, create_para_block_info, create_storage, storage_write, + }, }; use mockall::predicate::eq; use polkadot_introspector_essentials::collector::CollectorPrefixType; @@ -723,7 +908,9 @@ mod test_progress { assert_eq!(progress.para_id, 100); assert!(!progress.is_fork); assert!(progress.finality_lag.is_none()); - assert!(!progress.core_occupied); + for (_, core_occupied) in progress.core_occupied { + assert!(!core_occupied); + } } #[tokio::test] @@ -758,10 +945,12 @@ mod test_progress { let tracker_storage = TrackerStorage::new(100, create_storage()); let mut stats = ParachainStats::default(); let metrics = Metrics::default(); + let mut candidate = create_para_block_info(100); tracker.current_relay_block = Some(Block { num: 42, ts: 1694095332000, hash: H256::random() }); - tracker.current_candidate.assigned_core = Some(0); - tracker.current_candidate.core_occupied = true; + candidate.core_occupied = true; + tracker.candidates.entry(0).or_default().push(Some(candidate)); + tracker.cores.entry(0).or_default().push(100); let progress = tracker.progress(&mut stats, &metrics, &tracker_storage).await.unwrap(); assert!(progress @@ -774,6 +963,7 @@ mod test_progress { async fn test_includes_slow_propogation() { let mut tracker = SubxtTracker::new(100); let tracker_storage = TrackerStorage::new(100, create_storage()); + let mut candidate = create_para_block_info(100); let mut mock_stats = MockStats::default(); mock_stats.expect_on_backed().returning(|| ()); mock_stats.expect_on_block().returning(|_| ()); @@ -786,7 +976,9 @@ mod test_progress { // Bitfields propogation isn't slow tracker.current_relay_block = Some(Block { num: 42, ts: 1694095332000, hash: H256::random() }); - tracker.current_candidate.bitfield_count = 120; + candidate.bitfield_count = 120; + tracker.candidates.entry(0).or_default().push(Some(candidate)); + tracker.cores.entry(0).or_default().push(100); mock_stats.expect_on_bitfields().with(eq(120), eq(false)).returning(|_, _| ()); mock_metrics .expect_on_bitfields() @@ -803,8 +995,8 @@ mod test_progress { .any(|e| matches!(e, ParachainConsensusEvent::SlowBitfieldPropagation(_, _)))); // Bitfields propogation is slow - tracker.current_candidate.set_backed(); - tracker.current_candidate.max_availability_bits = 200; + let candidate = tracker.candidates.entry(0).or_default().last_mut().unwrap().as_mut().unwrap(); + candidate.max_availability_bits = 200; mock_stats.expect_on_bitfields().with(eq(120), eq(true)).returning(|_, _| ()); mock_metrics .expect_on_bitfields() @@ -974,6 +1166,7 @@ mod test_progress { mock_metrics.expect_init_disputes().returning(|_| ()); mock_metrics.expect_on_bitfields().returning(|_, _, _| ()); mock_metrics.expect_on_skipped_slot().returning(|_| ()); + mock_metrics.expect_on_backed().returning(|_| ()); mock_metrics.expect_on_block().returning(|_, _| ()); // With on-demand order @@ -1005,7 +1198,9 @@ mod test_progress { let _progress = tracker.progress(&mut stats, &mock_metrics, &tracker_storage).await.unwrap(); tracker.is_on_demand_scheduled_in_current_block = false; // If backed - tracker.current_candidate.set_backed(); + let candidate = create_para_block_info(100); + tracker.candidates.entry(0).or_default().push(Some(candidate)); + tracker.cores.entry(0).or_default().push(100); mock_metrics .expect_handle_on_demand_delay() .with(eq(1), eq(100), eq("backed")) @@ -1035,7 +1230,7 @@ mod test_progress { storage_write( CollectorPrefixType::Candidate(100), candidate_hash, - create_candidate_record(100, 41, H256::random(), 40), + create_candidate_record(100, 41, None, H256::random(), 40), &storage, ) .await @@ -1043,7 +1238,7 @@ mod test_progress { // When candidate is idle tracker.current_relay_block = Some(Block { num: 42, ts: 1694095332000, hash: H256::random() }); - tracker.current_candidate.set_idle(); + tracker.candidates.entry(0).or_default().push(None); mock_stats.expect_on_skipped_slot().once().returning(|_| ()); mock_metrics.expect_on_skipped_slot().once().returning(|_| ()); let progress = tracker @@ -1057,8 +1252,9 @@ mod test_progress { .any(|e| matches!(e, ParachainConsensusEvent::SkippedSlot))); // When candidate is backed - tracker.current_candidate.set_backed(); - tracker.current_candidate.candidate_hash = Some(candidate_hash); + let candidate = ParachainBlockInfo::new(candidate_hash, 0, 0); + tracker.candidates.clear(); + tracker.candidates.entry(0).or_default().push(Some(candidate)); mock_stats.expect_on_backed().once().returning(|| ()); mock_metrics.expect_on_backed().with(eq(100)).once().returning(|_| ()); let progress = tracker @@ -1070,39 +1266,25 @@ mod test_progress { // When candidate is pending // And data is available - tracker.previous_included_at = Some(BlockWithoutHash { num: 41, ts: 1694095326000 }); - tracker.current_candidate.set_pending(); - tracker.current_candidate.max_availability_bits = 200; - tracker.current_candidate.current_availability_bits = 140; - tracker.current_candidate.bitfield_count = 150; - mock_stats - .expect_on_included() - .with(eq(42), eq(Some(41)), eq(None)) - .once() - .returning(|_, _, _| ()); - mock_metrics - .expect_on_included() - .with(eq(42), eq(Some(41)), eq(None), eq(Some(Duration::from_secs(6))), eq(100)) - .once() - .returning(|_, _, _, _, _| ()); + let candidate = tracker.candidates.entry(0).or_default().last_mut().unwrap().as_mut().unwrap(); + candidate.set_pending(); + candidate.max_availability_bits = 200; + candidate.current_availability_bits = 140; + candidate.bitfield_count = 150; + // tracker.previous_included_at = Some(BlockWithoutHash { num: 41, ts: 1694095326000 }); let progress = tracker .progress(&mut mock_stats, &mock_metrics, &tracker_storage) .await .unwrap(); - assert_eq!(progress.bitfield_health.max_bitfield_count, 200); - assert_eq!(progress.bitfield_health.available_count, 140); - assert_eq!(progress.bitfield_health.bitfield_count, 150); - assert!(progress - .events - .iter() - .any(|e| matches!(e, ParachainConsensusEvent::Included(_, _, _)))); + assert!(progress.events.is_empty()); // And availability is slow - tracker.current_candidate.max_availability_bits = 200; - tracker.current_candidate.current_availability_bits = 120; - tracker.current_candidate.bitfield_count = 150; - tracker.current_candidate.core_occupied = true; + let candidate = tracker.candidates.entry(0).or_default().last_mut().unwrap().as_mut().unwrap(); + candidate.max_availability_bits = 200; + candidate.current_availability_bits = 120; + candidate.bitfield_count = 150; + candidate.core_occupied = true; tracker.last_backed_at_block_number = Some(41); mock_stats.expect_on_slow_availability().once().returning(|| ()); mock_metrics @@ -1115,21 +1297,18 @@ mod test_progress { .await .unwrap(); - assert_eq!(progress.bitfield_health.max_bitfield_count, 200); - assert_eq!(progress.bitfield_health.available_count, 120); - assert_eq!(progress.bitfield_health.bitfield_count, 150); assert!(progress .events .iter() .any(|e| matches!(e, ParachainConsensusEvent::SlowAvailability(_, _)))); - // When candidate is included (all checks are same as for pending) - // And data is available + // When candidate is included and data is available + let candidate = tracker.candidates.entry(0).or_default().last_mut().unwrap().as_mut().unwrap(); + candidate.set_included(); + candidate.max_availability_bits = 200; + candidate.current_availability_bits = 140; + candidate.bitfield_count = 150; tracker.previous_included_at = Some(BlockWithoutHash { num: 41, ts: 1694095326000 }); - tracker.current_candidate.set_included(); - tracker.current_candidate.max_availability_bits = 200; - tracker.current_candidate.current_availability_bits = 140; - tracker.current_candidate.bitfield_count = 150; mock_stats .expect_on_included() .with(eq(42), eq(Some(41)), eq(None)) @@ -1145,38 +1324,10 @@ mod test_progress { .await .unwrap(); - assert_eq!(progress.bitfield_health.max_bitfield_count, 200); - assert_eq!(progress.bitfield_health.available_count, 140); - assert_eq!(progress.bitfield_health.bitfield_count, 150); assert!(progress .events .iter() .any(|e| matches!(e, ParachainConsensusEvent::Included(_, _, _)))); - - // And availability is slow - tracker.current_candidate.max_availability_bits = 200; - tracker.current_candidate.current_availability_bits = 120; - tracker.current_candidate.bitfield_count = 150; - tracker.current_candidate.core_occupied = true; - tracker.last_backed_at_block_number = Some(41); - mock_stats.expect_on_slow_availability().once().returning(|| ()); - mock_metrics - .expect_on_slow_availability() - .with(eq(100)) - .once() - .returning(|_| ()); - let progress = tracker - .progress(&mut mock_stats, &mock_metrics, &tracker_storage) - .await - .unwrap(); - - assert_eq!(progress.bitfield_health.max_bitfield_count, 200); - assert_eq!(progress.bitfield_health.available_count, 120); - assert_eq!(progress.bitfield_health.bitfield_count, 150); - assert!(progress - .events - .iter() - .any(|e| matches!(e, ParachainConsensusEvent::SlowAvailability(_, _)))); } #[tokio::test] @@ -1202,3 +1353,144 @@ mod test_progress { .unwrap(); } } + +#[cfg(test)] +mod test_logic { + use crate::test_utils::create_para_block_info; + + use super::*; + + fn block_with_num(num: u32) -> Block { + Block { num, hash: Default::default(), ts: Default::default() } + } + + #[test] + fn test_is_fork() { + let mut tracker = SubxtTracker::new(100); + + tracker.previous_relay_block = None; + tracker.current_relay_block = None; + assert!(!tracker.is_fork()); + + tracker.previous_relay_block = None; + tracker.current_relay_block = Some(block_with_num(42)); + assert!(!tracker.is_fork()); + + tracker.previous_relay_block = Some(block_with_num(41)); + tracker.current_relay_block = None; + assert!(!tracker.is_fork()); + + tracker.previous_relay_block = Some(block_with_num(41)); + tracker.current_relay_block = Some(block_with_num(42)); + assert!(!tracker.is_fork()); + + tracker.previous_relay_block = Some(block_with_num(42)); + tracker.current_relay_block = Some(block_with_num(42)); + assert!(tracker.is_fork()); + } + + #[test] + fn test_has_backed_candidate() { + let relay_hash = H256::default(); + let relay_number = 42; + let mut tracker = SubxtTracker::new(100); + tracker.candidates.entry(0).or_default().push(Some(create_para_block_info(100))); + tracker.candidates.entry(1).or_default().push(None); + + assert!(tracker.has_backed_candidate(0)); + assert!(!tracker.has_backed_candidate(1)); + assert!(!tracker.has_backed_candidate(2)); + + tracker.candidates.clear(); + assert!(!tracker.has_backed_candidate(0)); + + tracker.relay_forks.clear(); + tracker.relay_forks.push(ForkTracker { + relay_hash, + relay_number, + backed_candidate: None, + included_candidate: None, + }); + assert!(!tracker.has_backed_candidate(0)); + + tracker.relay_forks.clear(); + tracker.relay_forks.push(ForkTracker { + relay_hash, + relay_number, + backed_candidate: Some(H256::default()), + included_candidate: None, + }); + assert!(tracker.has_backed_candidate(0)); + + tracker.relay_forks.clear(); + tracker.relay_forks.push(ForkTracker { + relay_hash, + relay_number, + backed_candidate: None, + included_candidate: Some(H256::default()), + }); + assert!(tracker.has_backed_candidate(0)); + } + + #[test] + fn test_is_current_candidate_backed() { + let mut tracker = SubxtTracker::new(100); + + assert!(!tracker.is_current_candidate_backed(0)); + + tracker.candidates.entry(0).or_default().push(None); + assert!(!tracker.is_current_candidate_backed(0)); + + let candidate = create_para_block_info(100); + tracker.candidates.clear(); + tracker.candidates.entry(0).or_default().push(Some(candidate)); + assert!(tracker.is_current_candidate_backed(0)); + } + + #[test] + fn test_is_just_backed() { + let mut tracker = SubxtTracker::new(100); + + tracker.last_backed_at_block_number = None; + tracker.current_relay_block = Some(block_with_num(42)); + assert!(!tracker.is_just_backed()); + + tracker.last_backed_at_block_number = Some(41); + tracker.current_relay_block = Some(block_with_num(42)); + assert!(!tracker.is_just_backed()); + + tracker.last_backed_at_block_number = Some(42); + tracker.current_relay_block = Some(block_with_num(42)); + assert!(tracker.is_just_backed()); + } + + #[test] + fn test_is_slow_availability() { + let mut tracker = SubxtTracker::new(100); + + assert!(!tracker.is_slow_availability(0)); + + let mut candidate = create_para_block_info(100); + candidate.core_occupied = true; + tracker.candidates.entry(0).or_default().push(Some(candidate)); + tracker.last_backed_at_block_number = Some(42); + tracker.current_relay_block = Some(block_with_num(42)); + assert!(!tracker.is_slow_availability(0)); + + let mut candidate = create_para_block_info(100); + candidate.core_occupied = false; + tracker.candidates.clear(); + tracker.candidates.entry(0).or_default().push(Some(candidate)); + tracker.last_backed_at_block_number = Some(41); + tracker.current_relay_block = Some(block_with_num(42)); + assert!(!tracker.is_slow_availability(0)); + + let mut candidate = create_para_block_info(100); + candidate.core_occupied = true; + tracker.candidates.clear(); + tracker.candidates.entry(0).or_default().push(Some(candidate)); + tracker.last_backed_at_block_number = Some(41); + tracker.current_relay_block = Some(block_with_num(42)); + assert!(tracker.is_slow_availability(0)); + } +} diff --git a/parachain-tracer/src/tracker_storage.rs b/parachain-tracer/src/tracker_storage.rs index 18cc2871..7567ccd8 100644 --- a/parachain-tracer/src/tracker_storage.rs +++ b/parachain-tracer/src/tracker_storage.rs @@ -279,7 +279,7 @@ mod tests { hash, StorageEntry::new_onchain( RecordTime::with_ts(0, Duration::from_secs(0)), - create_candidate_record(100, 0, H256::random(), 0), + create_candidate_record(100, 0, None, H256::random(), 0), ), ) .await diff --git a/parachain-tracer/src/types.rs b/parachain-tracer/src/types.rs index ac69a96b..50956482 100644 --- a/parachain-tracer/src/types.rs +++ b/parachain-tracer/src/types.rs @@ -24,6 +24,7 @@ use polkadot_introspector_essentials::{ types::{AccountId32, BlockNumber, SubxtHrmpChannel, Timestamp, H256}, }; use std::{ + collections::HashMap, fmt::{self, Display, Formatter, Write}, time::Duration, }; @@ -99,17 +100,7 @@ impl From for BlockWithoutHash { } } -#[derive(Clone, Default)] -pub struct BitfieldsHealth { - /// Maximum bitfield count, equal to number of parachain validators. - pub max_bitfield_count: u32, - /// Current bitfield count in the relay chain block. - pub bitfield_count: u32, - /// Sum of all bits for a given parachain. - pub available_count: u32, -} - -#[derive(Clone)] +#[derive(Clone, Debug)] /// Events related to parachain blocks from consensus perspective. pub enum ParachainConsensusEvent { /// A core has been assigned to a parachain. @@ -146,10 +137,8 @@ pub struct ParachainProgressUpdate { pub block_number: BlockNumber, /// Relay chain block hash. pub block_hash: H256, - /// Bitfields health metrics. - pub bitfield_health: BitfieldsHealth, /// Core occupation. - pub core_occupied: bool, + pub core_occupied: HashMap, /// Consensus events happening for the para under a relay parent. pub events: Vec, /// If we are in the fork chain, then this flag will be `true` @@ -179,7 +168,9 @@ impl Display for ParachainProgressUpdate { )?; } writeln!(buf, "\tšŸ”— Relay block hash: {} ", format!("{:?}", self.block_hash).bold())?; - writeln!(buf, "\tšŸ„ Availability core {}", if !self.core_occupied { "FREE" } else { "OCCUPIED" })?; + for (&core, &core_occupied) in self.core_occupied.iter() { + writeln!(buf, "\tšŸ„ Availability core #{} {}", core, if core_occupied { "OCCUPIED" } else { "FREE" })?; + } writeln!( buf, "\tšŸŒ Finality lag: {}{}", diff --git a/parachain-tracer/src/utils.rs b/parachain-tracer/src/utils.rs index 599f3da2..4a181887 100644 --- a/parachain-tracer/src/utils.rs +++ b/parachain-tracer/src/utils.rs @@ -150,13 +150,13 @@ mod test_extract_inherent_fields { } } -pub(crate) fn backed_candidate( +pub(crate) fn backed_candidates_by_para_id( backed_candidates: Vec>, para_id: u32, -) -> Option> { +) -> impl Iterator> { backed_candidates .into_iter() - .find(|candidate| candidate.candidate.descriptor.para_id.0 == para_id) + .filter(move |candidate| candidate.candidate.descriptor.para_id.0 == para_id) } #[cfg(test)] @@ -166,9 +166,12 @@ mod test_backed_candidate { #[test] fn test_returns_a_candidate() { - let found = backed_candidate(vec![create_backed_candidate(100), create_backed_candidate(200)], 100).unwrap(); + let found_candidates = + backed_candidates_by_para_id(vec![create_backed_candidate(100), create_backed_candidate(200)], 100); - assert_eq!(found.candidate.descriptor.para_id.0, 100); + for found in found_candidates { + assert_eq!(found.candidate.descriptor.para_id.0, 100); + } } } @@ -226,7 +229,7 @@ mod test_extract_votes { } } -pub(crate) fn extract_availability_bits_count(bitfields: Vec, core: u32) -> u32 { +pub(crate) fn extract_availability_bits_count(bitfields: &[AvailabilityBitfield], core: u32) -> u32 { bitfields .iter() .map(|v| v.0.as_bits().get(core as usize).expect("core index must be in the bitfield") as u32) @@ -241,7 +244,7 @@ mod test_extract_availability_bits_count { #[test] fn test_counts_availability_bits() { assert_eq!( - extract_availability_bits_count(vec![AvailabilityBitfield(DecodedBits::from_iter([true, false, true]))], 0), + extract_availability_bits_count(&[AvailabilityBitfield(DecodedBits::from_iter([true, false, true]))], 0), 1 ); }