From 4af796b7b43d24059d31756310ec8e4b9c0d9bbd Mon Sep 17 00:00:00 2001 From: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> Date: Wed, 13 May 2020 18:06:42 +0200 Subject: [PATCH] add ParallelStream::find_map #2 Signed-off-by: Benjamin Coenen <5719034+bnjjj@users.noreply.github.com> --- src/par_stream/find_map.rs | 100 +++++++++++++++++++++++++++++++++++++ src/par_stream/mod.rs | 13 +++++ 2 files changed, 113 insertions(+) create mode 100644 src/par_stream/find_map.rs diff --git a/src/par_stream/find_map.rs b/src/par_stream/find_map.rs new file mode 100644 index 0000000..1f03f28 --- /dev/null +++ b/src/par_stream/find_map.rs @@ -0,0 +1,100 @@ +// use async_std::prelude::*; +use async_std::future::Future; +use async_std::sync::{self, Receiver}; +use async_std::task; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::ParallelStream; + +pin_project_lite::pin_project! { + /// A parallel stream that FindMaps value of another stream with a function. + #[derive(Debug)] + pub struct FindMap { + #[pin] + receiver: Receiver, + limit: Option, + already_sent: bool, + } +} + +impl FindMap +where + T: Send + 'static, +{ + /// Create a new instance of `FindMap`. + pub fn new(mut stream: S, mut f: F) -> Self + where + S: ParallelStream, + F: FnMut(S::Item) -> Fut + Send + Sync + Copy + 'static, + Fut: Future> + Send, + { + let (sender, receiver) = sync::channel(1); + let limit = stream.get_limit(); + task::spawn(async move { + while let Some(item) = stream.next().await { + let sender = sender.clone(); + task::spawn(async move { + let res = f(item).await; + if let Some(res) = res { + sender.send(res).await; + } + }); + } + }); + FindMap { + receiver, + limit, + already_sent: false, + } + } +} + +impl ParallelStream for FindMap +where + T: Send + 'static, +{ + type Item = T; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use async_std::prelude::*; + let this = self.project(); + if *this.already_sent { + return Poll::Ready(None); + } + if let Poll::Ready(elt) = this.receiver.poll_next(cx) { + if let Some(elt) = elt { + *this.already_sent = true; + return Poll::Ready(Some(elt)); + } + return Poll::Ready(None); + } + Poll::Pending + } + + fn limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn get_limit(&self) -> Option { + self.limit + } +} + +#[async_std::test] +async fn smoke() { + let s = async_std::stream::from_iter(vec![1, 1, 2, 3, 2]); + let mut output: Vec = vec![]; + let mut stream = crate::from_stream(s).find_map(|n| async move { + if n % 2 == 0 { + Some(42usize) + } else { + None + } + }); + while let Some(n) = stream.next().await { + output.push(n); + } + assert_eq!(output, vec![42]); +} diff --git a/src/par_stream/mod.rs b/src/par_stream/mod.rs index 500befe..bee9bd1 100644 --- a/src/par_stream/mod.rs +++ b/src/par_stream/mod.rs @@ -5,11 +5,13 @@ use std::pin::Pin; use crate::FromParallelStream; +pub use find_map::FindMap; pub use for_each::ForEach; pub use map::Map; pub use next::NextFuture; pub use take::Take; +mod find_map; mod for_each; mod map; mod next; @@ -29,6 +31,17 @@ pub trait ParallelStream: Sized + Send + Sync + Unpin + 'static { /// Get the max concurrency limit fn get_limit(&self) -> Option; + /// Applies `f` to each item of this stream in parallel, producing a new + /// stream with the results. + fn find_map(self, f: F) -> FindMap + where + F: FnMut(Self::Item) -> Fut + Send + Sync + Copy + 'static, + T: Send + 'static, + Fut: Future> + Send, + { + FindMap::new(self, f) + } + /// Applies `f` to each item of this stream in parallel, producing a new /// stream with the results. fn map(self, f: F) -> Map