diff --git a/Cargo.toml b/Cargo.toml index acab9c8..d8866e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,7 @@ aaronia-rtsa = { version = "0.0.6", optional = true } [dev-dependencies] clap = { version = "4.2", features = ["derive"] } +ctrlc = "3.4" env_logger = "0.10.0" gnuplot = "0.0.38" rustfft = "6.1" diff --git a/examples/rx_threaded.rs b/examples/rx_threaded.rs index 0a9a1e1..21ddae3 100644 --- a/examples/rx_threaded.rs +++ b/examples/rx_threaded.rs @@ -1,10 +1,13 @@ use clap::Parser; use num_complex::Complex32; +use std::error::Error; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; use vmcircbuffer::sync; use seify::Device; use seify::Direction::Rx; -use seify::Error; use seify::RxStreamer; #[derive(Parser, Debug)] @@ -15,7 +18,7 @@ struct Args { args: String, } -pub fn main() -> Result<(), Box> { +pub fn main() -> Result<(), Box> { env_logger::init(); let cli = Args::parse(); @@ -32,24 +35,48 @@ pub fn main() -> Result<(), Box> { let mut r = w.add_reader(); // producer thread - std::thread::spawn(move || -> Result<(), Error> { - let mut rx = dev.rx_streamer(&[0])?; - let mtu = rx.mtu()?; - rx.activate()?; - - loop { - let w_buff = w.slice(); - let n = std::cmp::min(w_buff.len(), mtu); - let n = rx.read(&mut [&mut w_buff[0..n]], 200000)?; - w.produce(n); + let terminate = Arc::new(AtomicBool::new(false)); + let rx_thread = std::thread::spawn({ + let terminate = terminate.clone(); + move || -> Result<(), Box> { + let mut rx = dev.rx_streamer(&[0])?; + let mtu = rx.mtu()?; + rx.activate()?; + + loop { + if terminate.load(Ordering::Relaxed) { + break Ok(()); + } + let w_buff = w.slice(); + let n = std::cmp::min(w_buff.len(), mtu); + let n = rx.read(&mut [&mut w_buff[0..n]], 200000)?; + w.produce(n); + } } }); + ctrlc::set_handler({ + let terminate = terminate.clone(); + move || { + println!("terminating..."); + terminate.store(true, Ordering::Relaxed); + } + }) + .expect("Error setting Ctrl-C handler"); + // consumer loop { + if terminate.load(Ordering::Relaxed) { + break; + } let r_buff = r.slice().unwrap(); let l = r_buff.len(); println!("received {l} samples"); r.consume(l); } + + if let Err(e) = rx_thread.join() { + std::panic::resume_unwind(e); + } + Ok(()) }