Skip to content

Commit

Permalink
Halt runloop when jobs panic
Browse files Browse the repository at this point in the history
This involved quite a bit of fiddling. I couldn't get rayon's built-in
panic_handler to work, so I ended up rolling my own solution, where we
manually catch unwinds that occur in spawned jobs.

When a panic occurs, we will set an atomic flag and return an error. We
check this flag before beginning any scheduled job, and return
immediately if it is set.
  • Loading branch information
cmyr committed Jul 26, 2023
1 parent 74fc37c commit c624f76
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 5 deletions.
5 changes: 3 additions & 2 deletions fontc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ fn main() -> Result<(), Error> {
let ts = buf.timestamp_micros();
writeln!(
buf,
"[{ts} {:?} {} {}] {}",
std::thread::current().id(),
"[{ts} {} {} {}] {}",
// we manually assign all threads a name
std::thread::current().name().unwrap_or("unknown"),
record.target(),
buf.default_level_style(record.level())
.value(record.level()),
Expand Down
2 changes: 2 additions & 0 deletions fontc/src/work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use fontir::{
pub enum AnyWorkError {
Fe(FeError),
Be(BeError),
Panic(String),
}

impl From<BeError> for AnyWorkError {
Expand All @@ -37,6 +38,7 @@ impl Display for AnyWorkError {
match self {
AnyWorkError::Be(e) => e.fmt(f),
AnyWorkError::Fe(e) => e.fmt(f),
AnyWorkError::Panic(e) => write!(f, "Job panicked: '{e}'"),
}
}
}
Expand Down
64 changes: 61 additions & 3 deletions fontc/src/workload.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
//! Tracking jobs to run

use std::collections::{HashMap, HashSet};
use rayon::ThreadPoolBuilder;
use std::{
collections::{HashMap, HashSet},
panic::AssertUnwindSafe,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

use crossbeam_channel::{Receiver, TryRecvError};
use fontbe::orchestration::{AnyWorkId, Context as BeContext};
Expand Down Expand Up @@ -191,7 +199,15 @@ impl<'a> Workload<'a> {
// Async work will send us it's ID on completion
let (send, recv) = crossbeam_channel::unbounded::<(AnyWorkId, Result<(), AnyWorkError>)>();

rayon::in_place_scope(|scope| {
// a flag we set if we panic
let abort_queued_jobs = Arc::new(AtomicBool::new(false));
// build a custom threadpool. we use this to ensure all threads are named
let threadpool = ThreadPoolBuilder::new()
.thread_name(|n| format!("tp{n}"))
.build()
.expect("failed to init thread pool");

threadpool.in_place_scope(|scope| {
// Whenever a task completes see if it was the last incomplete dependency of other task(s)
// and spawn them if it was
// TODO timeout and die it if takes too long to make forward progress or we're spinning w/o progress
Expand Down Expand Up @@ -228,8 +244,38 @@ impl<'a> Workload<'a> {
job.write_access.clone(),
);

let abort = abort_queued_jobs.clone();

scope.spawn(move |_| {
let result = work.exec(work_context);
if abort.load(Ordering::Relaxed) {
log::trace!("Aborting {:?}", work.id());
return;
}
// # Unwind Safety
//
// 'unwind safety' does not impact memory safety, but
// it may impact program correctness; the thread may have
// left shared memory in an inconsistent state.
//
// I believe this is not a concern for us, as we cancel any
// pending jobs after seeing a panic and jobs that depend
// on state produced by the panicking job must be scheduled
// after it. Unless we have jobs that are mutating
// shared resources then I think this is fine.
//
// references:
// <https://doc.rust-lang.org/nomicon/exception-safety.html#exception-safety>
// <https://doc.rust-lang.org/std/panic/trait.UnwindSafe.html>
let result = match std::panic::catch_unwind(AssertUnwindSafe(|| {
work.exec(work_context)
})) {
Ok(result) => result,
Err(err) => {
let msg = get_panic_message(err);
abort.store(true, Ordering::Relaxed);
Err(AnyWorkError::Panic(msg))
}
};
if let Err(e) = send.send((id.clone(), result)) {
log::error!("Unable to write {id:?} to completion channel: {e}");
}
Expand Down Expand Up @@ -387,3 +433,15 @@ impl<'a> Workload<'a> {
self.success.difference(&pre_success).cloned().collect()
}
}

// taken from std:
// <https://github.com/rust-lang/rust/blob/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/panicking.rs#L247-L253>
fn get_panic_message(msg: Box<dyn std::any::Any + Send + 'static>) -> String {
match msg.downcast_ref::<&'static str>() {
Some(s) => s.to_string(),
None => match msg.downcast_ref::<String>() {
Some(s) => s.to_owned(),
None => "Box<dyn Any>".to_owned(),
},
}
}

0 comments on commit c624f76

Please sign in to comment.