Skip to content

Commit

Permalink
BE: Messages API v2 (#115)
Browse files Browse the repository at this point in the history
Co-authored-by: iliax <[email protected]>
Co-authored-by: Roman Zabaluev <[email protected]>
  • Loading branch information
3 people authored Mar 16, 2024
1 parent 2abacc9 commit d637fa2
Show file tree
Hide file tree
Showing 25 changed files with 1,009 additions and 294 deletions.
110 changes: 55 additions & 55 deletions api/src/main/java/io/kafbat/ui/controller/MessagesController.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package io.kafbat.ui.controller;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_DELETE;
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_PRODUCE;
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
import static java.util.stream.Collectors.toMap;

import io.kafbat.ui.api.MessagesApi;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.model.ConsumerPosition;
import io.kafbat.ui.model.CreateTopicMessageDTO;
import io.kafbat.ui.model.MessageFilterIdDTO;
import io.kafbat.ui.model.MessageFilterRegistrationDTO;
import io.kafbat.ui.model.MessageFilterTypeDTO;
import io.kafbat.ui.model.PollingModeDTO;
import io.kafbat.ui.model.SeekDirectionDTO;
import io.kafbat.ui.model.SeekTypeDTO;
import io.kafbat.ui.model.SerdeUsageDTO;
Expand All @@ -24,14 +27,10 @@
import io.kafbat.ui.service.DeserializationService;
import io.kafbat.ui.service.MessagesService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.TopicPartition;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
Expand Down Expand Up @@ -74,6 +73,7 @@ public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilte
.map(ResponseEntity::ok);
}

@Deprecated
@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
String topicName,
Expand All @@ -86,6 +86,23 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
String keySerde,
String valueSerde,
ServerWebExchange exchange) {
throw new ValidationException("Not supported");
}


@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
PollingModeDTO mode,
List<Integer> partitions,
Integer limit,
String stringFilter,
String smartFilterId,
Long offset,
Long timestamp,
String keySerde,
String valueSerde,
String cursor,
ServerWebExchange exchange) {
var contextBuilder = AccessContext.builder()
.cluster(clusterName)
.topicActions(topicName, MESSAGES_READ)
Expand All @@ -95,27 +112,26 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
contextBuilder.auditActions(AuditAction.VIEW);
}

seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;

var positions = new ConsumerPosition(
seekType,
topicName,
parseSeekTo(topicName, seekType, seekTo)
);
Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
ResponseEntity.ok(
messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType,
limit, seekDirection, keySerde, valueSerde)
)
);

var context = contextBuilder.build();
return validateAccess(context)
.then(job)
.doOnEach(sig -> audit(context, sig));
var accessContext = contextBuilder.build();

Flux<TopicMessageEventDTO> messagesFlux;
if (cursor != null) {
messagesFlux = messagesService.loadMessages(getCluster(clusterName), topicName, cursor);
} else {
messagesFlux = messagesService.loadMessages(
getCluster(clusterName),
topicName,
ConsumerPosition.create(checkNotNull(mode), checkNotNull(topicName), partitions, timestamp, offset),
stringFilter,
smartFilterId,
limit,
keySerde,
valueSerde
);
}
return accessControlService.validateAccess(accessContext)
.then(Mono.just(ResponseEntity.ok(messagesFlux)))
.doOnEach(sig -> auditService.audit(accessContext, sig));
}

@Override
Expand All @@ -136,34 +152,6 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(
).doOnEach(sig -> audit(context, sig));
}

/**
* The format is [partition]::[offset] for specifying offsets
* or [partition]::[timestamp in millis] for specifying timestamps.
*/
@Nullable
private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
if (seekTo == null || seekTo.isEmpty()) {
if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
return null;
}
throw new ValidationException("seekTo should be set if seekType is " + seekType);
}
return seekTo.stream()
.map(p -> {
String[] split = p.split("::");
if (split.length != 2) {
throw new IllegalArgumentException(
"Wrong seekTo argument format. See API docs for details");
}

return Pair.of(
new TopicPartition(topic, Integer.parseInt(split[0])),
Long.parseLong(split[1])
);
})
.collect(toMap(Pair::getKey, Pair::getValue));
}

