Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add interop.flow.pipeToProcessor #3449

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
44 changes: 44 additions & 0 deletions core/shared/src/main/scala/fs2/interop/flow/StreamProcessor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package fs2
package interop
package flow

import java.util.concurrent.Flow
import cats.effect.{Async, Resource}

private[flow] final class StreamProcessor[F[_], I, O](
streamSubscriber: StreamSubscriber[F, I],
streamPublisher: StreamPublisher[F, O]
) extends Flow.Processor[I, O] {
override def onSubscribe(subscription: Flow.Subscription): Unit =
streamSubscriber.onSubscribe(subscription)

override def onNext(i: I): Unit =
streamSubscriber.onNext(i)

override def onError(ex: Throwable): Unit =
streamSubscriber.onError(ex)

override def onComplete(): Unit =
streamSubscriber.onComplete()

override def subscribe(subscriber: Flow.Subscriber[? >: O <: Object]): Unit =
streamPublisher.subscribe(subscriber)
}

private[flow] object StreamProcessor {
def fromPipe[F[_], I, O](
pipe: Pipe[F, I, O],
chunkSize: Int
)(implicit
F: Async[F]
): Resource[F, StreamProcessor[F, I, O]] =
for {
streamSubscriber <- Resource.eval(StreamSubscriber[F, I](chunkSize))
inputStream = streamSubscriber.stream(subscribe = F.unit)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have access to the upstream Publisher yet, so we can't do the subscribe here. So we do nothing and wait for it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we do nothing and wait for it.

I'm not entirely sure I understand the waiting part. Can/should we use a Deferred here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No sorry, my point is that subscribe should be something like IO(publisher.subscribe(streamSubscriber)) as you can see in the implementation of syntax.fromPublisher.
Here, however, we don't have a Publisher yet, so we don't control when we are published.

Basically, this is very similar to the fromPublisher overload that accepts a Subscriber => F[Unit], just that rather than receiving the lambda, we are just returning a raw Processor / Subscriber.

outputStream = pipe(inputStream)
streamPublisher <- StreamPublisher(outputStream)
} yield new StreamProcessor(
streamSubscriber,
streamPublisher
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ private[flow] object StreamSubscriber {
/** Instantiates a new [[StreamSubscriber]] for the given buffer size. */
def apply[F[_], A](
chunkSize: Int
)(implicit F: Async[F]): F[StreamSubscriber[F, A]] = {
)(implicit
F: Async[F]
): F[StreamSubscriber[F, A]] = {
require(chunkSize > 0, "The buffer size MUST be positive")

F.delay {
Expand Down
13 changes: 11 additions & 2 deletions core/shared/src/main/scala/fs2/interop/flow/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
package fs2
package interop

import cats.effect.IO
import cats.effect.kernel.{Async, Resource}
import cats.effect.{Async, IO, Resource}
import cats.effect.unsafe.IORuntime

import java.util.concurrent.Flow
import java.util.concurrent.Flow.{Publisher, Subscriber, defaultBufferSize}

/** Implementation of the reactive-streams protocol for fs2; based on Java Flow.
Expand Down Expand Up @@ -199,6 +199,15 @@ package object flow {
): Stream[F, Nothing] =
StreamSubscription.subscribe(stream, subscriber)

/** TODO. */
def pipeToProcessor[F[_], I, O](
BalmungSan marked this conversation as resolved.
Show resolved Hide resolved
pipe: Pipe[F, I, O],
chunkSize: Int
)(implicit
F: Async[F]
): Resource[F, Flow.Processor[I, O]] =
StreamProcessor.fromPipe(pipe, chunkSize)

/** A default value for the `chunkSize` argument,
* that may be used in the absence of other constraints;
* we encourage choosing an appropriate value consciously.
Expand Down
Loading