Skip to content

Commit

Permalink
Updated test and ran fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
amadeusine committed Mar 21, 2020
1 parent 02d3be6 commit 8d9ecc1
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ mod par_stream;
pub use from_parallel_stream::FromParallelStream;
pub use from_stream::{from_stream, FromStream};
pub use into_parallel_stream::IntoParallelStream;
pub use par_stream::{ForEach, Map, NextFuture, ParallelStream, Take, Any};
pub use par_stream::{Any, ForEach, Map, NextFuture, ParallelStream, Take};

pub mod prelude;
pub mod vec;
Expand Down
26 changes: 12 additions & 14 deletions src/par_stream/any.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use async_std::prelude::*;
use async_std::future::Future;
use pin_project_lite::pin_project;
use async_std::task::{self, Context, Poll};
use async_std::prelude::*;
use async_std::sync::{self, Receiver, Sender};
use async_std::task::{self, Context, Poll};
use pin_project_lite::pin_project;

use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
Expand Down Expand Up @@ -59,13 +59,14 @@ impl Any {
task::spawn(async move {
// Execute the closure.
let res = f(item).await;
value.fetch_or(res, Ordering::SeqCst);

// Wake up the receiver if we know we're done.
ref_count.fetch_sub(1, Ordering::SeqCst);
if res {
value.fetch_or(true, Ordering::SeqCst);
sender.send(()).await;
} else if exhausted.load(Ordering::SeqCst) && ref_count.load(Ordering::SeqCst) == 0 {
if value.load(Ordering::SeqCst)
|| (exhausted.load(Ordering::SeqCst)
&& ref_count.load(Ordering::SeqCst) == 0)
{
sender.send(()).await;
}
});
Expand All @@ -91,13 +92,10 @@ impl Future for Any {

#[async_std::test]
async fn smoke() {
let s = async_std::stream::repeat(5usize);
let s = async_std::stream::from_iter(vec![6, 9, 0, 7, 10]);
let result = crate::from_stream(s)
.take(3)
.any(|n| async move {
n * 2 < 9
})
.any(|n| async move { n * 2 < 9 })
.await;
assert_eq!(result, false);

assert!(result);
}
4 changes: 2 additions & 2 deletions src/par_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ use std::pin::Pin;

use crate::FromParallelStream;

pub use any::Any;
pub use for_each::ForEach;
pub use map::Map;
pub use next::NextFuture;
pub use take::Take;
pub use any::Any;

mod any;
mod for_each;
mod map;
mod next;
mod take;
mod any;

/// Parallel version of the standard `Stream` trait.
pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static {
Expand Down
1 change: 1 addition & 0 deletions tests/test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

0 comments on commit 8d9ecc1

Please sign in to comment.