Skip to content

Commit

Permalink
FE: fix missing previousCursorId in case of empty pollRange (#550)
Browse files Browse the repository at this point in the history
  • Loading branch information
hadisfr committed Sep 17, 2024
1 parent 410826e commit 50aa173
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 10 deletions.
6 changes: 3 additions & 3 deletions api/src/main/java/io/kafbat/ui/emitter/AbstractEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords
messagesProcessing.sentConsumingInfo(sink, records);
}

// cursor is null if target partitions were fully polled (no, need to do paging)
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
messagesProcessing.sendFinishEvents(sink, cursor);
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, Cursor.Tracking cursor,
boolean hasNext) {
messagesProcessing.sendFinishEvents(sink, cursor, hasNext);
sink.complete();
}
}
7 changes: 3 additions & 4 deletions api/src/main/java/io/kafbat/ui/emitter/ConsumingStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.kafbat.ui.model.TopicMessageConsumingDTO;
import io.kafbat.ui.model.TopicMessageEventDTO;
import io.kafbat.ui.model.TopicMessagePageCursorDTO;
import javax.annotation.Nullable;
import reactor.core.publisher.FluxSink;

class ConsumingStats {
Expand All @@ -28,8 +27,8 @@ void incFilterApplyError() {
filterApplyErrors++;
}

void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
String previousCursorId = cursor != null ? cursor.getPreviousCursorId() : null;
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, Cursor.Tracking cursor, boolean hasNext) {
String previousCursorId = cursor.getPreviousCursorId();
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.DONE)
Expand All @@ -39,7 +38,7 @@ void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Track
: null
)
.nextCursor(
cursor != null
hasNext
? new TopicMessagePageCursorDTO().id(cursor.registerCursor())
: null
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polled
}
}

void sendFinishEvents(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
void sendFinishEvents(FluxSink<TopicMessageEventDTO> sink, Cursor.Tracking cursor, boolean hasNext) {
if (!sink.isCancelled()) {
consumingStats.sendFinishEvent(sink, cursor);
consumingStats.sendFinishEvent(sink, cursor, hasNext);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
if (sink.isCancelled()) {
log.debug("Polling finished due to sink cancellation");
}
sendFinishStatsAndCompleteSink(sink, pollRange.isEmpty() ? null : cursor);
sendFinishStatsAndCompleteSink(sink, cursor, !pollRange.isEmpty());
log.debug("Polling finished");
} catch (InterruptException kafkaInterruptException) {
log.debug("Polling finished due to thread interruption");
Expand Down

0 comments on commit 50aa173

Please sign in to comment.