This repository has been archived by the owner on Apr 20, 2018. It is now read-only.

Support Node Readable stream pause / backpressure #3

Open
hekike opened this issue Apr 20, 2015 · 2 comments
Comments

@hekike

hekike commented Apr 20, 2015

This issue is moved from: Reactive-Extensions/RxJS#508

http://nodejs.org/api/stream.html#stream_readable_pause

I tried to request only two items from a Node Readable stream with Rx and observed the following behaviour:

  • The Rx subscriber's onNext receives only the expected two items.
  • However, the Rx Observable still reads every item from the stream; it doesn't pause the stream after two.

Rx

var source = Rx.Node.fromStream(dbUserStream).controlled();
source.subscribe(..);
source.request(2);

or

Rx.Node.fromStream(dbUserStream)
  .take(2)
  .subscribe(..);

I think Rx should support some backpressure here and pause the Readable stream once the requested items have been delivered. It would be great for take as well.

In Highland it works in the following way:

_(dbUserStream)
  .take(2)
  .toArray(function (users) { ... });
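
A minimal sketch of the behaviour being requested, using only Node's core `stream` module rather than Rx or Highland. The `takeFromStream` helper and the counting `source` stream are hypothetical and exist only for illustration; the point is that the source is paused as soon as two items have arrived, so it is not drained to the end.

```javascript
const { Readable } = require('stream');

// Hypothetical helper (not part of Rx or Highland): collect the first `n`
// items, then pause the source so it stops being read.
function takeFromStream(stream, n, done) {
  const items = [];
  function onData(item) {
    items.push(item);
    if (items.length >= n) {
      stream.removeListener('data', onData);
      stream.pause(); // leave flowing mode: this is the backpressure step
      done(items);
    }
  }
  stream.on('data', onData);
}

// Illustrative source that could produce 100 items and counts how many
// were actually pulled from it.
let produced = 0;
const source = new Readable({
  objectMode: true,
  highWaterMark: 1,
  read() {
    produced += 1;
    this.push(produced <= 100 ? { id: produced } : null);
  }
});

let taken = null;
takeFromStream(source, 2, function (users) {
  taken = users;
  console.log('taken:', users.length, 'produced so far:', produced);
});
```

Node may still buffer a few items ahead of the consumer, up to `highWaterMark`, so `produced` can end up slightly above two, but it should stay far below 100.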

Did I miss something?

@paulpdaniels

Since downstream operators won't have control over this, I think it should be built into the operator itself. So either:

var pauser = Rx.Observable.just(true).delay(400);

// Pass in an observable: pause the stream on truthy values and resume on falsy ones (or vice versa)
Rx.Node.fromReadableStream(stream, pauser);

or take the backpressure/connect approach

var pausableObservable = Rx.Node.fromReadableStream(stream);

//It is now paused
var disposable = pausableObservable.pause();


//It is now unpaused
disposable.dispose();
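
A rough sketch of how the second, disposable-based shape could behave, written against a plain Node Readable. `makePausable` is a hypothetical name, and this is not an existing Rx.Node API; it only illustrates `pause()` returning an object whose `dispose()` resumes the stream.

```javascript
const { Readable } = require('stream');

// Hypothetical wrapper: pause() pauses the underlying stream and returns
// a disposable; disposing it resumes the stream.
function makePausable(stream) {
  return {
    pause: function () {
      stream.pause();
      return {
        dispose: function () {
          stream.resume();
        }
      };
    }
  };
}

// Illustrative stream that never pushes any data.
const stream = new Readable({ objectMode: true, read() {} });
const pausable = makePausable(stream);

const disposable = pausable.pause();
const pausedAfterPause = stream.isPaused();   // state right after pause()

disposable.dispose();
const pausedAfterDispose = stream.isPaused(); // state right after dispose()

console.log(pausedAfterPause, pausedAfterDispose);
```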

Personally, I lean toward the first, since the second might have confusing semantics for some people, and it forces yet another break in the chain. But I don't generally use this method, so I would leave it up to people like @hekike.

@Zalastax

Zalastax commented May 3, 2015

I had to implement this in my own code today and solved it by adding:

        pauser.distinctUntilChanged().subscribe(function (b) {
            if (b) {
                stream.resume();
            } else {
                stream.pause();
            }
        });

inside fromStream.
This uses the Rx.Node.fromReadableStream(stream, pauser) syntax suggested above, with truthy values resuming the stream and falsy values pausing it.
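
A dependency-free sketch of the same idea, for readers who want to try it without Rx installed. Everything here is an assumption for illustration: `makeSubject` is a tiny stand-in for an Rx Subject, `fromReadableStream` is a simplified callback-based version rather than the real Rx.Node function, and the `distinctUntilChanged()` step is emulated by remembering the last value.

```javascript
const { EventEmitter } = require('events');

// Hypothetical stand-in for an Rx Subject: subscribe(onNext) plus onNext(value).
function makeSubject() {
  const observers = [];
  return {
    subscribe: function (onNext) { observers.push(onNext); },
    onNext: function (value) { observers.forEach(function (o) { o(value); }); }
  };
}

// Simplified, hypothetical fromReadableStream: forwards 'data' events to
// onNext and lets `pauser` drive stream.resume() / stream.pause().
function fromReadableStream(stream, pauser, onNext) {
  let first = true;
  let last;
  pauser.subscribe(function (b) {
    // Emulates distinctUntilChanged(): ignore repeats of the previous value.
    if (!first && b === last) { return; }
    first = false;
    last = b;
    if (b) {
      stream.resume();
    } else {
      stream.pause();
    }
  });
  stream.on('data', onNext);
}

// Fake stream that only records pause/resume calls.
const calls = [];
const fakeStream = new EventEmitter();
fakeStream.pause = function () { calls.push('pause'); };
fakeStream.resume = function () { calls.push('resume'); };

const pauser = makeSubject();
const received = [];
fromReadableStream(fakeStream, pauser, function (x) { received.push(x); });

pauser.onNext(false); // pauses
pauser.onNext(false); // repeat, ignored
pauser.onNext(true);  // resumes
fakeStream.emit('data', 'a');

console.log(calls, received);
```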
