From 50aa173fbdeb28b3b193d3890bc2e1d0114a647b Mon Sep 17 00:00:00 2001 From: "H@di" Date: Tue, 17 Sep 2024 10:46:19 +0330 Subject: [PATCH] FE: fix missing previousCursorId in case of empty pollRange (#550) --- .../main/java/io/kafbat/ui/emitter/AbstractEmitter.java | 6 +++--- api/src/main/java/io/kafbat/ui/emitter/ConsumingStats.java | 7 +++---- .../main/java/io/kafbat/ui/emitter/MessagesProcessing.java | 4 ++-- .../java/io/kafbat/ui/emitter/RangePollingEmitter.java | 2 +- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/emitter/AbstractEmitter.java b/api/src/main/java/io/kafbat/ui/emitter/AbstractEmitter.java index 7638586a5..04d21b72c 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/AbstractEmitter.java +++ b/api/src/main/java/io/kafbat/ui/emitter/AbstractEmitter.java @@ -40,9 +40,9 @@ protected void sendConsuming(FluxSink sink, PolledRecords messagesProcessing.sentConsumingInfo(sink, records); } - // cursor is null if target partitions were fully polled (no, need to do paging) - protected void sendFinishStatsAndCompleteSink(FluxSink sink, @Nullable Cursor.Tracking cursor) { - messagesProcessing.sendFinishEvents(sink, cursor); + protected void sendFinishStatsAndCompleteSink(FluxSink sink, Cursor.Tracking cursor, + boolean hasNext) { + messagesProcessing.sendFinishEvents(sink, cursor, hasNext); sink.complete(); } } diff --git a/api/src/main/java/io/kafbat/ui/emitter/ConsumingStats.java b/api/src/main/java/io/kafbat/ui/emitter/ConsumingStats.java index 3c27459b2..b09570b15 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/ConsumingStats.java +++ b/api/src/main/java/io/kafbat/ui/emitter/ConsumingStats.java @@ -3,7 +3,6 @@ import io.kafbat.ui.model.TopicMessageConsumingDTO; import io.kafbat.ui.model.TopicMessageEventDTO; import io.kafbat.ui.model.TopicMessagePageCursorDTO; -import javax.annotation.Nullable; import reactor.core.publisher.FluxSink; class ConsumingStats { @@ -28,8 +27,8 @@ void incFilterApplyError() { filterApplyErrors++; } - void sendFinishEvent(FluxSink sink, @Nullable Cursor.Tracking cursor) { - String previousCursorId = cursor != null ? cursor.getPreviousCursorId() : null; + void sendFinishEvent(FluxSink sink, Cursor.Tracking cursor, boolean hasNext) { + String previousCursorId = cursor.getPreviousCursorId(); sink.next( new TopicMessageEventDTO() .type(TopicMessageEventDTO.TypeEnum.DONE) @@ -39,7 +38,7 @@ void sendFinishEvent(FluxSink sink, @Nullable Cursor.Track : null ) .nextCursor( - cursor != null + hasNext ? new TopicMessagePageCursorDTO().id(cursor.registerCursor()) : null ) diff --git a/api/src/main/java/io/kafbat/ui/emitter/MessagesProcessing.java b/api/src/main/java/io/kafbat/ui/emitter/MessagesProcessing.java index 16dead8f5..815192306 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/MessagesProcessing.java +++ b/api/src/main/java/io/kafbat/ui/emitter/MessagesProcessing.java @@ -72,9 +72,9 @@ void sentConsumingInfo(FluxSink sink, PolledRecords polled } } - void sendFinishEvents(FluxSink sink, @Nullable Cursor.Tracking cursor) { + void sendFinishEvents(FluxSink sink, Cursor.Tracking cursor, boolean hasNext) { if (!sink.isCancelled()) { - consumingStats.sendFinishEvent(sink, cursor); + consumingStats.sendFinishEvent(sink, cursor, hasNext); } } diff --git a/api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java b/api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java index 794c70e57..3275d73ce 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java +++ b/api/src/main/java/io/kafbat/ui/emitter/RangePollingEmitter.java @@ -64,7 +64,7 @@ public void accept(FluxSink sink) { if (sink.isCancelled()) { log.debug("Polling finished due to sink cancellation"); } - sendFinishStatsAndCompleteSink(sink, pollRange.isEmpty() ? null : cursor); + sendFinishStatsAndCompleteSink(sink, cursor, !pollRange.isEmpty()); log.debug("Polling finished"); } catch (InterruptException kafkaInterruptException) { log.debug("Polling finished due to thread interruption");