diff --git a/src/from_fn.rs b/src/from_fn.rs new file mode 100644 index 0000000..07ad2a3 --- /dev/null +++ b/src/from_fn.rs @@ -0,0 +1,91 @@ +use async_std::future::Future; +use async_std::sync::{self, Receiver}; +use async_std::task; +use async_std::task::{Context, Poll}; +use core::pin::Pin; +use pin_project_lite::pin_project; + +use crate::ParallelStream; + +pin_project! { + /// A parallel stream that yields elements by calling a closure. + /// + /// This stream is created by the [`from_fn`] function. + /// See it documentation for more. + /// + /// [`from_fn`]: fn.from_fn.html + /// + /// # Examples + #[derive(Clone, Debug)] + pub struct FromFn { + #[pin] + receiver: Receiver, + f: F, + limit: Option, + } +} + +/// Creates a parallel stream from a closure. +pub fn from_fn(mut f: F) -> FromFn +where + T: Send + Sync + Unpin + 'static, + F: FnMut() -> Fut + Send + Sync + Copy + 'static, + Fut: Future> + Send, +{ + let (sender, receiver) = sync::channel(1); + task::spawn(async move { + let sender = sender.clone(); + while let Some(val) = f().await { + sender.send(val).await; + } + }); + FromFn { + f, + receiver, + limit: None, + } +} + +impl ParallelStream for FromFn +where + T: Send + Sync + Unpin + 'static, + F: FnMut() -> Fut + Send + Sync + Copy + 'static, + Fut: Future> + Send, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use async_std::prelude::*; + let this = self.project(); + this.receiver.poll_next(cx) + } + + fn limit(mut self, limit: impl Into>) -> Self { + self.limit = limit.into(); + self + } + + fn get_limit(&self) -> Option { + self.limit + } +} + +#[async_std::test] +async fn smoke() { + let mut output = vec![]; + let mut count = 0u8; + let mut stream = crate::from_fn(move || { + count += 1; + async move { + if count <= 3 { + Some(count) + } else { + None + } + } + }); + while let Some(n) = stream.next().await { + output.push(n); + } + assert_eq!(output, vec![1, 2, 3]); +} diff --git a/src/lib.rs b/src/lib.rs index d9ac3ab..7ac9f85 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -44,11 +44,13 @@ #![deny(missing_debug_implementations, nonstandard_style)] #![warn(missing_docs, missing_doc_code_examples)] +mod from_fn; mod from_parallel_stream; mod from_stream; mod into_parallel_stream; mod par_stream; +pub use from_fn::{from_fn, FromFn}; pub use from_parallel_stream::FromParallelStream; pub use from_stream::{from_stream, FromStream}; pub use into_parallel_stream::IntoParallelStream;