Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FE: Implement previous button in topic messages page #550

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api/src/main/java/io/kafbat/ui/emitter/AbstractEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords
messagesProcessing.sentConsumingInfo(sink, records);
}

// cursor is null if target partitions were fully polled (no, need to do paging)
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
messagesProcessing.sendFinishEvents(sink, cursor);
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, Cursor.Tracking cursor,
boolean hasNext) {
messagesProcessing.sendFinishEvents(sink, cursor, hasNext);
sink.complete();
}
}
17 changes: 11 additions & 6 deletions api/src/main/java/io/kafbat/ui/emitter/ConsumingStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import io.kafbat.ui.model.TopicMessageConsumingDTO;
import io.kafbat.ui.model.TopicMessageEventDTO;
import io.kafbat.ui.model.TopicMessageNextPageCursorDTO;
import javax.annotation.Nullable;
import io.kafbat.ui.model.TopicMessagePageCursorDTO;
import reactor.core.publisher.FluxSink;

class ConsumingStats {
Expand All @@ -28,13 +27,19 @@ void incFilterApplyError() {
filterApplyErrors++;
}

void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, Cursor.Tracking cursor, boolean hasNext) {
String previousCursorId = cursor.getPreviousCursorId();
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.DONE)
.cursor(
cursor != null
? new TopicMessageNextPageCursorDTO().id(cursor.registerCursor())
.prevCursor(
previousCursorId != null
? new TopicMessagePageCursorDTO().id(previousCursorId)
: null
)
.nextCursor(
hasNext
? new TopicMessagePageCursorDTO().id(cursor.registerCursor())
: null
)
.consuming(createConsumingStats())
Expand Down
19 changes: 16 additions & 3 deletions api/src/main/java/io/kafbat/ui/emitter/Cursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import io.kafbat.ui.serdes.ConsumerRecordDeserializer;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.common.TopicPartition;
Expand All @@ -22,7 +24,9 @@ public static class Tracking {
private final ConsumerPosition originalPosition;
private final Predicate<TopicMessageDTO> filter;
private final int limit;
private final Function<Cursor, String> registerAction;
private final String cursorId;
private final BiFunction<Cursor, String, String> registerAction;
private final Function<String, Optional<String>> previousCursorIdGetter;

//topic -> partition -> offset
private final Table<String, Integer, Long> trackingOffsets = HashBasedTable.create();
Expand All @@ -31,12 +35,16 @@ public Tracking(ConsumerRecordDeserializer deserializer,
ConsumerPosition originalPosition,
Predicate<TopicMessageDTO> filter,
int limit,
Function<Cursor, String> registerAction) {
String cursorId,
BiFunction<Cursor, String, String> registerAction,
Function<String, Optional<String>> previousCursorIdGetter) {
this.deserializer = deserializer;
this.originalPosition = originalPosition;
this.filter = filter;
this.limit = limit;
this.cursorId = cursorId;
this.registerAction = registerAction;
this.previousCursorIdGetter = previousCursorIdGetter;
}

void trackOffset(String topic, int partition, long offset) {
Expand Down Expand Up @@ -82,9 +90,14 @@ String registerCursor() {
),
filter,
limit
)
),
this.cursorId
);
}

String getPreviousCursorId() {
return this.previousCursorIdGetter.apply(this.cursorId).orElse(null);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ void sentConsumingInfo(FluxSink<TopicMessageEventDTO> sink, PolledRecords polled
}
}

