From 997cda826591f7a79c06bf90d8021eb5cdca668a Mon Sep 17 00:00:00 2001 From: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> Date: Wed, 13 May 2020 17:12:33 +0200 Subject: [PATCH] add ParallelStream::filter #2 Signed-off-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> --- src/par_stream/filter.rs | 80 ++++++++++++++++++++++++++++++++++++++++ src/par_stream/mod.rs | 13 +++++++ 2 files changed, 93 insertions(+) create mode 100644 src/par_stream/filter.rs diff --git a/src/par_stream/filter.rs b/src/par_stream/filter.rs new file mode 100644 index 0000000..d0c40a8 --- /dev/null +++ b/src/par_stream/filter.rs @@ -0,0 +1,80 @@ +// use async_std::prelude::*; +use async_std::future::Future; +use async_std::sync::{self, Receiver}; +use async_std::task; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::ParallelStream; + +pin_project_lite::pin_project! { + /// A parallel stream that filters value of another stream with a function. + #[derive(Debug)] + pub struct Filter where S: ParallelStream { + #[pin] + receiver: Receiver, + limit: Option, + } +} + +impl Filter +where + S: ParallelStream, +{ + /// Create a new instance of `Filter`. + pub fn new(mut stream: S, mut f: F) -> Self + where + S: ParallelStream, + F: FnMut(&S::Item) -> Fut + Send + Sync + Copy + 'static, + Fut: Future + Send, + S::Item: Sync, + { + let (sender, receiver) = sync::channel(1); + let limit = stream.get_limit(); + task::spawn(async move { + while let Some(item) = stream.next().await { + let sender = sender.clone(); + task::spawn(async move { + let res = f(&item).await; + if res { + sender.send(item).await; + } + }); + } + }); + Filter { receiver, limit } + } +} + +impl ParallelStream for Filter +where + S: ParallelStream, +{ + type Item = S::Item; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use async_std::prelude::*; + let this = self.project(); + this.receiver.poll_next(cx) + } + + fn limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn get_limit(&self) -> Option { + self.limit + } +} + +#[async_std::test] +async fn smoke() { + let s = async_std::stream::from_iter(vec![2, 1, 2, 3, 2]); + let mut output: Vec = vec![]; + let mut stream = crate::from_stream(s).filter(|&n| async move { n % 2 == 0 }); + while let Some(n) = stream.next().await { + output.push(n); + } + assert_eq!(output, vec![2usize; 3]); +} diff --git a/src/par_stream/mod.rs b/src/par_stream/mod.rs index 500befe..c8b3070 100644 --- a/src/par_stream/mod.rs +++ b/src/par_stream/mod.rs @@ -5,11 +5,13 @@ use std::pin::Pin; use crate::FromParallelStream; +pub use filter::Filter; pub use for_each::ForEach; pub use map::Map; pub use next::NextFuture; pub use take::Take; +mod filter; mod for_each; mod map; mod next; @@ -40,6 +42,17 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { Map::new(self, f) } + /// Applies `f` to each item of this stream in parallel, producing a new + /// stream with items filtered. + fn filter(self, f: F) -> Filter + where + F: FnMut(&Self::Item) -> Fut + Send + Sync + Copy + 'static, + Fut: Future + Send, + Self::Item: Sync, + { + Filter::new(self, f) + } + /// Applies `f` to each item of this stream in parallel, producing a new /// stream with the results. fn next(&mut self) -> NextFuture<'_, Self> {