diff --git a/src/par_stream/filter_map.rs b/src/par_stream/filter_map.rs new file mode 100644 index 0000000..4df0c5d --- /dev/null +++ b/src/par_stream/filter_map.rs @@ -0,0 +1,77 @@ +// use async_std::prelude::*; +use async_std::future::Future; +use async_std::sync::{self, Receiver}; +use async_std::task; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::ParallelStream; + +pin_project_lite::pin_project! { + #[derive(Debug)] + pub struct FilterMap { + #[pin] + receiver: Receiver, + limit: Option, + } +} + +impl FilterMap { + /// Create a new instance of `FilterMap`. + pub fn new(mut stream: S, mut f: F) -> Self + where + S: ParallelStream, + F: FnMut(S::Item) -> Fut + Send + Sync + Copy + 'static, + Fut: Future> + Send, + { + let (sender, receiver) = sync::channel(1); + let limit = stream.get_limit(); + task::spawn(async move { + while let Some(item) = stream.next().await { + let sender = sender.clone(); + task::spawn(async move { + if let Some(res) = f(item).await { + sender.send(res).await; + } + }); + } + }); + FilterMap { receiver, limit } + } +} + +impl ParallelStream for FilterMap { + type Item = T; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use async_std::prelude::*; + let this = self.project(); + this.receiver.poll_next(cx) + } + + fn limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn get_limit(&self) -> Option { + self.limit + } +} + +#[async_std::test] +async fn smoke() { + let s = async_std::stream::from_iter(vec![1, 2, 1, 2, 1, 2]); + let mut output: Vec = vec![]; + let mut stream = crate::from_stream(s).filter_map(|n| async move { + if n % 2 == 0 { + Some(n) + } else { + None + } + }); + while let Some(n) = stream.next().await { + output.push(n); + } + assert_eq!(output, vec![2usize; 3]); +} diff --git a/src/par_stream/mod.rs b/src/par_stream/mod.rs index 500befe..7b4e2d0 100644 --- a/src/par_stream/mod.rs +++ b/src/par_stream/mod.rs @@ -5,11 +5,13 @@ use std::pin::Pin; use crate::FromParallelStream; +pub use filter_map::FilterMap; pub use for_each::ForEach; pub use map::Map; pub use next::NextFuture; pub use take::Take; +mod filter_map; mod for_each; mod map; mod next; @@ -40,6 +42,19 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { Map::new(self, f) } + /// Applies `f` to each item of this stream in parallel, where `f` returns + /// an Future>. If the future yields a None the item is + /// dropped, if the future yields a Some(T), T is added to the new stream of + /// results + fn filter_map(self, f: F) -> FilterMap + where + F: FnMut(Self::Item) -> Fut + Send + Sync + Copy + 'static, + T: Send + 'static, + Fut: Future> + Send, + { + FilterMap::new(self, f) + } + /// Applies `f` to each item of this stream in parallel, producing a new /// stream with the results. fn next(&mut self) -> NextFuture<'_, Self> { diff --git a/tests/test.rs b/tests/test.rs index e69de29..8b13789 100644 --- a/tests/test.rs +++ b/tests/test.rs @@ -0,0 +1 @@ +