refactor: use record classes
yeikel committed Dec 12, 2024
1 parent 318bcc9 commit 1b5ae1f
Showing 37 changed files with 137 additions and 238 deletions.
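
Nearly every hunk in this commit applies the same transformation: a Lombok @Value (or @Data) holder becomes a Java record, and call sites drop the get- prefix because record accessors are named after their components. A minimal runnable sketch of the pattern, modeled on the Offsets pair from InternalPartitionsOffsets below (the Lombok side is hand-expanded here so the sketch compiles without Lombok on the classpath):

import java.util.Objects;

public class RecordRefactorSketch {

  // Before: roughly what Lombok's @Value generated - final fields, a
  // constructor, and get-prefixed accessors (equals/hashCode/toString omitted).
  static final class OffsetsValue {
    private final Long earliest;
    private final Long latest;

    OffsetsValue(Long earliest, Long latest) {
      this.earliest = earliest;
      this.latest = latest;
    }

    Long getEarliest() {
      return earliest;
    }

    Long getLatest() {
      return latest;
    }
  }

  // After: the record generates the constructor, equals/hashCode/toString,
  // and component-named accessors - earliest(), not getEarliest().
  record Offsets(Long earliest, Long latest) {
  }

  public static void main(String[] args) {
    var before = new OffsetsValue(0L, 42L);
    var after = new Offsets(0L, 42L);
    System.out.println(Objects.equals(before.getEarliest(), after.earliest())); // true
    System.out.println(Objects.equals(before.getLatest(), after.latest()));     // true
  }
}

The accessor rename is why most hunks below touch call sites rather than the converted classes themselves.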
api/src/main/java/io/kafbat/ui/model/InternalClusterState.java
@@ -1,9 +1,6 @@
 package io.kafbat.ui.model;
 
 import com.google.common.base.Throwables;
-import io.kafbat.ui.model.BrokerDiskUsageDTO;
-import io.kafbat.ui.model.MetricsCollectionErrorDTO;
-import io.kafbat.ui.model.ServerStatusDTO;
 import java.math.BigDecimal;
 import java.util.List;
 import java.util.Optional;
@@ -40,8 +37,8 @@ public InternalClusterState(KafkaCluster cluster, Statistics statistics) {
         .stackTrace(Throwables.getStackTraceAsString(e)))
         .orElse(null);
     topicCount = statistics.getTopicDescriptions().size();
-    brokerCount = statistics.getClusterDescription().getNodes().size();
-    activeControllers = Optional.ofNullable(statistics.getClusterDescription().getController())
+    brokerCount = statistics.getClusterDescription().nodes().size();
+    activeControllers = Optional.ofNullable(statistics.getClusterDescription().controller())
         .map(Node::id)
         .orElse(null);
     version = statistics.getVersion();
api/src/main/java/io/kafbat/ui/model/InternalPartitionsOffsets.java
@@ -4,16 +4,11 @@
 import com.google.common.collect.Table;
 import java.util.Map;
 import java.util.Optional;
-import lombok.Value;
 import org.apache.kafka.common.TopicPartition;
 
 
 public class InternalPartitionsOffsets {
 
-  @Value
-  public static class Offsets {
-    Long earliest;
-    Long latest;
+  public record Offsets(Long earliest, Long latest) {
   }
 
   private final Table<String, Integer, Offsets> offsets = HashBasedTable.create();
9 changes: 1 addition & 8 deletions api/src/main/java/io/kafbat/ui/model/InternalReplica.java
@@ -1,14 +1,7 @@
 package io.kafbat.ui.model;
 
-import lombok.Builder;
-import lombok.Data;
-import lombok.RequiredArgsConstructor;
-
-@Data
-@Builder
-@RequiredArgsConstructor
-public class InternalReplica {
-  private final int broker;
-  private final boolean leader;
-  private final boolean inSync;
+public record InternalReplica(int broker, boolean leader, boolean inSync) {
 }
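
Dropping @Data here also drops @Builder: the record supplies only the canonical constructor, so any InternalReplica.builder() call sites must be rewritten as well. A sketch of that call-site change (the affected call sites are outside this excerpt):

// Before, with Lombok's generated builder:
//   var replica = InternalReplica.builder().broker(1).leader(true).inSync(true).build();
// After, with the record's canonical constructor:
var replica = new InternalReplica(1, true, true);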
4 changes: 2 additions & 2 deletions api/src/main/java/io/kafbat/ui/model/InternalTopic.java
@@ -77,8 +77,8 @@ public static InternalTopic from(TopicDescription topicDescription,
 
     partitionsOffsets.get(topicDescription.name(), partition.partition())
         .ifPresent(offsets -> {
-          partitionDto.offsetMin(offsets.getEarliest());
-          partitionDto.offsetMax(offsets.getLatest());
+          partitionDto.offsetMin(offsets.earliest());
+          partitionDto.offsetMax(offsets.latest());
         });
 
     var segmentStats =
@@ -79,14 +79,14 @@ private void fillKey(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec)
     }
     try {
       var deserResult = keyDeserializer.deserialize(new RecordHeadersImpl(), rec.key().get());
-      message.setKey(deserResult.getResult());
+      message.setKey(deserResult.result());
       message.setKeySerde(keySerdeName);
-      message.setKeyDeserializeProperties(deserResult.getAdditionalProperties());
+      message.setKeyDeserializeProperties(deserResult.additionalProperties());
     } catch (Exception e) {
       log.trace("Error deserializing key for key topic: {}, partition {}, offset {}, with serde {}",
           rec.topic(), rec.partition(), rec.offset(), keySerdeName, e);
       var deserResult = fallbackKeyDeserializer.deserialize(new RecordHeadersImpl(), rec.key().get());
-      message.setKey(deserResult.getResult());
+      message.setKey(deserResult.result());
       message.setKeySerde(fallbackSerdeName);
     }
   }
@@ -98,15 +98,15 @@ private void fillValue(TopicMessageDTO message, ConsumerRecord<Bytes, Bytes> rec
     try {
       var deserResult = valueDeserializer.deserialize(
           new RecordHeadersImpl(rec.headers()), rec.value().get());
-      message.setContent(deserResult.getResult());
+      message.setContent(deserResult.result());
       message.setValueSerde(valueSerdeName);
-      message.setValueDeserializeProperties(deserResult.getAdditionalProperties());
+      message.setValueDeserializeProperties(deserResult.additionalProperties());
     } catch (Exception e) {
       log.trace("Error deserializing key for value topic: {}, partition {}, offset {}, with serde {}",
           rec.topic(), rec.partition(), rec.offset(), valueSerdeName, e);
       var deserResult = fallbackValueDeserializer.deserialize(
           new RecordHeadersImpl(rec.headers()), rec.value().get());
-      message.setContent(deserResult.getResult());
+      message.setContent(deserResult.result());
       message.setValueSerde(fallbackSerdeName);
     }
   }
7 changes: 1 addition & 6 deletions api/src/main/java/io/kafbat/ui/serdes/CustomSerdeLoader.java
@@ -18,15 +18,10 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import lombok.SneakyThrows;
-import lombok.Value;
 
 
 class CustomSerdeLoader {
 
-  @Value
-  static class CustomSerde {
-    Serde serde;
-    ClassLoader classLoader;
+  record CustomSerde(Serde serde, ClassLoader classLoader) {
   }
 
   // serde location -> classloader
4 changes: 2 additions & 2 deletions api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java
@@ -266,10 +266,10 @@ private SerdeInstance loadAndInitCustomSerde(ClustersProperties.SerdeConfig serd
         serdeConfig.getClassName(), serdeConfig.getFilePath(), serdeProps, clusterProps, globalProps);
     return new SerdeInstance(
         serdeConfig.getName(),
-        loaded.getSerde(),
+        loaded.serde(),
         nullablePattern(serdeConfig.getTopicKeysPattern()),
         nullablePattern(serdeConfig.getTopicValuesPattern()),
-        loaded.getClassLoader()
+        loaded.classLoader()
     );
   }
 
6 changes: 3 additions & 3 deletions api/src/main/java/io/kafbat/ui/service/BrokerService.java
@@ -52,7 +52,7 @@ private Mono<List<ConfigEntry>> loadBrokersConfig(
   }
 
   private Flux<InternalBrokerConfig> getBrokersConfig(KafkaCluster cluster, Integer brokerId) {
-    if (statisticsCache.get(cluster).getClusterDescription().getNodes()
+    if (statisticsCache.get(cluster).getClusterDescription().nodes()
         .stream().noneMatch(node -> node.id() == brokerId)) {
       return Flux.error(
           new NotFoundException(String.format("Broker with id %s not found", brokerId)));
@@ -70,7 +70,7 @@ public Flux<InternalBroker> getBrokers(KafkaCluster cluster) {
     return adminClientService
         .get(cluster)
         .flatMap(ReactiveAdminClient::describeCluster)
-        .map(description -> description.getNodes().stream()
+        .map(description -> description.nodes().stream()
            .map(node -> new InternalBroker(node, partitionsDistribution, stats))
            .collect(Collectors.toList()))
        .flatMapMany(Flux::fromIterable);
@@ -113,7 +113,7 @@ private Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> getC
       KafkaCluster cluster, List<Integer> reqBrokers) {
     return adminClientService.get(cluster)
         .flatMap(admin -> {
-          List<Integer> brokers = statisticsCache.get(cluster).getClusterDescription().getNodes()
+          List<Integer> brokers = statisticsCache.get(cluster).getClusterDescription().nodes()
              .stream()
              .map(Node::id)
              .collect(Collectors.toList());
@@ -143,8 +143,8 @@ private SerdeDescriptionDTO toDto(SerdeInstance serdeInstance,
     return new SerdeDescriptionDTO()
         .name(serdeInstance.getName())
         .description(serdeInstance.description().orElse(null))
-        .schema(schemaOpt.map(SchemaDescription::getSchema).orElse(null))
-        .additionalProperties(schemaOpt.map(SchemaDescription::getAdditionalProperties).orElse(null))
+        .schema(schemaOpt.map(SchemaDescription::schema).orElse(null))
+        .additionalProperties(schemaOpt.map(SchemaDescription::additionalProperties).orElse(null))
         .preferred(preferred);
   }
 
2 changes: 1 addition & 1 deletion api/src/main/java/io/kafbat/ui/service/FeatureService.java
@@ -61,7 +61,7 @@ private Mono<ClusterFeature> quotaManagement(ReactiveAdminClient adminClient) {
   }
 
   private Mono<ClusterFeature> aclEdit(ReactiveAdminClient adminClient, ClusterDescription clusterDescription) {
-    var authorizedOps = Optional.ofNullable(clusterDescription.getAuthorizedOperations()).orElse(Set.of());
+    var authorizedOps = Optional.ofNullable(clusterDescription.authorizedOperations()).orElse(Set.of());
     boolean canEdit = aclViewEnabled(adminClient)
         && (authorizedOps.contains(AclOperation.ALL) || authorizedOps.contains(AclOperation.ALTER));
     return canEdit
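
The aclEdit change shows how nullable record components read: the accessor returns the stored reference as-is, null included, so the Optional.ofNullable guard stays necessary. A self-contained sketch of the same handling, using a trimmed-down stand-in for ClusterDescription (the real record is declared in ReactiveAdminClient below):

import java.util.Optional;
import java.util.Set;

public class NullableComponentSketch {

  // Stand-in for ReactiveAdminClient.ClusterDescription: one component
  // may legitimately be null (when ACL support is disabled).
  record ClusterDescription(String clusterId, Set<String> authorizedOperations) {
  }

  public static void main(String[] args) {
    var desc = new ClusterDescription("cluster-1", null); // ACL disabled

    // Same guard as aclEdit: record accessors do no null handling themselves.
    Set<String> ops = Optional.ofNullable(desc.authorizedOperations()).orElse(Set.of());
    System.out.println(ops.isEmpty()); // true
  }
}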
57 changes: 27 additions & 30 deletions api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java
@@ -124,14 +124,11 @@ static Mono<Set<SupportedFeature>> forVersion(AdminClient ac, String kafkaVersio
     }
   }
 
-  @Value
-  public static class ClusterDescription {
-    @Nullable
-    Node controller;
-    String clusterId;
-    Collection<Node> nodes;
-    @Nullable // null, if ACL is disabled
-    Set<AclOperation> authorizedOperations;
+  /**
+   * @param authorizedOperations null, if ACL is disabled
+   */
+  public record ClusterDescription(@Nullable Node controller, String clusterId, Collection<Node> nodes,
+                                   @Nullable Set<AclOperation> authorizedOperations) {
   }
 
   @Builder
@@ -147,7 +144,7 @@ private static Mono<ConfigRelatedInfo> extract(AdminClient ac) {
     // choosing node from which we will get configs (starting with controller)
     var targetNodeId = Optional.ofNullable(desc.controller)
         .map(Node::id)
-        .orElse(desc.getNodes().iterator().next().id());
+        .orElse(desc.nodes().iterator().next().id());
     return loadBrokersConfig(ac, List.of(targetNodeId))
         .map(map -> map.isEmpty() ? List.<ConfigEntry>of() : map.get(targetNodeId))
         .flatMap(configs -> {
@@ -191,18 +188,18 @@ private static Mono<Boolean> isAuthorizedSecurityEnabled(AdminClient ac, @Nullab
   // (see MonoSink.success(..) javadoc for details)
   public static <T> Mono<T> toMono(KafkaFuture<T> future) {
     return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
-          if (ex != null) {
-            // KafkaFuture doc is unclear about what exception wrapper will be used
-            // (from docs it should be ExecutionException, be we actually see CompletionException, so checking both
-            if (ex instanceof CompletionException || ex instanceof ExecutionException) {
-              sink.error(ex.getCause()); //unwrapping exception
-            } else {
-              sink.error(ex);
-            }
-          } else {
-            sink.success(res);
-          }
-        })).doOnCancel(() -> future.cancel(true))
+      if (ex != null) {
+        // KafkaFuture doc is unclear about what exception wrapper will be used
+        // (from docs it should be ExecutionException, be we actually see CompletionException, so checking both
+        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
+          sink.error(ex.getCause()); //unwrapping exception
+        } else {
+          sink.error(ex);
+        }
+      } else {
+        sink.success(res);
+      }
+    })).doOnCancel(() -> future.cancel(true))
         // AdminClient is using single thread for kafka communication
         // and by default all downstream operations (like map(..)) on created Mono will be executed on this thread.
         // If some of downstream operation are blocking (by mistake) this can lead to
@@ -391,7 +388,7 @@ static <K, V> Mono<Map<K, V>> toMonoWithExceptionFilter(Map<K, KafkaFuture<V>> v
 
   public Mono<Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>> describeLogDirs() {
     return describeCluster()
-        .map(d -> d.getNodes().stream().map(Node::id).collect(toList()))
+        .map(d -> d.nodes().stream().map(Node::id).collect(toList()))
         .flatMap(this::describeLogDirs);
   }
 
@@ -419,12 +416,12 @@ private static Mono<ClusterDescription> describeClusterImpl(AdminClient client,
         result.controller(), result.clusterId(), result.nodes(), result.authorizedOperations());
     return toMono(allOfFuture).then(
         Mono.fromCallable(() ->
-                new ClusterDescription(
-                    result.controller().get(),
-                    result.clusterId().get(),
-                    result.nodes().get(),
-                    result.authorizedOperations().get()
-                )
+            new ClusterDescription(
+                result.controller().get(),
+                result.clusterId().get(),
+                result.nodes().get(),
+                result.authorizedOperations().get()
+            )
         )
     );
   }
@@ -599,8 +596,8 @@ private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collect
 
   @VisibleForTesting
   static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
-                                                           Predicate<TopicPartition> partitionPredicate,
-                                                           boolean failOnUnknownLeader) {
+                                                             Predicate<TopicPartition> partitionPredicate,
+                                                             boolean failOnUnknownLeader) {
     var goodPartitions = new HashSet<TopicPartition>();
     for (TopicDescription description : topicDescriptions) {
       var goodTopicPartitions = new ArrayList<TopicPartition>();
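
Beyond the record conversion, the re-indented toMono block above is the bridge every admin call in this class goes through: it adapts a KafkaFuture to a Reactor Mono and unwraps the async wrapper exceptions so subscribers see the real cause. A standalone sketch of the same technique, written against CompletableFuture so it runs without kafka-clients on the classpath (reactor-core required):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import reactor.core.publisher.Mono;

public class FutureToMonoSketch {

  // Same shape as ReactiveAdminClient.toMono: complete the sink from the
  // future's callback, unwrapping the wrapper so the real cause propagates.
  static <T> Mono<T> toMono(CompletableFuture<T> future) {
    return Mono.<T>create(sink -> future.whenComplete((res, ex) -> {
      if (ex != null) {
        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
          sink.error(ex.getCause());
        } else {
          sink.error(ex);
        }
      } else {
        sink.success(res);
      }
    })).doOnCancel(() -> future.cancel(true));
  }

  public static void main(String[] args) {
    toMono(CompletableFuture.completedFuture("ok")).subscribe(System.out::println);
  }
}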
6 changes: 3 additions & 3 deletions api/src/main/java/io/kafbat/ui/service/StatisticsService.java
@@ -37,10 +37,10 @@ public Mono<Statistics> updateCache(KafkaCluster c) {
   private Mono<Statistics> getStatistics(KafkaCluster cluster) {
     return adminClientService.get(cluster).flatMap(ac ->
         ac.describeCluster().flatMap(description ->
-            ac.updateInternalStats(description.getController()).then(
+            ac.updateInternalStats(description.controller()).then(
                 Mono.zip(
                     List.of(
-                        metricsCollector.getBrokerMetrics(cluster, description.getNodes()),
+                        metricsCollector.getBrokerMetrics(cluster, description.nodes()),
                         getLogDirInfo(description, ac),
                         featureService.getAvailableFeatures(ac, cluster, description),
                         loadTopicConfigs(cluster),
@@ -64,7 +64,7 @@ private Mono<Statistics> getStatistics(KafkaCluster cluster) {
   }
 
   private Mono<InternalLogDirStats> getLogDirInfo(ClusterDescription desc, ReactiveAdminClient ac) {
-    var brokerIds = desc.getNodes().stream().map(Node::id).collect(Collectors.toSet());
+    var brokerIds = desc.nodes().stream().map(Node::id).collect(Collectors.toSet());
     return ac.describeLogDirs(brokerIds).map(InternalLogDirStats::new);
   }
 
6 changes: 3 additions & 3 deletions api/src/main/java/io/kafbat/ui/service/TopicsService.java
@@ -253,7 +253,7 @@ public Mono<ReplicationFactorChangeResponseDTO> changeReplicationFactor(
     Integer actual = topic.getReplicationFactor();
     Integer requested = replicationFactorChange.getTotalReplicationFactor();
     Integer brokersCount = statisticsCache.get(cluster).getClusterDescription()
-        .getNodes().size();
+        .nodes().size();
 
     if (requested.equals(actual)) {
       return Mono.error(
@@ -361,14 +361,14 @@ private Map<Integer, List<Integer>> getCurrentAssignment(InternalTopic topic) {
         .collect(toMap(
             InternalPartition::getPartition,
             p -> p.getReplicas().stream()
-                .map(InternalReplica::getBroker)
+                .map(InternalReplica::broker)
                .collect(toList())
        ));
  }
 
   private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
                                               Map<Integer, List<Integer>> currentAssignment) {
-    Map<Integer, Integer> result = statisticsCache.get(cluster).getClusterDescription().getNodes()
+    Map<Integer, Integer> result = statisticsCache.get(cluster).getClusterDescription().nodes()
        .stream()
        .map(Node::id)
        .collect(toMap(
@@ -59,10 +59,7 @@ public Optional<JsonNode> getColumnValue(List<JsonNode> row, String column) {
     }
   }
 
-  @Value
-  private static class KsqlRequest {
-    String ksql;
-    Map<String, String> streamsProperties;
+  private record KsqlRequest(String ksql, Map<String, String> streamsProperties) {
   }
 
   //--------------------------------------------------------------------------------------------
@@ -172,7 +169,7 @@ public Flux<KsqlResponseTable> execute(String ksql, Map<String, String> streamPr
     if (parsedStatements.isEmpty()) {
       return errorTableFlux("Sql statement is invalid or unsupported");
     }
-    var statements = parsedStatements.get().getStatements();
+    var statements = parsedStatements.get().statements();
     if (statements.size() > 1) {
       return errorTableFlux("Only single statement supported now");
     }
5 changes: 1 addition & 4 deletions api/src/main/java/io/kafbat/ui/service/ksql/KsqlGrammar.java
@@ -6,7 +6,6 @@
 import ksql.KsqlGrammarLexer;
 import ksql.KsqlGrammarParser;
 import lombok.RequiredArgsConstructor;
-import lombok.Value;
 import lombok.experimental.Delegate;
 import org.antlr.v4.runtime.BaseErrorListener;
 import org.antlr.v4.runtime.CharStream;
@@ -22,9 +21,7 @@ class KsqlGrammar {
   private KsqlGrammar() {
   }
 
-  @Value
-  static class KsqlStatements {
-    List<KsqlGrammarParser.SingleStatementContext> statements;
+  record KsqlStatements(List<KsqlGrammarParser.SingleStatementContext> statements) {
   }
 
   // returns Empty if no valid statements found
@@ -20,11 +20,7 @@
 @Service
 public class KsqlServiceV2 {
 
-  @lombok.Value
-  private static class KsqlExecuteCommand {
-    KafkaCluster cluster;
-    String ksql;
-    Map<String, String> streamProperties;
+  private record KsqlExecuteCommand(KafkaCluster cluster, String ksql, Map<String, String> streamProperties) {
   }
 
   private final Cache<String, KsqlExecuteCommand> registeredCommands =
10 changes: 1 addition & 9 deletions api/src/main/java/io/kafbat/ui/service/masking/DataMasking.java
@@ -22,15 +22,7 @@ public class DataMasking {
 
   private static final JsonMapper JSON_MAPPER = new JsonMapper();
 
-  @Value
-  static class Mask {
-    @Nullable
-    Pattern topicKeysPattern;
-    @Nullable
-    Pattern topicValuesPattern;
-
-    MaskingPolicy policy;
-
+  record Mask(@Nullable Pattern topicKeysPattern, @Nullable Pattern topicValuesPattern, MaskingPolicy policy) {
     boolean shouldBeApplied(String topic, Serde.Target target) {
       return target == Serde.Target.KEY
           ? topicKeysPattern != null && topicKeysPattern.matcher(topic).matches()
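
The Mask conversion shows that the refactor is not limited to passive data holders: a record body can keep behavior, and shouldBeApplied reads the components as ordinary fields. A runnable sketch of the same shape, simplified to a single pattern and a string policy (the real Mask carries a MaskingPolicy and a separate values pattern):

import java.util.regex.Pattern;

public class RecordWithBehaviorSketch {

  // Like DataMasking.Mask: components plus an instance method that uses
  // them directly - inside the record body they are ordinary fields.
  record Mask(Pattern topicKeysPattern, String policy) {
    boolean shouldBeApplied(String topic) {
      return topicKeysPattern != null && topicKeysPattern.matcher(topic).matches();
    }
  }

  public static void main(String[] args) {
    var mask = new Mask(Pattern.compile("payments\\..*"), "REMOVE");
    System.out.println(mask.shouldBeApplied("payments.events")); // true
    System.out.println(mask.shouldBeApplied("orders.events"));   // false
  }
}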