@Override
public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
String topicName,
Expand All @@ -190,7 +178,19 @@ public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterNam
);
}

@Override
public Mono<ResponseEntity<MessageFilterIdDTO>> registerFilter(String clusterName,
String topicName,
Mono<MessageFilterRegistrationDTO> registration,
ServerWebExchange exchange) {



final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.topicActions(topicName, MESSAGES_READ)
.build());
return validateAccess.then(registration)
.map(reg -> messagesService.registerMessageFilter(reg.getFilterCode()))
.map(id -> ResponseEntity.ok(new MessageFilterIdDTO().id(id)));
}
}
14 changes: 9 additions & 5 deletions api/src/main/java/io/kafbat/ui/emitter/AbstractEmitter.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kafbat.ui.emitter;

import io.kafbat.ui.model.TopicMessageEventDTO;
import jakarta.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
Expand All @@ -21,12 +22,14 @@ protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsum
return records;
}

protected boolean sendLimitReached() {
protected boolean isSendLimitReached() {
return messagesProcessing.limitReached();
}

protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
messagesProcessing.send(sink, records);
protected void send(FluxSink<TopicMessageEventDTO> sink,
Iterable<ConsumerRecord<Bytes, Bytes>> records,
@Nullable Cursor.Tracking cursor) {
messagesProcessing.send(sink, records, cursor);
}

protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
Expand All @@ -37,8 +40,9 @@ protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords
messagesProcessing.sentConsumingInfo(sink, records);
}

protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
messagesProcessing.sendFinishEvent(sink);
// cursor is null if target partitions were fully polled (no, need to do paging)
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
messagesProcessing.sendFinishEvents(sink, cursor);
sink.complete();
}
}
13 changes: 5 additions & 8 deletions api/src/main/java/io/kafbat/ui/emitter/BackwardEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@ public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
int messagesPerPage,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) {
PollingSettings pollingSettings,
Cursor.Tracking cursor) {
super(
consumerSupplier,
consumerPosition,
messagesPerPage,
new MessagesProcessing(
deserializer,
filter,
false,
messagesPerPage
),
pollingSettings
new MessagesProcessing(deserializer, filter, false, messagesPerPage),
pollingSettings,
cursor
);
}

Expand Down
9 changes: 8 additions & 1 deletion api/src/main/java/io/kafbat/ui/emitter/ConsumingStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.kafbat.ui.model.TopicMessageConsumingDTO;
import io.kafbat.ui.model.TopicMessageEventDTO;
import io.kafbat.ui.model.TopicMessageNextPageCursorDTO;
import javax.annotation.Nullable;
import reactor.core.publisher.FluxSink;

class ConsumingStats {
Expand All @@ -26,10 +28,15 @@ void incFilterApplyError() {
filterApplyErrors++;
}

void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.DONE)
.cursor(
cursor != null
? new TopicMessageNextPageCursorDTO().id(cursor.registerCursor())
: null
)
.consuming(createConsumingStats())
);
}
Expand Down
90 changes: 90 additions & 0 deletions api/src/main/java/io/kafbat/ui/emitter/Cursor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.kafbat.ui.emitter;

import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
import io.kafbat.ui.model.ConsumerPosition;
import io.kafbat.ui.model.PollingModeDTO;
import io.kafbat.ui.model.TopicMessageDTO;
import io.kafbat.ui.serdes.ConsumerRecordDeserializer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.common.TopicPartition;

