Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AsyncChannelSource and AsyncChannelSink #12

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea
.vscode
config.toml
/target
/Cargo.lock
15 changes: 9 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,25 @@ members = [
[dependencies]
futuresdr = { git = "https://github.com/FutureSDR/FutureSDR", branch = "main" }
#futuresdr = { path = "../FutureSDR" }
async-trait = "0.1.68"
crossbeam-channel = { version = "0.5.8", optional = true }
async-channel = { version = "2.2.0", optional = true }
async-trait = "0.1.78"
crossbeam-channel = { version = "0.5.12", optional = true }
bimap = { version = "0.6.3", optional = true }
sigmf = { version = "0.1.0", path = "crates/sigmf" }
async-fs = "2.1.0"
serde = "1.0.193"
async-fs = "2.1.1"
serde = "1.0.197"

[dev-dependencies]
criterion = { version = "0.4.0", features = ["html_reports"] }
criterion = { version = "0.5.1", features = ["html_reports"] }
tokio-test = "0.4.4"
rand = { version = "0.8.5" }
quickcheck_macros = "1"
serde_json = "1.0.108"
serde_json = "1.0.114"

[features]
default = []
crossbeam = ["dep:crossbeam-channel"]
async-channel = ["dep:async-channel"]
cw = ["dep:bimap"]

[[bench]]
Expand Down
2 changes: 1 addition & 1 deletion benches/channel/crossbeam_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn crossbeam_sink_boxed_slice_u32(c: &mut Criterion) {

group.throughput(criterion::Throughput::Elements(n_samp as u64));

group.bench_function(format!("mock-u32-crossbeam-sink"), |b| {
group.bench_function("mock-u32-crossbeam-sink", |b| {
b.iter(|| {
let block = CrossbeamSink::new_typed(tx.clone());
let mut mocker = Mocker::new(block);
Expand Down
2 changes: 1 addition & 1 deletion benches/channel/crossbeam_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub fn crossbeam_source_boxed_slice_u32(c: &mut Criterion) {

group.throughput(criterion::Throughput::Elements(n_samp as u64));

group.bench_function(format!("mock-u32-crossbeam-source"), |b| {
group.bench_function("mock-u32-crossbeam-source", |b| {
b.iter(|| {
let block = CrossbeamSource::new_typed(rx.clone());
let mut mocker = Mocker::new(block);
Expand Down
2 changes: 1 addition & 1 deletion benches/cw/baseband_to_cw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn bench_baseband_to_cw(c: &mut Criterion) {

group.throughput(criterion::Throughput::Elements(baseband.len() as u64));

group.bench_function(format!("mock-baseband-to-cw"), |b| {
group.bench_function("mock-baseband-to-cw", |b| {
b.iter(|| {
let block = BaseBandToCW::new_typed(100, samples_per_dot);
let mut mocker = Mocker::new(block);
Expand Down
2 changes: 1 addition & 1 deletion benches/cw/cw_to_char.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub fn bench_cw_to_char(c: &mut Criterion) {

group.throughput(criterion::Throughput::Elements(cw.len() as u64));

group.bench_function(format!("mock-cw-to-char"), |b| {
group.bench_function("mock-cw-to-char", |b| {
b.iter(|| {
let block = CWToChar::new_typed(get_alphabet());
let mut mocker = Mocker::new(block);
Expand Down
4 changes: 2 additions & 2 deletions benches/cw/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub fn bench_char_to_baseband(c: &mut Criterion) {

group.throughput(criterion::Throughput::Elements(bb.len() as u64));

group.bench_function(format!("char_to_bb"), |b| {
group.bench_function("char_to_bb", |b| {
b.iter(|| {
message
.chars()
Expand All @@ -42,7 +42,7 @@ pub fn bench_msg_to_cw(c: &mut Criterion) {

group.throughput(criterion::Throughput::Elements(msg_slice.len() as u64));

group.bench_function(format!("msg_to_cw"), |b| {
group.bench_function("msg_to_cw", |b| {
b.iter(|| {
msg_to_cw(msg_slice);
});
Expand Down
4 changes: 2 additions & 2 deletions check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ cd ${SCRIPTPATH} && cargo fmt --check
###########################################################
# CLIPPY
###########################################################
cd ${SCRIPTPATH} && cargo clippy --all-targets --workspace
cd ${SCRIPTPATH} && cargo clippy --all-targets --all-features --workspace

###########################################################
# Test
###########################################################
cd ${SCRIPTPATH} && cargo test --all-targets --workspace
cd ${SCRIPTPATH} && cargo test --all-targets --all-features --workspace
81 changes: 81 additions & 0 deletions src/async_channel/async_channel_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use async_channel::Sender;

use futuresdr::anyhow::Result;
use futuresdr::log::info;
use futuresdr::runtime::Block;
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
use futuresdr::runtime::Kernel;
use futuresdr::runtime::MessageIo;
use futuresdr::runtime::MessageIoBuilder;
use futuresdr::runtime::StreamIo;
use futuresdr::runtime::StreamIoBuilder;
use futuresdr::runtime::WorkIo;

/// Get samples out of a Flowgraph into a channel.
///
/// # Inputs
///
/// `in`: Samples retrieved from teh flowgraph
///
/// # Usage
/// ```
/// use async_channel;
/// use futuresdr::blocks::VectorSource;
/// use fsdr-blocks::blocks::AsyncChannelSink
/// use futuresdr::runtime::Flowgraph;
///
/// let mut fg = Flowgraph::new();
/// let (tx, rx) = async_channel::unbounded::<Box<[u32]>>();
/// let vec = vec![0, 1, 2];
/// let src = fg.add_block(VectorSource::<u32>::new(vec));
/// let cs = fg.add_block(AsyncChannelSink::<u32>::new(tx));
/// // start flowgraph
/// ```
pub struct AsyncChannelSink<T: Send + 'static> {
sender: Sender<Box<[T]>>,
}

impl<T: Send + Clone + 'static> AsyncChannelSink<T> {
#[allow(clippy::new_ret_no_self)]
pub fn new(sender: Sender<Box<[T]>>) -> Block {
Block::new(
BlockMetaBuilder::new("AsyncChannelSink").build(),
StreamIoBuilder::new().add_input::<T>("in").build(),
MessageIoBuilder::new().build(),
AsyncChannelSink::<T> { sender },
)
}
}

#[doc(hidden)]
#[async_trait]
impl<T: Send + Clone + 'static> Kernel for AsyncChannelSink<T> {
async fn work(
&mut self,
io: &mut WorkIo,
sio: &mut StreamIo,
_mio: &mut MessageIo<Self>,
_meta: &mut BlockMeta,
) -> Result<()> {
let i = sio.input(0).slice::<T>();

if !i.is_empty() {
match self.sender.try_send(i.into()) {
Ok(_) => {
info!("sent data...");
}
Err(err) => {
info!("{}", err.to_string());
}
}
sio.input(0).consume(i.len());
}

if sio.input(0).finished() {
io.finished = true;
}

Ok(())
}
}
103 changes: 103 additions & 0 deletions src/async_channel/async_channel_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use async_channel::Receiver;
use futuresdr::futures::StreamExt;

use futuresdr::anyhow::Result;
use futuresdr::log::info;
use futuresdr::runtime::Block;
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
use futuresdr::runtime::Kernel;
use futuresdr::runtime::MessageIo;
use futuresdr::runtime::MessageIoBuilder;
use futuresdr::runtime::StreamIo;
use futuresdr::runtime::StreamIoBuilder;
use futuresdr::runtime::WorkIo;

/// Push samples through a channel into a stream connection.
///
/// # Outputs
///
/// `out`: Samples pushed into the channel
///
/// # Usage
/// ```
/// use async_channel;
/// use fsdr-blocks::blocks::AsyncChannelSource;
/// use futuresdr::runtime::Flowgraph;
///
/// let mut fg = Flowgraph::new();
/// let (tx, rx) = async_channel::unbounded::<Box<[u32]>>();
///
/// let async_channel_src = fg.add_block(AsyncChannelSource::<u32>::new(rx));
///
/// tx.send(orig.clone().into_boxed_slice()).await.unwrap();
/// ```
pub struct AsyncChannelSource<T: Send + 'static> {
receiver: Receiver<Box<[T]>>,
current: Option<(Box<[T]>, usize)>,
}

impl<T: Send + 'static> AsyncChannelSource<T> {
#[allow(clippy::new_ret_no_self)]
pub fn new(receiver: Receiver<Box<[T]>>) -> Block {
Block::new(
BlockMetaBuilder::new("AsyncChannelSource").build(),
StreamIoBuilder::new().add_output::<T>("out").build(),
MessageIoBuilder::new().build(),
AsyncChannelSource::<T> {
receiver,
current: None,
},
)
}
}

#[doc(hidden)]
#[async_trait]
impl<T: Send + 'static> Kernel for AsyncChannelSource<T> {
async fn work(
&mut self,
io: &mut WorkIo,
sio: &mut StreamIo,
_mio: &mut MessageIo<Self>,
_meta: &mut BlockMeta,
) -> Result<()> {
let out = sio.output(0).slice::<T>();
if out.is_empty() {
return Ok(());
}

if self.current.is_none() {
match self.receiver.by_ref().recv().await {
//.by_ref().next().await
Ok(data) => {
info!("received data chunk on channel");
self.current = Some((data, 0));
}
Err(_err) => {
info!("sender-end of channel was closed");
io.finished = true;
return Ok(());
}
}
}

if let Some((data, index)) = &mut self.current {
let n = std::cmp::min(data.len() - *index, out.len());
unsafe {
std::ptr::copy_nonoverlapping(data.as_ptr().add(*index), out.as_mut_ptr(), n);
};
sio.output(0).produce(n);
*index += n;
if *index == data.len() {
self.current = None;
}
}

if self.current.is_none() {
io.call_again = true;
}

Ok(())
}
}
6 changes: 6 additions & 0 deletions src/async_channel/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
//! ## Blocks related to channels
mod async_channel_sink;
mod async_channel_source;

pub use async_channel_sink::AsyncChannelSink;
pub use async_channel_source::AsyncChannelSource;
2 changes: 1 addition & 1 deletion src/channel/crossbeam_sink.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use crossbeam_channel::Sender;
use futuresdr::anyhow::Result;
use futuresdr::async_trait::async_trait;
use futuresdr::log::info;
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
Expand Down
2 changes: 1 addition & 1 deletion src/channel/crossbeam_source.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;
use crossbeam_channel::{Receiver, TryRecvError};
use futuresdr::anyhow::Result;
use futuresdr::async_trait::async_trait;
use futuresdr::log::debug;
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
Expand Down
2 changes: 1 addition & 1 deletion src/cw/baseband_to_cw.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use std::ops::RangeInclusive;

use futuresdr::anyhow::Result;
use futuresdr::async_trait::async_trait;
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
use futuresdr::runtime::Kernel;
Expand Down
3 changes: 2 additions & 1 deletion src/cw/cw_to_char.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_trait::async_trait;

use futuresdr::anyhow::Result;
use futuresdr::async_trait::async_trait;
use futuresdr::runtime::BlockMeta;
use futuresdr::runtime::BlockMetaBuilder;
use futuresdr::runtime::Kernel;
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ pub extern crate async_trait;
#[cfg(feature = "crossbeam")]
pub mod channel;

#[cfg(feature = "async-channel")]
pub mod async_channel;

#[cfg(feature = "cw")]
pub mod cw;

Expand Down
1 change: 0 additions & 1 deletion src/serde_pmt/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std;
use std::fmt::{self, Display};

use serde::{de, ser};
Expand Down
8 changes: 4 additions & 4 deletions src/serde_pmt/serialiser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ impl<'a> serde::Serializer for &'a mut Serializer {
Ok(Pmt::Null)
}

fn serialize_some<T: ?Sized>(self, value: &T) -> Result<Self::Ok>
fn serialize_some<T>(self, value: &T) -> Result<Self::Ok>
where
T: Serialize,
T: Serialize + ?Sized,
{
value.serialize(self)
}
Expand All @@ -119,13 +119,13 @@ impl<'a> serde::Serializer for &'a mut Serializer {
self.serialize_str(variant)
}

fn serialize_newtype_struct<T: ?Sized>(
fn serialize_newtype_struct<T>(
self,
_name: &'static str,
value: &T,
) -> std::prelude::v1::Result<Self::Ok, Self::Error>
where
T: Serialize,
T: Serialize + ?Sized,
{
value.serialize(self)
}
Expand Down
Loading
Loading