From 923f4e9cf77f10cf2bc7bf75209646bcf9e148aa Mon Sep 17 00:00:00 2001 From: Bruno Pinto Date: Thu, 19 Aug 2021 19:00:24 +0100 Subject: [PATCH] Add overlapping option to flatMap This will cause the next stream to be added before any old streams are removed. ``` const channelA = Kefir.stream((emitter) => { console.log('connect a'); let count = 0, id = setInterval(() => emitter.value(count++), 250); return () => { console.log('disconnect a'); clearInterval(id); }; }); const channelB = Kefir.stream((emitter) => { console.log('connect b'); let count = 0, id = setInterval(() => emitter.value(count++), 250); return () => { console.log('disconnect b'); clearInterval(id); }; }); const data = { a: channelA, b: Kefir.combine([channelA, channelB]), c: channelB, }; Kefir.sequentially(1000, ['a', 'b', 'c', undefined]) .flatMapLatest(p => p ? data[p] : Kefir.never()) .log('result'); ``` With overlapping option disabled (default): ``` > connect a > result 0 > result 1 > result 2 > disconnect a > connect a > connect b > result [0, 0] > result [1, 0] > result [1, 1] > result [2, 1] > result [2, 2] > disconnect a > disconnect b > connect b > result 0 > result 1 > result 2 > disconnect b > result ``` With overlapping option enabled: ``` > connect a > result 0 > result 1 > result 2 > connect b > result [3, 0] > result [3, 1] > result [4, 1] > result [4, 2] > disconnect a > result 3 > result 4 > result 5 > disconnect b > result ``` Closes: #235 #236 --- src/index.js | 6 ++- src/many-sources/abstract-pool.js | 27 ++++++---- src/many-sources/flat-map.js | 7 ++- test/specs/flat-map-latest.js | 57 ++++++++++++++++++++ test/specs/flat-map.js | 87 +++++++++++++++++++++++++++++++ 5 files changed, 172 insertions(+), 12 deletions(-) diff --git a/src/index.js b/src/index.js index 87ecec03..45b3e1b1 100644 --- a/src/index.js +++ b/src/index.js @@ -327,8 +327,10 @@ import FlatMap from './many-sources/flat-map' Observable.prototype.flatMap = function(fn) { return new FlatMap(this, fn).setName(this, 'flatMap') } -Observable.prototype.flatMapLatest = function(fn) { - return new FlatMap(this, fn, {concurLim: 1, drop: 'old'}).setName(this, 'flatMapLatest') +Observable.prototype.flatMapLatest = function(fn, options = {}) { + options.concurLim = 1 + options.drop = 'old' + return new FlatMap(this, fn, options).setName(this, 'flatMapLatest') } Observable.prototype.flatMapFirst = function(fn) { return new FlatMap(this, fn, {concurLim: 1}).setName(this, 'flatMapFirst') diff --git a/src/many-sources/abstract-pool.js b/src/many-sources/abstract-pool.js index 683314c5..7f99b5a5 100644 --- a/src/many-sources/abstract-pool.js +++ b/src/many-sources/abstract-pool.js @@ -5,12 +5,13 @@ import {concat, forEach, findByPred, find, remove, cloneArray} from '../utils/co const id = x => x -function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new'} = {}) { +function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new', overlapping = false} = {}) { Stream.call(this) this._queueLim = queueLim < 0 ? -1 : queueLim this._concurLim = concurLim < 0 ? -1 : concurLim this._drop = drop + this._overlapping = overlapping this._queue = [] this._curSources = [] this._$handleSubAny = event => this._handleSubAny(event) @@ -25,16 +26,21 @@ function AbstractPool({queueLim = 0, concurLim = -1, drop = 'new'} = {}) { inherit(AbstractPool, Stream, { _name: 'abstractPool', - _add(obj, toObs /* Function | falsey */) { + _add(obj, toObs /* Function | falsey */, allowOverflow) { toObs = toObs || id if (this._concurLim === -1 || this._curSources.length < this._concurLim) { this._addToCur(toObs(obj)) } else { - if (this._queueLim === -1 || this._queue.length < this._queueLim) { + if (this._queueLim === -1 || this._queue.length < this._queueLim || allowOverflow) { this._addToQueue(toObs(obj)) } else if (this._drop === 'old') { - this._removeOldest() - this._add(obj, toObs) + if (this._overlapping) { + this._add(obj, toObs, true) + this._removeOldest(true) + } else { + this._removeOldest() + this._add(obj, toObs) + } } } }, @@ -148,8 +154,8 @@ inherit(AbstractPool, Stream, { return index }, - _removeCur(obs) { - if (this._active) { + _removeCur(obs, after) { + if (!after && this._active) { this._unsubscribe(obs) } let index = find(this._curSources, obs) @@ -161,11 +167,14 @@ inherit(AbstractPool, Stream, { this._onEmpty() } } + if (after && this._active) { + this._unsubscribe(obs) + } return index }, - _removeOldest() { - this._removeCur(this._curSources[0]) + _removeOldest(after) { + this._removeCur(this._curSources[0], after) }, _pullQueue() { diff --git a/src/many-sources/flat-map.js b/src/many-sources/flat-map.js index 37f36ea3..e34e17fa 100644 --- a/src/many-sources/flat-map.js +++ b/src/many-sources/flat-map.js @@ -2,7 +2,12 @@ import {VALUE, ERROR, END} from '../constants' import {inherit} from '../utils/objects' import AbstractPool from './abstract-pool' -function FlatMap(source, fn, options) { +function FlatMap(source, fn, options = {}) { + if (fn && typeof fn !== 'function') { + options = fn + fn = undefined + } + AbstractPool.call(this, options) this._source = source this._fn = fn diff --git a/test/specs/flat-map-latest.js b/test/specs/flat-map-latest.js index 3893b398..a953f997 100644 --- a/test/specs/flat-map-latest.js +++ b/test/specs/flat-map-latest.js @@ -1,3 +1,4 @@ +const sinon = require('sinon') const {stream, prop, send, value, error, end, activate, deactivate, Kefir, expect} = require('../test-helpers') describe('flatMapLatest', () => { @@ -82,6 +83,62 @@ describe('flatMapLatest', () => { }) ).to.emit([value(3), value(4), value(5)], () => send(a, [value(1), value(2), value(3), value(4), value(5)])) }) + + describe('non-overlapping', () => { + it('should remove the previous stream before adding the next', () => { + onDeactivate = sinon.spy() + a = Kefir.stream(() => onDeactivate) + b = stream() + map = b.flatMapLatest() + activate(map) + send(b, [value(a)]) + send(b, [value(a)]) + deactivate(map) + expect(onDeactivate.callCount).to.equal(2) + }) + }) + + describe('overlapping', () => { + it('should add the next stream before removing the previous', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMapLatest({overlapping: true}) + activate(map) + send(a, [value(b)]) + send(a, [value(b)]) + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + + it('should accept optional map fn', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMapLatest(x => x.obs, {overlapping: true}) + activate(map) + send(a, [value({obs: b})]) + send(a, [value({obs: b})]) + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + + it('should work nicely with Kefir.constant and Kefir.never', () => { + const a = stream() + expect( + a.flatMapLatest( + x => { + if (x > 2) { + return Kefir.constant(x) + } else { + return Kefir.never() + } + }, + {overlapping: true} + ) + ).to.emit([value(3), value(4), value(5)], () => send(a, [value(1), value(2), value(3), value(4), value(5)])) + }) + }) }) describe('property', () => { diff --git a/test/specs/flat-map.js b/test/specs/flat-map.js index b765e774..182b2787 100644 --- a/test/specs/flat-map.js +++ b/test/specs/flat-map.js @@ -1,3 +1,4 @@ +const sinon = require('sinon') const {stream, prop, send, value, error, end, activate, deactivate, Kefir, expect} = require('../test-helpers') describe('flatMap', () => { @@ -239,4 +240,90 @@ describe('flatMap', () => { expect(result).to.flowErrors(c) }) }) + + describe('overlapping with a concurrency limit that has maxed out', () => { + describe('and with a queue limit', () => { + it('not maxed out, should add to the queue', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMap({concurLim: 1, drop: 'old', overlapping: true, queueLim: 1}) + activate(map) + send(a, [value(b)]) // added to current + send(a, [value(b)]) // added to queue + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + + it('maxed out, should add the next stream before removing the previous', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMap({concurLim: 1, drop: 'old', overlapping: true, queueLim: 1}) + activate(map) + send(a, [value(b)]) // added to current + send(a, [value(b)]) // added to queue + send(a, [value(b)]) // added to queue (overflow) + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + }) + + describe('and without a queue limit', () => { + it('should add to the queue', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMap({concurLim: 1, drop: 'old', overlapping: true, queueLim: -1}) + activate(map) + send(a, [value(b)]) // added to current + send(a, [value(b)]) // replaced current + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + }) + }) + + describe('non-overlapping with a concurrency limit that has maxed out', () => { + describe('and with a queue limit', () => { + it('not maxed out, should add to the queue', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMap({concurLim: 1, drop: 'old', overlapping: false, queueLim: 1}) + activate(map) + send(a, [value(b)]) // added to current + send(a, [value(b)]) // added to queue + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + + it('maxed out, should remove the previous stream before adding the next', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMap({concurLim: 1, drop: 'old', overlapping: false, queueLim: 1}) + activate(map) + send(a, [value(b)]) // added to current + send(a, [value(b)]) // added to queue + send(a, [value(b)]) // added to queue (overflow) + deactivate(map) + expect(onDeactivate.callCount).to.equal(2) + }) + }) + + describe('and without a queue limit', () => { + it('should add to the queue', () => { + onDeactivate = sinon.spy() + a = stream() + b = Kefir.stream(() => onDeactivate) + map = a.flatMap({concurLim: 1, drop: 'old', overlapping: false, queueLim: -1}) + activate(map) + send(a, [value(b)]) // added to current + send(a, [value(b)]) // replaced current + deactivate(map) + expect(onDeactivate.callCount).to.equal(1) + }) + }) + }) })