Add .end() to Kefir.pool() #137

Open
cefn opened this issue Sep 2, 2015 · 15 comments

@cefn

cefn commented Sep 2, 2015

I've got a puzzle with a framework I'm putting together, where I would like to be able to properly dispose of promises when they are no longer needed (and unsubscribe from the relevant streams).

I believe a subscription is made in order to back the Promise, but when program logic establishes that the promise is no longer needed I can't find anything which can be done to tidy up and unsubscribe from the stream, as the onX handler is hidden within Kefir, meaning the stream would remain subscribed indefinitely with all its underlying resources committed.

If there is some way to e.g. trigger Promise rejection/fulfilment which could cause unsubscription, or to get hold of the subscribed value- and end-handler(s) to manually unsubscribe, that would be very useful to know.

Equally, perhaps there's a Kefiric way to tackle this with a minimum of weirdness (perhaps creating some kind of derived stream, creating the Promise from the derived stream, then manipulating the derived stream causing the unsubscription). I haven't been able to pin down the right transformation of a stream which would permit this, yet. Pool would be promising, but it never ends, so I can't externally induce the toPromise to complete.

If there is no obvious way, perhaps this could be a feature request for there to be some channel for unsubscription and termination of Promises.

@rpominov
Member

rpominov commented Sep 2, 2015

Promises are broken. They don't have any mechanism for releasing resources. I wish promises worked the same way as observables: when all consumers unsubscribe, your callback is called, allowing you to free resources. But unfortunately we can't even unsubscribe from a promise :(

We could manually attach a .kefirDispose() method to the resulting promise. But then we'd have the problem that anyone who holds the promise would be able to dispose of it.

Here is a solution I came up with that uses current APIs:

function createEcho(source) {
  var unsub = null
  var stream = Kefir.stream(function(em) {
    // After disconnect() there is nothing left to subscribe to.
    if (source === null) {
      return function() {}
    }
    // Keep a local reference: disconnect() sets `source` to null, and Kefir
    // may still call the returned unsubscriber afterwards.
    var src = source
    src.onAny(em.emitEvent)
    unsub = function() {
      src.offAny(em.emitEvent)
      unsub = null
    }
    return unsub
  })
  return {
    stream: stream,
    // Detach from the source and prevent any later re-subscription.
    disconnect: function() {
      if (unsub) {
        unsub()
      }
      source = null
    }
  }
}

// Usage:

var source = ... // some stream
var echo = createEcho(source)
var promise = echo.stream.toPromise()

// Later, when the promise is no longer needed:
echo.disconnect()
promise = echo = null

// (I didn't test it)

Maybe it's a good idea to add createEcho to the core, or maybe someone could create a separate npm module with it.

@cefn
Author

cefn commented Sep 3, 2015

It's not unreasonable to expect stream termination to be explicit but controllable through an intermediate stream that exposes that control. Currently, though, the API doesn't really allow for streams into which you can inject events, except through a kind of hack that uses Kefir.stream's callback to export the emitter, as shown in the example below.

The intent is roughly illustrated by the steps below, although this isn't a suggested API signature.

The outputUnsubscribe function always runs, regardless of whether the Promise resolves. I think that if the setTimeout delay is 100 ms, the emitter.end() call triggers the unsubscribe, and if it is 1000 ms, the Promise path (take(1)) triggers it first.

Some API options suggested by this are...

  • Some kind of pipe primitive, which may address other issues too.
  • (related) a convention for exposing an 'emitter' without closure acrobatics.

I understand that there may be no room for a pipe primitive because of Kefir's minimal API intent. A simple pipe primitive could have the signatures of both an emitter and a stream, but extra thought would be needed to ensure that a downstream pipe which receives an End through its own emitter then unsubscribes from all of its upstream sources.

var Kefir = require("kefir");

var input = Kefir.sequentially(500, [0,1,2,3]);

var outputPipe;

var outputSubscribe = function(){
    input.onAny(outputPipe);
};

var outputUnsubscribe = function(){
    input.offAny(outputPipe);
};

var output = Kefir.stream(function(emitter){
    outputPipe = function(event){
        emitter.emitEvent(event);
    }
    outputPipe.emitter = emitter;
    outputSubscribe();
    return outputUnsubscribe;
});
output.onEnd(outputUnsubscribe);

output.take(1).toPromise().then(console.log)

setTimeout(function(){
    outputPipe.emitter.end();
}, 1000);

@rpominov
Member

rpominov commented Sep 3, 2015

Here is another pattern:

var input = ... 
var disposer = ...
var promise = input.takeUntilBy(disposer).toPromise()

If only you could create the disposer somehow naturally, without emitter closure acrobatics :)
For example var disposer = Kefir.fromEvents(closeButton, 'click').take(1).

@cefn
Author

cefn commented Sep 3, 2015

Here's an alternative - using Promises to fight Promises, something like...

var Kefir = require("kefir"),
      Q = require("q");

var input = ...
var disposer = Q.defer();
var promise = input.takeUntilBy(Kefir.fromPromise(disposer.promise)).toPromise();
disposer.resolve();

(untested)

Is there any reason that pool never ends? I understand that it shouldn't end when its contributing streams do. However, couldn't there be an end() call available on the pool itself, which would cause it to unsubscribe from its contributors and issue an end event to its subscribers? That would address the scenario of tidying up promises using just a single indirection through a call to pool().

Opening up a full emitter interface on pool could make it quite a powerful element although this scenario only needs an end() call available.

@rpominov
Member

rpominov commented Sep 3, 2015

> Opening up a full emitter interface on pool could make it quite a powerful element

We actually have a deprecated method doing exactly that https://github.com/rpominov/kefir/blob/master/deprecated-api-docs.md#kefirbus (see deprecation story here #88).

I still hope we'll find a better solution for this kind of problem, but perhaps bus / Kefir.emitter() will indeed return in some form eventually.

Btw, native Promises also have only the Promise((resolve, reject) => ...) interface, so we may have to extract resolve from them the same way we extract the emitter from the Kefir.stream() callback (and that's OK) https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/Promise

The Observable proposal also doesn't have such an API, AFAIK https://github.com/zenparsing/es-observable
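
For illustration, extracting resolve from the native Promise constructor might look roughly like the sketch below. It uses only the standard Promise API; the helper name defer and the variable disposer are hypothetical, chosen to mirror the Q.defer() usage earlier in the thread, and are not part of Kefir or any other library.

```javascript
// Sketch only: `defer` is a hypothetical helper name, not a library API.
function defer() {
  let resolve;
  let reject;
  // The executor runs synchronously, so both functions are captured
  // before the Promise constructor returns.
  const promise = new Promise((res, rej) => {
    resolve = res;
    reject = rej;
  });
  return { promise, resolve, reject };
}

// Hypothetical usage: whoever holds `disposer` can settle the promise,
// while code that only receives `disposer.promise` cannot.
const disposer = defer();
disposer.promise.then(() => console.log("disposed"));
disposer.resolve(); // the `then` callback runs asynchronously afterwards
```

The same closure trick is what exports the emitter from the Kefir.stream() callback; in both cases the capability to end or resolve stays with whoever created the object.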

@cefn
Author

cefn commented Sep 4, 2015

I like the clean and minimal API, so I definitely understand you wanting to minimise duplication and steer people away from sloppy practice. I recognise adding a full emitter interface to pool might be a step too far for these reasons.

However, adding the end() method to streams from pool() is complementary to the existing API as it handles the situation that...

  • Pool cannot generate an end as a result of its sources
  • Pool already has a list of upstream sources, so it makes sense for Pool to manage unsubscription on end

Without Pool looking after it, an author without control over downstream subscribers, who wants to stop events being forwarded, would have to maintain a second, duplicate list of sources and do their own unsubscription to achieve the same result.

https://rpominov.github.io/kefir/#pool currently says Pool never ends, but in the alternative implementation, Pool only ends when pool.end() is called and not before. Does this make sense? Or do you think even adding end() is too much?
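
To make the proposed semantics concrete, here is a toy model in plain JavaScript. It is not Kefir's implementation and does not use Kefir at all: createSource, createPool, observe and subscriberCount are hypothetical names invented for this sketch, and unlike Kefir it subscribes to sources eagerly. It only illustrates the behaviour being asked for, namely that end() unsubscribes from every plugged source and then notifies downstream observers once.

```javascript
// Toy model only: NOT Kefir's implementation. All names are hypothetical.

// A minimal source that tracks its handlers so unsubscription is observable.
function createSource() {
  const handlers = new Set();
  return {
    subscribe(handler) {
      handlers.add(handler);
      return () => handlers.delete(handler); // unsubscribe function
    },
    emit(value) {
      handlers.forEach((h) => h(value));
    },
    subscriberCount() {
      return handlers.size;
    },
  };
}

function createPool() {
  const unsubscribers = new Map(); // source -> unsubscribe function
  const listeners = new Set(); // downstream { value, end } observers
  let ended = false;

  return {
    plug(source) {
      if (ended || unsubscribers.has(source)) return;
      const forward = (v) => listeners.forEach((l) => l.value(v));
      unsubscribers.set(source, source.subscribe(forward));
    },
    observe(listener) {
      if (!ended) listeners.add(listener);
    },
    // The proposed addition: the pool ends only when asked to.
    end() {
      if (ended) return;
      ended = true;
      unsubscribers.forEach((unsub) => unsub()); // release upstream sources
      unsubscribers.clear();
      listeners.forEach((l) => l.end()); // notify downstream exactly once
      listeners.clear();
    },
  };
}

const a = createSource();
const b = createSource();
const pool = createPool();
const seen = [];
let endCount = 0;

pool.observe({ value: (v) => seen.push(v), end: () => { endCount += 1; } });
pool.plug(a);
pool.plug(b);
a.emit(1);
b.emit(2);
pool.end();
a.emit(3); // not forwarded: the pool has already unsubscribed
```

Because the pool already holds the list of sources, the caller never needs a second list to clean up, which is the point made above.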

@rpominov
Member

rpominov commented Sep 4, 2015

Yeah, I guess adding an .end() method to the pool makes sense. A PR is welcome.

@rpominov rpominov changed the title How to reverse 'toPromise' Add .end() to Keifr.pool() Sep 5, 2015
@cefn cefn changed the title Add .end() to Keifr.pool() Add .end() to Kefir.pool() Sep 5, 2015
@cefn
Author

cefn commented Sep 5, 2015

I've been looking into a PR but it's my first! I think I can figure out the right bit of AbstractPool, and the right bit of git, but time will tell.

@NeverwinterMoon

> If only you could create the disposer somehow naturally, without emitter closure acrobatics :)

@rpominov I know this isn't going to sound like the best idea, but what about using the private function #_emitEnd() on Pool?

I am currently trying to use this in tests only: I have a function that takes multiple streams, I then try to test this function by passing multiple pools and just plugging them with data to simulate data flow. I am also using Jest, so it makes sense to return a promise in the spec, so Jest will wait for that promise to resolve and evaluate the expected results. So, what I am doing now is (CS):

streamA = Kefir.pool()
streamB = Kefir.pool()

setTimeout (->
  streamA._emitEnd()
  streamB._emitEnd()
), 10

streamA.plug Kefir.sequentially 0, [ 'something' ]
streamB.plug Kefir.sequentially 0, [ 'somethingElse' ]

expect.assertions 1

functionUnderTest streamA, streamB
  .toPromise()
  .then (value) ->
    expect value
      .toEqual 'something'

Is there any better approach to this for such a case?

@rpominov
Copy link
Member

rpominov commented Jun 21, 2017

It's probably fine, because if #end() was implemented it would probably just call #_emitEnd() under the hood.

But I wouldn't recommend relying on private API too much. After all, it may change in any minor version, although I'm not planning to change it any time soon.

@mAAdhaTTah
Collaborator

@NeverwinterMoon FWIW, I used to do something similar in my tests but found that it was easier to just pass the streams directly into said functions rather than plugging them into a pool, often using Kefir.merge. There generally are enough ways to compose a stream even for tests that I was able to convert my usage of pool -> plain streams.

@NeverwinterMoon

@mAAdhaTTah Thanks for the advice! We have been using Kefir in our code base for quite some time now, but I started noticing that we were testing the functionality around the streams in a very awkward manner (or not testing at all in a lot of scenarios). So, now I am trying this and that to find the best approach to writing clean but reliable tests.

@mAAdhaTTah
Collaborator

@NeverwinterMoon The other thing I'll definitely suggest is using spies. My tests tend to look like this:

it('should do a thing', () => {
  const source$ = Kefir.stream(/* emit values & then end */);
  const error = sinon.spy();
  const value = sinon.spy();

  doThing(source$).observe({
    error,
    value,
    end() {
      // Validate spies' callCount & argument values
    }
  });
});

Hope that helps!

@mAAdhaTTah
Collaborator

@NeverwinterMoon Update: We now have chai-kefir for testing streams. Please let me know if the documentation is unclear in any way.

@NeverwinterMoon

@mAAdhaTTah Great news! Sadly, I am no longer involved with the project that was using Kefir. But I'll pass the info to the people still working on it.
