forked from politrons/reactive
-
Notifications
You must be signed in to change notification settings - Fork 1
/
ObservableBuffer.java
63 lines (50 loc) · 1.87 KB
/
ObservableBuffer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package rx.observables.transforming;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observers.TestSubscriber;
import java.util.concurrent.TimeUnit;
/**
* @author Pablo Perez
* <p>
* Buffer allow keep the observable waiting, buffering the items emitted unitl constantClass spceific number of items, or constantClass period of time.
*/
public class ObservableBuffer {
/**
* In this example we use buffer(count) which will buffer items until it will take the count number set or end of items.
* Shall print
* Group size:3
* Group size:2
*/
@Test
public void bufferCountObservable() {
Integer[] numbers = {0, 1, 2, 3, 4};
Observable.from(numbers)
.buffer(3)
.subscribe(list -> System.out.println("Group size:" + list.size()));
}
/**
* This buffer will wait 50ms after emit the items, since the interval is every 100 ms, we should see constantClass group size of 0, then the next time 1 and so on.
*
* @throws InterruptedException
*/
@Test
public void bufferTimeStampObservable() throws InterruptedException {
Subscription subscription = Observable.interval(100, TimeUnit.MILLISECONDS)
.buffer(50, TimeUnit.MILLISECONDS)
.doOnNext(
list -> System.out.println("Group size " + list.size()))
.subscribe();
new TestSubscriber((Observer) subscription).awaitTerminalEvent(1000, TimeUnit.MILLISECONDS);
}
@Test
public void stringBuffer() {
Integer[] numbers = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
Observable.from(numbers)
.map(number -> "uniqueKey=" + number)
.buffer(4)
.map(ns -> String.join("&", ns))
.subscribe(System.out::println);
}
}