Skip to content

Commit

Permalink
Fix skipped slots tracking (#750)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreiEres authored Aug 1, 2024
1 parent 5f660a1 commit 5ed8947
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 14 deletions.
41 changes: 39 additions & 2 deletions essentials/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ pub struct CollectorOptions {
pub hrmp_channels: bool,
#[clap(short = 's', long = "subscribe-mode", default_value_t, value_enum)]
pub subscribe_mode: CollectorSubscribeMode,
/// Evict a stalled parachain after this amount of skipped blocks
#[clap(long, default_value = "256")]
max_parachain_stall: u32,
}

/// How to subscribe to subxt blocks
Expand Down Expand Up @@ -148,6 +151,8 @@ struct CollectorState {
current_session_index: u32,
/// Last finalized block number
last_finalized_block_number: Option<u32>,
/// A list of paras for broadcasting
paras_seen: BTreeMap<u32, u32>,
}

/// Provides collector new head events split by parachain
Expand Down Expand Up @@ -208,6 +213,7 @@ pub struct Collector {
executor: RequestExecutor,
subscribe_mode: CollectorSubscribeMode,
hrmp_channels: bool,
max_parachain_stall: u32,
}

impl Collector {
Expand Down Expand Up @@ -236,6 +242,7 @@ impl Collector {
executor,
subscribe_mode: opts.subscribe_mode,
hrmp_channels: opts.hrmp_channels,
max_parachain_stall: opts.max_parachain_stall,
}
}

Expand Down Expand Up @@ -419,7 +426,37 @@ impl Collector {
}

for broadcast_channel in self.broadcast_channels.iter_mut() {
for (para_id, candidates) in self.state.candidates_seen.iter() {
// Update a list of current parachains
for para_id in self.state.candidates_seen.keys() {
self.state
.paras_seen
.insert(*para_id, self.state.current_relay_chain_block_number);
}
let to_evict: Vec<_> = self
.state
.paras_seen
.iter()
.filter_map(|(para_id, last_block)| {
let is_active =
self.state.current_relay_chain_block_number - *last_block > self.max_parachain_stall;
if is_active {
Some(*para_id)
} else {
None
}
})
.collect();
for para_id in to_evict {
let last_seen = self.state.paras_seen.remove(&para_id).expect("checked previously, qed");
info!(
"evicting tracker for parachain {}, stalled for {} blocks",
para_id,
self.state.current_relay_chain_block_number - last_seen
);
}

for para_id in self.state.paras_seen.keys() {
let candidates = self.state.candidates_seen.get(para_id);
let disputes_concluded = self.state.disputes_seen.get(para_id).map(|disputes_seen| {
disputes_seen
.iter()
Expand All @@ -431,7 +468,7 @@ impl Collector {
.send(CollectorUpdateEvent::NewHead(NewHeadEvent {
relay_parent_hashes: self.state.current_relay_chain_block_hashes.clone(),
relay_parent_number: self.state.current_relay_chain_block_number,
candidates_seen: candidates.clone(),
candidates_seen: candidates.cloned().unwrap_or_default(),
disputes_concluded: disputes_concluded.clone().unwrap_or_default(),
para_id: *para_id,
}))
Expand Down
17 changes: 5 additions & 12 deletions parachain-tracer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,6 @@ pub(crate) struct ParachainTracerOptions {
/// The number of last blocks with missing slots to display
#[clap(long = "last-skipped-slot-blocks", default_value = "10")]
pub last_skipped_slot_blocks: usize,
/// Evict a stalled parachain after this amount of skipped blocks
#[clap(long, default_value = "256")]
max_parachain_stall: u32,
/// Defines subscription mode
#[clap(flatten)]
collector_opts: CollectorOptions,
Expand Down Expand Up @@ -275,7 +272,6 @@ impl ParachainTracer {
// for the practical reasons we are fine to do a hash map scan on each head.
let mut last_blocks: HashMap<u32, u32> = HashMap::new();
let mut best_known_block: u32 = 0;
let max_stall = self.opts.max_parachain_stall;
let mut futures = FuturesUnordered::new();
let mut from_collector = Box::pin(from_collector);

Expand Down Expand Up @@ -304,7 +300,7 @@ impl ParachainTracer {

if last_known_block > best_known_block {
best_known_block = last_known_block;
evict_stalled(&mut trackers, &mut last_blocks, max_stall);
evict_stalled(&mut trackers, &mut last_blocks);
}
},
CollectorUpdateEvent::NewSession(idx) =>
Expand Down Expand Up @@ -340,16 +336,13 @@ impl ParachainTracer {
}
}

fn evict_stalled(
trackers: &mut HashMap<u32, Sender<CollectorUpdateEvent>>,
last_blocks: &mut HashMap<u32, u32>,
max_stall: u32,
) {
fn evict_stalled(trackers: &mut HashMap<u32, Sender<CollectorUpdateEvent>>, last_blocks: &mut HashMap<u32, u32>) {
let max_block = *last_blocks.values().max().unwrap_or(&0_u32);
// Collectors keep sending events to stalled paras for a particular amount of blocks.
// So we can remove it immediately after we stopped receiving events.
let to_evict: Vec<u32> = last_blocks
.iter()
.filter(|(_, last_block)| max_block - *last_block > max_stall)
.map(|(para_id, _)| *para_id)
.filter_map(|(para_id, last_block)| if max_block > *last_block { Some(*para_id) } else { None })
.collect();
for para_id in to_evict {
let last_seen = last_blocks.remove(&para_id).expect("checked previously, qed");
Expand Down

0 comments on commit 5ed8947

Please sign in to comment.