Skip to content

Commit

Permalink
Multicast mode used to send messages on the flux sink.
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrczarnas committed Jul 24, 2024
1 parent 368c897 commit 9933568
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void start() {
return;
}

this.jobUpdateSink = Sinks.many().unicast().onBackpressureBuffer();
this.jobUpdateSink = Sinks.many().multicast().onBackpressureBuffer();
Flux<DqoChangeNotificationEntry> dqoNotificationModelFlux = this.jobUpdateSink.asFlux().onBackpressureBuffer(SUBSCRIBER_BACKPRESSURE_BUFFER_SIZE);
dqoNotificationModelFlux.subscribeOn(Schedulers.boundedElastic())
.doOnComplete(() -> releaseAwaitingClients())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public void start() {
this.queueEmptyFuture = new CompletableFuture<>();
this.queueEmptyFuture.complete(0);

this.loadTableStatusRequestSink = Sinks.many().unicast().onBackpressureBuffer();
this.loadTableStatusRequestSink = Sinks.many().multicast().onBackpressureBuffer();
Flux<List<CurrentTableStatusKey>> requestLoadFlux = this.loadTableStatusRequestSink.asFlux()
.onBackpressureBuffer(SUBSCRIBER_BACKPRESSURE_BUFFER_SIZE)
.buffer(Duration.ofMillis(TableStatusCache.BATCH_COLLECTION_TIMEOUT_MS)); // wait 50 millis, maybe multiple file system updates are made, like changing multiple parquet files... we want to merge all file changes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ public void start() {
this.queueEmptyFuture = new CompletableFuture<>();
this.queueEmptyFuture.complete(0);

this.loadObjectRequestSink = Sinks.many().unicast().onBackpressureBuffer();
this.loadObjectRequestSink = Sinks.many().multicast().onBackpressureBuffer();
Flux<List<LabelRefreshKey>> requestLoadFlux = this.loadObjectRequestSink.asFlux()
.onBackpressureBuffer(SUBSCRIBER_BACKPRESSURE_BUFFER_SIZE)
.buffer(Duration.ofMillis(50)) // wait 50 millis, maybe multiple file system updates are made, we want to merge all file changes
Expand Down

0 comments on commit 9933568

Please sign in to comment.