diff --git a/src/lib.rs b/src/lib.rs index 7078668..1adf181 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,7 +52,7 @@ mod par_stream; pub use from_parallel_stream::FromParallelStream; pub use from_stream::{from_stream, FromStream}; pub use into_parallel_stream::IntoParallelStream; -pub use par_stream::{ForEach, Map, NextFuture, ParallelStream, Take, Any}; +pub use par_stream::{Any, ForEach, Map, NextFuture, ParallelStream, Take}; pub mod prelude; pub mod vec; diff --git a/src/par_stream/any.rs b/src/par_stream/any.rs index 2ab6137..335dca0 100644 --- a/src/par_stream/any.rs +++ b/src/par_stream/any.rs @@ -1,8 +1,8 @@ -use async_std::prelude::*; use async_std::future::Future; -use pin_project_lite::pin_project; -use async_std::task::{self, Context, Poll}; +use async_std::prelude::*; use async_std::sync::{self, Receiver, Sender}; +use async_std::task::{self, Context, Poll}; +use pin_project_lite::pin_project; use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; @@ -59,13 +59,14 @@ impl Any { task::spawn(async move { // Execute the closure. let res = f(item).await; + value.fetch_or(res, Ordering::SeqCst); // Wake up the receiver if we know we're done. ref_count.fetch_sub(1, Ordering::SeqCst); - if res { - value.fetch_or(true, Ordering::SeqCst); - sender.send(()).await; - } else if exhausted.load(Ordering::SeqCst) && ref_count.load(Ordering::SeqCst) == 0 { + if value.load(Ordering::SeqCst) + || (exhausted.load(Ordering::SeqCst) + && ref_count.load(Ordering::SeqCst) == 0) + { sender.send(()).await; } }); @@ -91,13 +92,10 @@ impl Future for Any { #[async_std::test] async fn smoke() { - let s = async_std::stream::repeat(5usize); + let s = async_std::stream::from_iter(vec![6, 9, 0, 7, 10]); let result = crate::from_stream(s) - .take(3) - .any(|n| async move { - n * 2 < 9 - }) + .any(|n| async move { n * 2 < 9 }) .await; - - assert_eq!(result, false); + + assert!(result); } diff --git a/src/par_stream/mod.rs b/src/par_stream/mod.rs index e286939..f7ed04e 100644 --- a/src/par_stream/mod.rs +++ b/src/par_stream/mod.rs @@ -5,17 +5,17 @@ use std::pin::Pin; use crate::FromParallelStream; +pub use any::Any; pub use for_each::ForEach; pub use map::Map; pub use next::NextFuture; pub use take::Take; -pub use any::Any; +mod any; mod for_each; mod map; mod next; mod take; -mod any; /// Parallel version of the standard `Stream` trait. pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { diff --git a/tests/test.rs b/tests/test.rs index e69de29..8b13789 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -0,0 +1 @@ +