From 1671ad38f56323b698b9b57bb7969908bae50a0c Mon Sep 17 00:00:00 2001 From: ratzrattillo <11601995+ratzrattillo@users.noreply.github.com> Date: Wed, 20 Mar 2024 23:40:39 +0100 Subject: [PATCH] Add AsyncChannelSource and AsyncChannelSink Fix wrong Imports in CrossbeamChannel Update Dependencies Fixed Tests Default Test "All Features" in check.sh Fix clippy warnings --- .gitignore | 1 + Cargo.toml | 15 +-- benches/channel/crossbeam_sink.rs | 2 +- benches/channel/crossbeam_source.rs | 2 +- benches/cw/baseband_to_cw.rs | 2 +- benches/cw/cw_to_char.rs | 2 +- benches/cw/shared.rs | 4 +- check.sh | 4 +- src/async_channel/async_channel_sink.rs | 81 +++++++++++++++ src/async_channel/async_channel_source.rs | 103 ++++++++++++++++++++ src/async_channel/mod.rs | 6 ++ src/channel/crossbeam_sink.rs | 2 +- src/channel/crossbeam_source.rs | 2 +- src/cw/baseband_to_cw.rs | 2 +- src/cw/cw_to_char.rs | 3 +- src/lib.rs | 3 + src/serde_pmt/error.rs | 1 - src/serde_pmt/serialiser.rs | 8 +- tests/async_channel/async_channel_sink.rs | 29 ++++++ tests/async_channel/async_channel_source.rs | 44 +++++++++ tests/async_channel/mod.rs | 2 + tests/channel/crossbeam_source.rs | 1 - tests/cw/shared.rs | 2 +- tests/tests.rs | 2 + 24 files changed, 298 insertions(+), 25 deletions(-) create mode 100644 src/async_channel/async_channel_sink.rs create mode 100644 src/async_channel/async_channel_source.rs create mode 100644 src/async_channel/mod.rs create mode 100644 tests/async_channel/async_channel_sink.rs create mode 100644 tests/async_channel/async_channel_source.rs create mode 100644 tests/async_channel/mod.rs diff --git a/.gitignore b/.gitignore index 489647c..ac3f1d3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea +.vscode config.toml /target /Cargo.lock diff --git a/Cargo.toml b/Cargo.toml index 5aa0254..c3efc8c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,22 +22,25 @@ members = [ [dependencies] futuresdr = { git = "https://github.com/FutureSDR/FutureSDR", branch = "main" } #futuresdr = { path = "../FutureSDR" } -async-trait = "0.1.68" -crossbeam-channel = { version = "0.5.8", optional = true } +async-channel = { version = "2.2.0", optional = true } +async-trait = "0.1.78" +crossbeam-channel = { version = "0.5.12", optional = true } bimap = { version = "0.6.3", optional = true } sigmf = { version = "0.1.0", path = "crates/sigmf" } -async-fs = "2.1.0" -serde = "1.0.193" +async-fs = "2.1.1" +serde = "1.0.197" [dev-dependencies] -criterion = { version = "0.4.0", features = ["html_reports"] } +criterion = { version = "0.5.1", features = ["html_reports"] } +tokio-test = "0.4.4" rand = { version = "0.8.5" } quickcheck_macros = "1" -serde_json = "1.0.108" +serde_json = "1.0.114" [features] default = [] crossbeam = ["dep:crossbeam-channel"] +async-channel = ["dep:async-channel"] cw = ["dep:bimap"] [[bench]] diff --git a/benches/channel/crossbeam_sink.rs b/benches/channel/crossbeam_sink.rs index 570bfd6..1466663 100644 --- a/benches/channel/crossbeam_sink.rs +++ b/benches/channel/crossbeam_sink.rs @@ -19,7 +19,7 @@ pub fn crossbeam_sink_boxed_slice_u32(c: &mut Criterion) { group.throughput(criterion::Throughput::Elements(n_samp as u64)); - group.bench_function(format!("mock-u32-crossbeam-sink"), |b| { + group.bench_function("mock-u32-crossbeam-sink", |b| { b.iter(|| { let block = CrossbeamSink::new_typed(tx.clone()); let mut mocker = Mocker::new(block); diff --git a/benches/channel/crossbeam_source.rs b/benches/channel/crossbeam_source.rs index f4f0bd9..f623d36 100644 --- a/benches/channel/crossbeam_source.rs +++ b/benches/channel/crossbeam_source.rs @@ -17,7 +17,7 @@ pub fn crossbeam_source_boxed_slice_u32(c: &mut Criterion) { group.throughput(criterion::Throughput::Elements(n_samp as u64)); - group.bench_function(format!("mock-u32-crossbeam-source"), |b| { + group.bench_function("mock-u32-crossbeam-source", |b| { b.iter(|| { let block = CrossbeamSource::new_typed(rx.clone()); let mut mocker = Mocker::new(block); diff --git a/benches/cw/baseband_to_cw.rs b/benches/cw/baseband_to_cw.rs index 0961310..b0d86d3 100644 --- a/benches/cw/baseband_to_cw.rs +++ b/benches/cw/baseband_to_cw.rs @@ -19,7 +19,7 @@ pub fn bench_baseband_to_cw(c: &mut Criterion) { group.throughput(criterion::Throughput::Elements(baseband.len() as u64)); - group.bench_function(format!("mock-baseband-to-cw"), |b| { + group.bench_function("mock-baseband-to-cw", |b| { b.iter(|| { let block = BaseBandToCW::new_typed(100, samples_per_dot); let mut mocker = Mocker::new(block); diff --git a/benches/cw/cw_to_char.rs b/benches/cw/cw_to_char.rs index 012ae8f..83e36a1 100644 --- a/benches/cw/cw_to_char.rs +++ b/benches/cw/cw_to_char.rs @@ -16,7 +16,7 @@ pub fn bench_cw_to_char(c: &mut Criterion) { group.throughput(criterion::Throughput::Elements(cw.len() as u64)); - group.bench_function(format!("mock-cw-to-char"), |b| { + group.bench_function("mock-cw-to-char", |b| { b.iter(|| { let block = CWToChar::new_typed(get_alphabet()); let mut mocker = Mocker::new(block); diff --git a/benches/cw/shared.rs b/benches/cw/shared.rs index cb574f7..587ce2f 100644 --- a/benches/cw/shared.rs +++ b/benches/cw/shared.rs @@ -17,7 +17,7 @@ pub fn bench_char_to_baseband(c: &mut Criterion) { group.throughput(criterion::Throughput::Elements(bb.len() as u64)); - group.bench_function(format!("char_to_bb"), |b| { + group.bench_function("char_to_bb", |b| { b.iter(|| { message .chars() @@ -42,7 +42,7 @@ pub fn bench_msg_to_cw(c: &mut Criterion) { group.throughput(criterion::Throughput::Elements(msg_slice.len() as u64)); - group.bench_function(format!("msg_to_cw"), |b| { + group.bench_function("msg_to_cw", |b| { b.iter(|| { msg_to_cw(msg_slice); }); diff --git a/check.sh b/check.sh index c6e1f74..d2ee224 100755 --- a/check.sh +++ b/check.sh @@ -13,9 +13,9 @@ cd ${SCRIPTPATH} && cargo fmt --check ########################################################### # CLIPPY ########################################################### -cd ${SCRIPTPATH} && cargo clippy --all-targets --workspace +cd ${SCRIPTPATH} && cargo clippy --all-targets --all-features --workspace ########################################################### # Test ########################################################### -cd ${SCRIPTPATH} && cargo test --all-targets --workspace +cd ${SCRIPTPATH} && cargo test --all-targets --all-features --workspace diff --git a/src/async_channel/async_channel_sink.rs b/src/async_channel/async_channel_sink.rs new file mode 100644 index 0000000..e7f1363 --- /dev/null +++ b/src/async_channel/async_channel_sink.rs @@ -0,0 +1,81 @@ +use async_channel::Sender; + +use futuresdr::anyhow::Result; +use futuresdr::log::info; +use futuresdr::runtime::Block; +use futuresdr::runtime::BlockMeta; +use futuresdr::runtime::BlockMetaBuilder; +use futuresdr::runtime::Kernel; +use futuresdr::runtime::MessageIo; +use futuresdr::runtime::MessageIoBuilder; +use futuresdr::runtime::StreamIo; +use futuresdr::runtime::StreamIoBuilder; +use futuresdr::runtime::WorkIo; + +/// Get samples out of a Flowgraph into a channel. +/// +/// # Inputs +/// +/// `in`: Samples retrieved from teh flowgraph +/// +/// # Usage +/// ``` +/// use async_channel; +/// use futuresdr::blocks::VectorSource; +/// use fsdr-blocks::blocks::AsyncChannelSink +/// use futuresdr::runtime::Flowgraph; +/// +/// let mut fg = Flowgraph::new(); +/// let (tx, rx) = async_channel::unbounded::>(); +/// let vec = vec![0, 1, 2]; +/// let src = fg.add_block(VectorSource::::new(vec)); +/// let cs = fg.add_block(AsyncChannelSink::::new(tx)); +/// // start flowgraph +/// ``` +pub struct AsyncChannelSink { + sender: Sender>, +} + +impl AsyncChannelSink { + #[allow(clippy::new_ret_no_self)] + pub fn new(sender: Sender>) -> Block { + Block::new( + BlockMetaBuilder::new("AsyncChannelSink").build(), + StreamIoBuilder::new().add_input::("in").build(), + MessageIoBuilder::new().build(), + AsyncChannelSink:: { sender }, + ) + } +} + +#[doc(hidden)] +#[async_trait] +impl Kernel for AsyncChannelSink { + async fn work( + &mut self, + io: &mut WorkIo, + sio: &mut StreamIo, + _mio: &mut MessageIo, + _meta: &mut BlockMeta, + ) -> Result<()> { + let i = sio.input(0).slice::(); + + if !i.is_empty() { + match self.sender.try_send(i.into()) { + Ok(_) => { + info!("sent data..."); + } + Err(err) => { + info!("{}", err.to_string()); + } + } + sio.input(0).consume(i.len()); + } + + if sio.input(0).finished() { + io.finished = true; + } + + Ok(()) + } +} diff --git a/src/async_channel/async_channel_source.rs b/src/async_channel/async_channel_source.rs new file mode 100644 index 0000000..014f287 --- /dev/null +++ b/src/async_channel/async_channel_source.rs @@ -0,0 +1,103 @@ +use async_channel::Receiver; +use futuresdr::futures::StreamExt; + +use futuresdr::anyhow::Result; +use futuresdr::log::info; +use futuresdr::runtime::Block; +use futuresdr::runtime::BlockMeta; +use futuresdr::runtime::BlockMetaBuilder; +use futuresdr::runtime::Kernel; +use futuresdr::runtime::MessageIo; +use futuresdr::runtime::MessageIoBuilder; +use futuresdr::runtime::StreamIo; +use futuresdr::runtime::StreamIoBuilder; +use futuresdr::runtime::WorkIo; + +/// Push samples through a channel into a stream connection. +/// +/// # Outputs +/// +/// `out`: Samples pushed into the channel +/// +/// # Usage +/// ``` +/// use async_channel; +/// use fsdr-blocks::blocks::AsyncChannelSource; +/// use futuresdr::runtime::Flowgraph; +/// +/// let mut fg = Flowgraph::new(); +/// let (tx, rx) = async_channel::unbounded::>(); +/// +/// let async_channel_src = fg.add_block(AsyncChannelSource::::new(rx)); +/// +/// tx.send(orig.clone().into_boxed_slice()).await.unwrap(); +/// ``` +pub struct AsyncChannelSource { + receiver: Receiver>, + current: Option<(Box<[T]>, usize)>, +} + +impl AsyncChannelSource { + #[allow(clippy::new_ret_no_self)] + pub fn new(receiver: Receiver>) -> Block { + Block::new( + BlockMetaBuilder::new("AsyncChannelSource").build(), + StreamIoBuilder::new().add_output::("out").build(), + MessageIoBuilder::new().build(), + AsyncChannelSource:: { + receiver, + current: None, + }, + ) + } +} + +#[doc(hidden)] +#[async_trait] +impl Kernel for AsyncChannelSource { + async fn work( + &mut self, + io: &mut WorkIo, + sio: &mut StreamIo, + _mio: &mut MessageIo, + _meta: &mut BlockMeta, + ) -> Result<()> { + let out = sio.output(0).slice::(); + if out.is_empty() { + return Ok(()); + } + + if self.current.is_none() { + match self.receiver.by_ref().recv().await { + //.by_ref().next().await + Ok(data) => { + info!("received data chunk on channel"); + self.current = Some((data, 0)); + } + Err(_err) => { + info!("sender-end of channel was closed"); + io.finished = true; + return Ok(()); + } + } + } + + if let Some((data, index)) = &mut self.current { + let n = std::cmp::min(data.len() - *index, out.len()); + unsafe { + std::ptr::copy_nonoverlapping(data.as_ptr().add(*index), out.as_mut_ptr(), n); + }; + sio.output(0).produce(n); + *index += n; + if *index == data.len() { + self.current = None; + } + } + + if self.current.is_none() { + io.call_again = true; + } + + Ok(()) + } +} diff --git a/src/async_channel/mod.rs b/src/async_channel/mod.rs new file mode 100644 index 0000000..df1a967 --- /dev/null +++ b/src/async_channel/mod.rs @@ -0,0 +1,6 @@ +//! ## Blocks related to channels +mod async_channel_sink; +mod async_channel_source; + +pub use async_channel_sink::AsyncChannelSink; +pub use async_channel_source::AsyncChannelSource; diff --git a/src/channel/crossbeam_sink.rs b/src/channel/crossbeam_sink.rs index 2d4da6f..0ca7126 100644 --- a/src/channel/crossbeam_sink.rs +++ b/src/channel/crossbeam_sink.rs @@ -1,6 +1,6 @@ +use async_trait::async_trait; use crossbeam_channel::Sender; use futuresdr::anyhow::Result; -use futuresdr::async_trait::async_trait; use futuresdr::log::info; use futuresdr::runtime::BlockMeta; use futuresdr::runtime::BlockMetaBuilder; diff --git a/src/channel/crossbeam_source.rs b/src/channel/crossbeam_source.rs index 953ae37..34c4a71 100644 --- a/src/channel/crossbeam_source.rs +++ b/src/channel/crossbeam_source.rs @@ -1,6 +1,6 @@ +use async_trait::async_trait; use crossbeam_channel::{Receiver, TryRecvError}; use futuresdr::anyhow::Result; -use futuresdr::async_trait::async_trait; use futuresdr::log::debug; use futuresdr::runtime::BlockMeta; use futuresdr::runtime::BlockMetaBuilder; diff --git a/src/cw/baseband_to_cw.rs b/src/cw/baseband_to_cw.rs index 028b96a..6138eae 100644 --- a/src/cw/baseband_to_cw.rs +++ b/src/cw/baseband_to_cw.rs @@ -1,7 +1,7 @@ +use async_trait::async_trait; use std::ops::RangeInclusive; use futuresdr::anyhow::Result; -use futuresdr::async_trait::async_trait; use futuresdr::runtime::BlockMeta; use futuresdr::runtime::BlockMetaBuilder; use futuresdr::runtime::Kernel; diff --git a/src/cw/cw_to_char.rs b/src/cw/cw_to_char.rs index 74cac28..eaaefe8 100644 --- a/src/cw/cw_to_char.rs +++ b/src/cw/cw_to_char.rs @@ -1,5 +1,6 @@ +use async_trait::async_trait; + use futuresdr::anyhow::Result; -use futuresdr::async_trait::async_trait; use futuresdr::runtime::BlockMeta; use futuresdr::runtime::BlockMetaBuilder; use futuresdr::runtime::Kernel; diff --git a/src/lib.rs b/src/lib.rs index 2392668..10ffb1d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,9 @@ pub extern crate async_trait; #[cfg(feature = "crossbeam")] pub mod channel; +#[cfg(feature = "async-channel")] +pub mod async_channel; + #[cfg(feature = "cw")] pub mod cw; diff --git a/src/serde_pmt/error.rs b/src/serde_pmt/error.rs index 83a5dd3..d269497 100644 --- a/src/serde_pmt/error.rs +++ b/src/serde_pmt/error.rs @@ -1,4 +1,3 @@ -use std; use std::fmt::{self, Display}; use serde::{de, ser}; diff --git a/src/serde_pmt/serialiser.rs b/src/serde_pmt/serialiser.rs index fd7cafc..b968942 100644 --- a/src/serde_pmt/serialiser.rs +++ b/src/serde_pmt/serialiser.rs @@ -95,9 +95,9 @@ impl<'a> serde::Serializer for &'a mut Serializer { Ok(Pmt::Null) } - fn serialize_some(self, value: &T) -> Result + fn serialize_some(self, value: &T) -> Result where - T: Serialize, + T: Serialize + ?Sized, { value.serialize(self) } @@ -119,13 +119,13 @@ impl<'a> serde::Serializer for &'a mut Serializer { self.serialize_str(variant) } - fn serialize_newtype_struct( + fn serialize_newtype_struct( self, _name: &'static str, value: &T, ) -> std::prelude::v1::Result where - T: Serialize, + T: Serialize + ?Sized, { value.serialize(self) } diff --git a/tests/async_channel/async_channel_sink.rs b/tests/async_channel/async_channel_sink.rs new file mode 100644 index 0000000..cbcda2c --- /dev/null +++ b/tests/async_channel/async_channel_sink.rs @@ -0,0 +1,29 @@ +use fsdr_blocks::async_channel::AsyncChannelSink; +use futuresdr::anyhow::Result; +use futuresdr::blocks::VectorSource; +use futuresdr::macros::connect; +use futuresdr::runtime::Flowgraph; +use futuresdr::runtime::Runtime; + +#[test] +fn run_async_channel_sink_f32() -> Result<()> { + tokio_test::block_on(async_channel_sink_f32()) +} + +async fn async_channel_sink_f32() -> Result<()> { + let mut fg = Flowgraph::new(); + let (tx, rx) = async_channel::unbounded::>(); + + let orig: Vec = vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]; + let vector_src = VectorSource::::new(orig.clone()); + let async_channel_snk = AsyncChannelSink::::new(tx.clone()); + + connect!(fg, + vector_src > async_channel_snk; + ); + Runtime::new().run(fg)?; + + assert_eq!(orig, rx.recv().await.unwrap().to_vec()); + + Ok(()) +} diff --git a/tests/async_channel/async_channel_source.rs b/tests/async_channel/async_channel_source.rs new file mode 100644 index 0000000..47e5f3a --- /dev/null +++ b/tests/async_channel/async_channel_source.rs @@ -0,0 +1,44 @@ +use fsdr_blocks::async_channel::AsyncChannelSource; +use futuresdr::anyhow::Result; +use futuresdr::blocks::{Head, VectorSink, VectorSinkBuilder}; +use futuresdr::log::debug; +use futuresdr::macros::connect; +use futuresdr::runtime::{Flowgraph, Runtime}; + +#[test] +fn run_async_channel_source_u32() -> Result<()> { + tokio_test::block_on(async_channel_source_u32()) +} + +async fn async_channel_source_u32() -> Result<()> { + let mut fg = Flowgraph::new(); + let orig = vec![0, 1, 2]; + let (tx, rx) = async_channel::unbounded::>(); + + let async_channel_src = AsyncChannelSource::::new(rx); + let limit = Head::::new(orig.len() as u64); + let vector_snk = VectorSinkBuilder::::new().build(); + + connect!(fg, + async_channel_src > limit > vector_snk; + ); + + tx.send(orig.clone().into_boxed_slice()).await.unwrap(); + tx.close(); + + fg = Runtime::new().run(fg)?; + + let snk = fg.kernel::>(vector_snk).unwrap(); + let received = snk.items(); + + debug!("{}", received.len()); + debug!("{}", orig.len()); + + assert_eq!(received.len(), orig.len()); + + for (v, e) in orig.iter().zip(received.iter()) { + debug!("{v} == {e}"); + assert_eq!(v, e); + } + Ok(()) +} diff --git a/tests/async_channel/mod.rs b/tests/async_channel/mod.rs new file mode 100644 index 0000000..303cba9 --- /dev/null +++ b/tests/async_channel/mod.rs @@ -0,0 +1,2 @@ +pub mod async_channel_sink; +pub mod async_channel_source; diff --git a/tests/channel/crossbeam_source.rs b/tests/channel/crossbeam_source.rs index af67ad6..49eae3a 100644 --- a/tests/channel/crossbeam_source.rs +++ b/tests/channel/crossbeam_source.rs @@ -1,4 +1,3 @@ -use crossbeam_channel; use fsdr_blocks::channel::CrossbeamSource; use futuresdr::anyhow::Result; use futuresdr::blocks::{Head, VectorSink, VectorSinkBuilder}; diff --git a/tests/cw/shared.rs b/tests/cw/shared.rs index 3d695d2..6c5321a 100644 --- a/tests/cw/shared.rs +++ b/tests/cw/shared.rs @@ -64,7 +64,7 @@ fn test_msg_to_cw() -> Result<()> { // cargo nextest run test_display_trait_impl --no-capture #[test] fn test_display_trait_impl() { - let testdata = vec![Dash, Dot, LetterSpace, Unknown, Dot, WordSpace, Dash]; + let testdata = [Dash, Dot, LetterSpace, Unknown, Dot, WordSpace, Dash]; let str: String = testdata.iter().map(ToString::to_string).collect(); assert_eq!("-. ./ -", str) } diff --git a/tests/tests.rs b/tests/tests.rs index 29e78a1..e9a486f 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "async-channel")] +mod async_channel; #[cfg(feature = "crossbeam")] mod channel; #[cfg(feature = "cw")]