Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix uncancellable reactive-streams StreamSubscriber #3446

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

i-surkov
Copy link

Fixes #3260.

Due to the changes in cats-effect async cancellation semantics, StreamSubscriber's dequeue1 became uncancellable, this PR fixes that.

I've also added identical test cases for both reactive-streams and flow interops which checks that the streams provided by StreamSubscriber is indeed interruptible (it failed for reactive-streams before the fix), and that it does call cancel() on the Subscription when interrupted.

@i-surkov i-surkov force-pushed the fix-reactive-streams-termination branch from 191585b to 8405c21 Compare June 19, 2024 09:25
F.async[Either[Throwable, Option[Chunk[A]]]] { cb =>
F.delay {
nextState(OnDequeue(out => cb(Right(out))))
Some(F.unit)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the cleanup of the subscription handled around this effect? Producing Some(F.unit) basically says that there is no cleanup action which must be taken on the state resulting from nextState, and it actually corresponds to a memory leak unless we have a higher level guarantee of cleanup.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup happens with the onFinalize call in the Streams.bracket here:

def stream(subscribe: F[Unit])(implicit ev: ApplicativeError[F, Throwable]): Stream[F, A] =
      Stream.bracket(subscribe)(_ => onFinalize) >> Stream
        .eval(dequeue1)
        .repeat
        .rethrow
        .unNoneTerminate
        .unchunks

which is the only place where dequeue1 is used internally.

Although technically I guess someone could call StreamSubscriber#sub.dequeue1 directly, since it is exposed here (unlike analogous flow interop).

@i-surkov i-surkov force-pushed the fix-reactive-streams-termination branch from 8405c21 to 021a610 Compare June 25, 2024 11:53
@i-surkov
Copy link
Author

i-surkov commented Jul 8, 2024

Hi @armanbilge, just checking if you will have some time to review this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Reactive-streams interop is uncancelable
2 participants