public record Cursor(ConsumerRecordDeserializer deserializer,
ConsumerPosition consumerPosition,
Predicate<TopicMessageDTO> filter,
int limit) {

public static class Tracking {
private final ConsumerRecordDeserializer deserializer;
private final ConsumerPosition originalPosition;
private final Predicate<TopicMessageDTO> filter;
private final int limit;
private final Function<Cursor, String> registerAction;

//topic -> partition -> offset
private final Table<String, Integer, Long> trackingOffsets = HashBasedTable.create();

public Tracking(ConsumerRecordDeserializer deserializer,
ConsumerPosition originalPosition,
Predicate<TopicMessageDTO> filter,
int limit,
Function<Cursor, String> registerAction) {
this.deserializer = deserializer;
this.originalPosition = originalPosition;
this.filter = filter;
this.limit = limit;
this.registerAction = registerAction;
}

void trackOffset(String topic, int partition, long offset) {
trackingOffsets.put(topic, partition, offset);
}

void initOffsets(Map<TopicPartition, Long> initialSeekOffsets) {
initialSeekOffsets.forEach((tp, off) -> trackOffset(tp.topic(), tp.partition(), off));
}

private Map<TopicPartition, Long> getOffsetsMap(int offsetToAdd) {
Map<TopicPartition, Long> result = new HashMap<>();
trackingOffsets.rowMap()
.forEach((topic, partsMap) ->
partsMap.forEach((p, off) -> result.put(new TopicPartition(topic, p), off + offsetToAdd)));
return result;
}

String registerCursor() {
return registerAction.apply(
new Cursor(
deserializer,
new ConsumerPosition(
switch (originalPosition.pollingMode()) {
case TO_OFFSET, TO_TIMESTAMP, LATEST -> PollingModeDTO.TO_OFFSET;
case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> PollingModeDTO.FROM_OFFSET;
case TAILING -> throw new IllegalStateException();
},
originalPosition.topic(),
originalPosition.partitions(),
null,
new ConsumerPosition.Offsets(
null,
getOffsetsMap(
switch (originalPosition.pollingMode()) {
case TO_OFFSET, TO_TIMESTAMP, LATEST -> 0;
// when doing forward polling we need to start from latest msg's offset + 1
case FROM_OFFSET, FROM_TIMESTAMP, EARLIEST -> 1;
case TAILING -> throw new IllegalStateException();
}
)
)
),
filter,
limit
)
);
}
}

}
13 changes: 5 additions & 8 deletions api/src/main/java/io/kafbat/ui/emitter/ForwardEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@ public ForwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
int messagesPerPage,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) {
PollingSettings pollingSettings,
Cursor.Tracking cursor) {
super(
consumerSupplier,
consumerPosition,
messagesPerPage,
new MessagesProcessing(
deserializer,
filter,
true,
messagesPerPage
),
pollingSettings
new MessagesProcessing(deserializer, filter, true, messagesPerPage),
pollingSettings,
cursor
);
}

Expand Down
11 changes: 4 additions & 7 deletions api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,16 @@ public class MessageFilters {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

public static Predicate<TopicMessageDTO> createMsgFilter(String query, MessageFilterTypeDTO type) {
return switch (type) {
case STRING_CONTAINS -> containsStringFilter(query);
case CEL_SCRIPT -> celScriptFilter(query);
};
public static Predicate<TopicMessageDTO> noop() {
return e -> true;
}

static Predicate<TopicMessageDTO> containsStringFilter(String string) {
public static Predicate<TopicMessageDTO> containsStringFilter(String string) {
return msg -> StringUtils.contains(msg.getKey(), string)
|| StringUtils.contains(msg.getContent(), string);
}

static Predicate<TopicMessageDTO> celScriptFilter(String script) {
public static Predicate<TopicMessageDTO> celScriptFilter(String script) {
CelValidationResult celValidationResult = CEL_COMPILER.compile(script);
if (celValidationResult.hasError()) {
throw new CelException(script, celValidationResult.getErrorString());
Expand Down
Loading

0 comments on commit d637fa2

Please sign in to comment.