Skip to content

Commit

Permalink
example: fix termination of threaded example
Browse files Browse the repository at this point in the history
  • Loading branch information
bastibl committed Sep 22, 2023
1 parent 2dc11da commit c89eab0
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 12 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ aaronia-rtsa = { version = "0.0.6", optional = true }

[dev-dependencies]
clap = { version = "4.2", features = ["derive"] }
ctrlc = "3.4"
env_logger = "0.10.0"
gnuplot = "0.0.38"
rustfft = "6.1"
Expand Down
51 changes: 39 additions & 12 deletions examples/rx_threaded.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use clap::Parser;
use num_complex::Complex32;
use std::error::Error;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use vmcircbuffer::sync;

use seify::Device;
use seify::Direction::Rx;
use seify::Error;
use seify::RxStreamer;

#[derive(Parser, Debug)]
Expand All @@ -15,7 +18,7 @@ struct Args {
args: String,
}

pub fn main() -> Result<(), Box<dyn std::error::Error>> {
pub fn main() -> Result<(), Box<dyn Error>> {
env_logger::init();
let cli = Args::parse();

Expand All @@ -32,24 +35,48 @@ pub fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut r = w.add_reader();

// producer thread
std::thread::spawn(move || -> Result<(), Error> {
let mut rx = dev.rx_streamer(&[0])?;
let mtu = rx.mtu()?;
rx.activate()?;

loop {
let w_buff = w.slice();
let n = std::cmp::min(w_buff.len(), mtu);
let n = rx.read(&mut [&mut w_buff[0..n]], 200000)?;
w.produce(n);
let terminate = Arc::new(AtomicBool::new(false));
let rx_thread = std::thread::spawn({
let terminate = terminate.clone();
move || -> Result<(), Box<dyn Error + Send + Sync>> {
let mut rx = dev.rx_streamer(&[0])?;
let mtu = rx.mtu()?;
rx.activate()?;

loop {
if terminate.load(Ordering::Relaxed) {
break Ok(());
}
let w_buff = w.slice();
let n = std::cmp::min(w_buff.len(), mtu);
let n = rx.read(&mut [&mut w_buff[0..n]], 200000)?;
w.produce(n);
}
}
});

ctrlc::set_handler({
let terminate = terminate.clone();
move || {
println!("terminating...");
terminate.store(true, Ordering::Relaxed);
}
})
.expect("Error setting Ctrl-C handler");

// consumer
loop {
if terminate.load(Ordering::Relaxed) {
break;
}
let r_buff = r.slice().unwrap();
let l = r_buff.len();
println!("received {l} samples");
r.consume(l);
}

if let Err(e) = rx_thread.join() {
std::panic::resume_unwind(e);
}
Ok(())
}

0 comments on commit c89eab0

Please sign in to comment.