void sendFinishEvents(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
void sendFinishEvents(FluxSink<TopicMessageEventDTO> sink, Cursor.Tracking cursor, boolean hasNext) {
if (!sink.isCancelled()) {
consumingStats.sendFinishEvent(sink, cursor);
consumingStats.sendFinishEvent(sink, cursor, hasNext);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
if (sink.isCancelled()) {
log.debug("Polling finished due to sink cancellation");
}
sendFinishStatsAndCompleteSink(sink, pollRange.isEmpty() ? null : cursor);
sendFinishStatsAndCompleteSink(sink, cursor, !pollRange.isEmpty());
log.debug("Polling finished");
} catch (InterruptException kafkaInterruptException) {
log.debug("Polling finished due to thread interruption");
Expand Down
34 changes: 13 additions & 21 deletions api/src/main/java/io/kafbat/ui/service/MessagesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -216,46 +217,37 @@ public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
@Nullable Integer limit,
@Nullable String keySerde,
@Nullable String valueSerde) {
return loadMessages(
cluster,
topic,
Cursor cursor = new Cursor(
deserializationService.deserializerFor(cluster, topic, keySerde, valueSerde),
consumerPosition,
getMsgFilter(containsStringFilter, filterId),
fixPageSize(limit)
);
String cursorId = cursorsStorage.register(cursor, null);
return loadMessages(cluster, topic, cursorId, cursor);
}

public Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic, String cursorId) {
Cursor cursor = cursorsStorage.getCursor(cursorId)
.orElseThrow(() -> new ValidationException("Next page cursor not found. Maybe it was evicted from cache."));
return loadMessages(
cluster,
topic,
cursor.deserializer(),
cursor.consumerPosition(),
cursor.filter(),
cursor.limit()
);
return loadMessages(cluster, topic, cursorId, cursor);
}

private Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster,
String topic,
ConsumerRecordDeserializer deserializer,
ConsumerPosition consumerPosition,
Predicate<TopicMessageDTO> filter,
int limit) {
private @NotNull Flux<TopicMessageEventDTO> loadMessages(KafkaCluster cluster, String topic,
String cursorId, Cursor cursor) {
return withExistingTopic(cluster, topic)
.flux()
.publishOn(Schedulers.boundedElastic())
.flatMap(td -> loadMessagesImpl(cluster, deserializer, consumerPosition, filter, limit));
.flatMap(td -> loadMessagesImpl(cluster,
cursor.deserializer(), cursor.consumerPosition(), cursor.filter(), cursor.limit(), cursorId));
}

private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
ConsumerRecordDeserializer deserializer,
ConsumerPosition consumerPosition,
Predicate<TopicMessageDTO> filter,
int limit) {
int limit,
String cursorId) {
var emitter = switch (consumerPosition.pollingMode()) {
case TO_OFFSET, TO_TIMESTAMP, LATEST -> new BackwardEmitter(
() -> consumerGroupService.createConsumer(cluster),
Expand All @@ -264,7 +256,7 @@ private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
deserializer,
filter,
cluster.getPollingSettings(),
cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit, cursorId)
);
case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> new ForwardEmitter(
() -> consumerGroupService.createConsumer(cluster),
Expand All @@ -273,7 +265,7 @@ private Flux<TopicMessageEventDTO> loadMessagesImpl(KafkaCluster cluster,
deserializer,
filter,
cluster.getPollingSettings(),
cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit)
cursorsStorage.createNewCursor(deserializer, consumerPosition, filter, limit, cursorId)
);
case TAILING -> new TailingEmitter(
() -> consumerGroupService.createConsumer(cluster),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kafbat.ui.model.ConsumerPosition;
import io.kafbat.ui.model.TopicMessageDTO;
import io.kafbat.ui.serdes.ConsumerRecordDeserializer;
import jakarta.annotation.Nullable;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;
Expand All @@ -20,23 +21,36 @@ public class PollingCursorsStorage {
.maximumSize(MAX_SIZE)
.build();

private final Cache<String, String> previousCursorsMap = CacheBuilder.newBuilder()
.maximumSize(MAX_SIZE)
.build();

public Cursor.Tracking createNewCursor(ConsumerRecordDeserializer deserializer,
ConsumerPosition originalPosition,
Predicate<TopicMessageDTO> filter,
int limit) {
return new Cursor.Tracking(deserializer, originalPosition, filter, limit, this::register);
int limit,
String cursorId) {
return new Cursor.Tracking(deserializer, originalPosition, filter, limit, cursorId,
this::register, this::getPreviousCursorId);
}

public Optional<Cursor> getCursor(String id) {
return Optional.ofNullable(cursorsCache.getIfPresent(id));
}

public String register(Cursor cursor) {
public String register(Cursor cursor, @Nullable String previousCursorId) {
var id = RandomStringUtils.random(8, true, true);
cursorsCache.put(id, cursor);
if (previousCursorId != null) {
previousCursorsMap.put(id, previousCursorId);
}
return id;
}

public Optional<String> getPreviousCursorId(String cursorId) {
return Optional.ofNullable(previousCursorsMap.getIfPresent(cursorId));
}

@VisibleForTesting
public Map<String, Cursor> asMap() {
return cursorsCache.asMap();
Expand Down
2 changes: 1 addition & 1 deletion api/src/test/java/io/kafbat/ui/emitter/CursorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private ForwardEmitter createForwardEmitter(ConsumerPosition position) {
}

private Cursor.Tracking createCursor(ConsumerPosition position) {
return cursorsStorage.createNewCursor(createRecordsDeserializer(), position, m -> true, PAGE_SIZE);
return cursorsStorage.createNewCursor(createRecordsDeserializer(), position, m -> true, PAGE_SIZE, "CursorId");
}

private EnhancedConsumer createConsumer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ void cursorIsRegisteredAfterPollingIsDoneAndCanBeUsedForNextPagePolling(PollingM
null, null, pageSize, StringSerde.name(), StringSerde.name())
.doOnNext(evt -> {
if (evt.getType() == TopicMessageEventDTO.TypeEnum.DONE) {
assertThat(evt.getCursor()).isNotNull();
cursorIdCatcher.set(evt.getCursor().getId());
assertThat(evt.getNextCursor()).isNotNull();
cursorIdCatcher.set(evt.getNextCursor().getId());
}
})
.filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
Expand All @@ -147,7 +147,7 @@ void cursorIsRegisteredAfterPollingIsDoneAndCanBeUsedForNextPagePolling(PollingM
Flux<String> remainingMsgs = messagesService.loadMessages(cluster, testTopic, cursorIdCatcher.get())
.doOnNext(evt -> {
if (evt.getType() == TopicMessageEventDTO.TypeEnum.DONE) {
assertThat(evt.getCursor()).isNull();
assertThat(evt.getNextCursor()).isNull();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hadisfr can we add some tests for the new backward cursor here?

}
})
.filter(evt -> evt.getType() == TopicMessageEventDTO.TypeEnum.MESSAGE)
Expand Down
8 changes: 5 additions & 3 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2954,8 +2954,10 @@ components:
$ref: "#/components/schemas/TopicMessagePhase"
consuming:
$ref: "#/components/schemas/TopicMessageConsuming"
cursor:
$ref: "#/components/schemas/TopicMessageNextPageCursor"
prevCursor:
$ref: "#/components/schemas/TopicMessagePageCursor"
nextCursor:
$ref: "#/components/schemas/TopicMessagePageCursor"

TopicMessagePhase:
type: object
Expand Down Expand Up @@ -2985,7 +2987,7 @@ components:
filterApplyErrors:
type: integer

TopicMessageNextPageCursor:
TopicMessagePageCursor:
type: object
properties:
id:
Expand Down
20 changes: 17 additions & 3 deletions frontend/src/components/Topics/Topic/Messages/MessagesTable.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import { TopicMessage } from 'generated-sources';
import React, { useState } from 'react';
import { Button } from 'components/common/Button/Button';
import * as S from 'components/common/NewTable/Table.styled';
import { usePaginateTopics, useIsLiveMode } from 'lib/hooks/useMessagesFilters';
import {
useGoToNextPage,
useGoToPrevPage,
useIsLiveMode,
} from 'lib/hooks/useMessagesFilters';
import { useMessageFiltersStore } from 'lib/hooks/useMessageFiltersStore';

import PreviewModal from './PreviewModal';
Expand All @@ -20,12 +24,14 @@ const MessagesTable: React.FC<MessagesTableProps> = ({
messages,
isFetching,
}) => {
const paginate = usePaginateTopics();
const goToNextPage = useGoToNextPage();
const goToPrevPage = useGoToPrevPage();
const [previewFor, setPreviewFor] = useState<string | null>(null);

const [keyFilters, setKeyFilters] = useState<PreviewFilter[]>([]);
const [contentFilters, setContentFilters] = useState<PreviewFilter[]>([]);
const nextCursor = useMessageFiltersStore((state) => state.nextCursor);
const prevCursor = useMessageFiltersStore((state) => state.prevCursor);
const isLive = useIsLiveMode();

return (
Expand Down Expand Up @@ -97,11 +103,19 @@ const MessagesTable: React.FC<MessagesTableProps> = ({
</Table>
<S.Pagination>
<S.Pages>
<Button
disabled={isLive || isFetching || !prevCursor}
buttonType="secondary"
buttonSize="L"
onClick={goToPrevPage}
>
← Previous
</Button>
<Button
disabled={isLive || isFetching || !nextCursor}
buttonType="secondary"
buttonSize="L"
onClick={paginate}
onClick={goToNextPage}
>
Next →
</Button>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ jest.mock('react-router-dom', () => ({

jest.mock('lib/hooks/useMessagesFilters', () => ({
useIsLiveMode: jest.fn(),
usePaginateTopics: jest.fn(),
useGoToNextPage: jest.fn(),
useGoToPrevPage: jest.fn(),
}));

describe('MessagesTable', () => {
Expand Down Expand Up @@ -73,12 +74,23 @@ describe('MessagesTable', () => {
expect(screen.queryByText(/next/i)).toBeDisabled();
});

it('should check if previous button is disabled isLive Param', () => {
renderComponent({ isFetching: true });
expect(screen.queryByText(/previous/i)).toBeDisabled();
});

it('should check if next button is disabled if there is no nextCursor', () => {
(useIsLiveMode as jest.Mock).mockImplementation(() => false);
renderComponent({ isFetching: false });
expect(screen.queryByText(/next/i)).toBeDisabled();
});

it('should check if previous button is disabled if there is no prevCursor', () => {
(useIsLiveMode as jest.Mock).mockImplementation(() => false);
renderComponent({ isFetching: false });
expect(screen.queryByText(/previous/i)).toBeDisabled();
});

it('should check the display of the loader element during loader', () => {
renderComponent({ isFetching: true });
expect(screen.getByRole('progressbar')).toBeInTheDocument();
Expand Down
Loading
Loading