Skip to content

Commit

Permalink
Merge branch 'main' into feature/acl/rw
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavel Makarichev committed Mar 17, 2024
2 parents eaac07b + 0b5629c commit 86a8c88
Show file tree
Hide file tree
Showing 124 changed files with 3,813 additions and 3,315 deletions.
2 changes: 1 addition & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@
- [ ] New and existing unit tests pass locally with my changes
- [ ] Any dependent changes have been merged

Check out [Contributing](https://github.com/kafbat/kafka-ui/blob/main/CONTRIBUTING.md) and [Code of Conduct](https://github.com/kafbat/kafka-ui/blob/main/CODE-OF-CONDUCT.md)
Check out [Contributing](https://github.com/kafbat/kafka-ui/blob/main/.github/CONTRIBUTING.md) and [Code of Conduct](https://github.com/kafbat/kafka-ui/blob/main/.github/CODE-OF-CONDUCT.md)

**A picture of a cute animal (not mandatory but encouraged)**
2 changes: 1 addition & 1 deletion .github/workflows/cve.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: CVE checks docker main
name: "Infra: CVE checks"
on:
workflow_dispatch:
schedule:
Expand Down
51 changes: 35 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,32 +1,46 @@
![logo](documentation/images/logo.png) Kafbat UI 
------------------
#### Versatile, fast and lightweight web UI for managing Apache Kafka® clusters. Built by developers, for developers.
<br/>
<div align="center">
<img src="documentation/images/logo_new.png" alt="logo"/>
<h3>Kafbat UI</h3>

[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://github.com/kafbat/kafka-ui/blob/main/LICENSE)
![Price free](documentation/images/free-open-source.svg)
[![Release version](https://img.shields.io/github/v/release/kafbat/kafka-ui)](https://github.com/kafbat/kafka-ui/releases)
[![Chat with us](https://img.shields.io/discord/897805035122077716)](https://discord.gg/4DWzD7pGE5)
[![Docker pulls](https://img.shields.io/docker/pulls/kafbat/kafka-ui)](https://hub.docker.com/r/kafbat/kafka-ui)
Versatile, fast and lightweight web UI for managing Apache Kafka® clusters.
</div>

<div align="center">
<a href="https://github.com/kafbat/kafka-ui/blob/main/LICENSE"><img src="https://img.shields.io/badge/License-Apache%202.0-blue.svg" alt="License"/></a>
<img src="documentation/images/free-open-source.svg" alt="price free"/>
<a href="https://github.com/kafbat/kafka-ui/releases"><img src="https://img.shields.io/github/v/release/kafbat/kafka-ui" alt="latest release version"/></a>
<a href="https://discord.gg/4DWzD7pGE5"><img src="https://img.shields.io/discord/897805035122077716" alt="discord online number count"/></a>
<a href="https://github.com/sponsors/kafbat"><img src="https://img.shields.io/github/sponsors/kafbat?style=flat&logo=githubsponsors&logoColor=%23EA4AAA&label=Support%20us" alt="" /></a>
</div>

<p align="center">
<a href="https://ui.docs.kafbat.io/">DOCS</a> •
<a href="https://ui.docs.kafbat.io/configuration/quick-start">QUICK START</a> •
<a href="https://discord.gg/4DWzD7pGE5">COMMUNITY DISCORD</a>
<a href="https://ui.docs.kafbat.io/">Documentation</a> •
<a href="https://ui.docs.kafbat.io/configuration/quick-start">Quick Start</a> •
<a href="https://discord.gg/4DWzD7pGE5">Community</a>
<br/>
<a href="https://aws.amazon.com/marketplace/pp/{replaceMe}">AWS Marketplace</a> •
<a href="https://www.producthunt.com/products/ui-for-apache-kafka/reviews/new">ProductHunt</a>
</p>

<p align="center">
<img src="https://repobeats.axiom.co/api/embed/2e8a7c2d711af9daddd34f9791143e7554c35d0f.svg" />
<img src="https://repobeats.axiom.co/api/embed/88d2bd9887380c7d86e2f986725d9af52ebad7f4.svg" alt="stats"/>
</p>

#### Kafbat UI is a free, open-source web UI to monitor and manage Apache Kafka clusters.

Kafbat UI is a simple tool that makes your data flows observable, helps find and troubleshoot issues faster and deliver optimal performance. Its lightweight dashboard makes it easy to track key metrics of your Kafka clusters - Brokers, Topics, Partitions, Production, and Consumption.

![Interface](documentation/images/Interface.gif)
<i>
Kafbat UI, developed by <b>Kafbat</b>*, proudly carries forward the legacy of the UI Apache Kafka project.
Our dedication is reflected in the continuous evolution of the project, ensuring adherence to its foundational vision while adapting to meet modern demands.
We extend our gratitude to Provectus for their past support in groundbreaking work, which serves as a cornerstone for our ongoing innovation and dedication.

<b>*</b> - The <b>Kafbat</b> team comprises key contributors from the project's inception, bringing a wealth of experience and insight to this renewed endeavor.
</i>

# Interface

![Interface](https://raw.githubusercontent.com/kafbat/kafka-ui/images/overview.gif)

# Features
* **Multi-Cluster Management** — monitor and manage all your clusters in one place
Expand Down Expand Up @@ -108,7 +122,7 @@ services:
- ~/kui/config.yml:/etc/kafkaui/dynamic_config.yaml
```

Please refer to our [configuration](https://ui.docs.kafbat.io/configuration/quick-start) page to proceed with further app configuration.
Please refer to our [configuration](https://ui.docs.kafbat.io/configuration/configuration-file) page to proceed with further app configuration.

## Some useful configuration related links

Expand All @@ -134,8 +148,13 @@ Info endpoint (build info) is located at `/actuator/info`.

# Configuration options

All of the environment variables/config properties could be found [here](https://ui.docs.kafbat.io/configuration/misc-configuration-properties).
All the environment variables/config properties could be found [here](https://ui.docs.kafbat.io/configuration/misc-configuration-properties).

# Contributing

Please refer to [contributing guide](https://ui.docs.kafbat.io/development/contributing), we'll guide you from there.

# Support

As we're fully independent, team members contribute in their free time.
Your support is crucial for us, if you wish to sponsor us, take a look [here](https://github.com/sponsors/kafbat)
2 changes: 1 addition & 1 deletion api/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#FROM azul/zulu-openjdk-alpine:17-jre-headless
FROM azul/zulu-openjdk-alpine@sha256:a36679ac0d28cb835e2a8c00e1e0d95509c6c51c5081c7782b85edb1f37a771a
FROM azul/zulu-openjdk-alpine@sha256:d59f1266db40341318e563fd76c21b2880ffa5d371f0c097c29d33f89c3a0010

RUN apk add --no-cache \
# snappy codec
Expand Down
24 changes: 24 additions & 0 deletions api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,30 @@
<groupId>dev.cel</groupId>
<artifactId>cel</artifactId>
</dependency>
<!-- CVE fixes -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.4.12</version>
</dependency>
<!-- CVE fixes -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.4.12</version>
</dependency>
<!-- CVE fixes -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>logging-interceptor</artifactId>
<version>4.12.0</version>
</dependency>
<!-- CVE fixes -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.26.0</version>
</dependency>

</dependencies>

Expand Down
110 changes: 55 additions & 55 deletions api/src/main/java/io/kafbat/ui/controller/MessagesController.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package io.kafbat.ui.controller;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_DELETE;
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_PRODUCE;
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
import static java.util.stream.Collectors.toMap;

import io.kafbat.ui.api.MessagesApi;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.model.ConsumerPosition;
import io.kafbat.ui.model.CreateTopicMessageDTO;
import io.kafbat.ui.model.MessageFilterIdDTO;
import io.kafbat.ui.model.MessageFilterRegistrationDTO;
import io.kafbat.ui.model.MessageFilterTypeDTO;
import io.kafbat.ui.model.PollingModeDTO;
import io.kafbat.ui.model.SeekDirectionDTO;
import io.kafbat.ui.model.SeekTypeDTO;
import io.kafbat.ui.model.SerdeUsageDTO;
Expand All @@ -24,14 +27,10 @@
import io.kafbat.ui.service.DeserializationService;
import io.kafbat.ui.service.MessagesService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.TopicPartition;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
Expand Down Expand Up @@ -74,6 +73,7 @@ public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilte
.map(ResponseEntity::ok);
}

@Deprecated
@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
String topicName,
Expand All @@ -86,6 +86,23 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
String keySerde,
String valueSerde,
ServerWebExchange exchange) {
throw new ValidationException("Not supported");
}


@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
PollingModeDTO mode,
List<Integer> partitions,
Integer limit,
String stringFilter,
String smartFilterId,
Long offset,
Long timestamp,
String keySerde,
String valueSerde,
String cursor,
ServerWebExchange exchange) {
var contextBuilder = AccessContext.builder()
.cluster(clusterName)
.topicActions(topicName, MESSAGES_READ)
Expand All @@ -95,27 +112,26 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
contextBuilder.auditActions(AuditAction.VIEW);
}

seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;

var positions = new ConsumerPosition(
seekType,
topicName,
parseSeekTo(topicName, seekType, seekTo)
);
Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
ResponseEntity.ok(
messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType,
limit, seekDirection, keySerde, valueSerde)
)
);

var context = contextBuilder.build();
return validateAccess(context)
.then(job)
.doOnEach(sig -> audit(context, sig));
var accessContext = contextBuilder.build();

Flux<TopicMessageEventDTO> messagesFlux;
if (cursor != null) {
messagesFlux = messagesService.loadMessages(getCluster(clusterName), topicName, cursor);
} else {
messagesFlux = messagesService.loadMessages(
getCluster(clusterName),
topicName,
ConsumerPosition.create(checkNotNull(mode), checkNotNull(topicName), partitions, timestamp, offset),
stringFilter,
smartFilterId,
limit,
keySerde,
valueSerde
);
}
return accessControlService.validateAccess(accessContext)
.then(Mono.just(ResponseEntity.ok(messagesFlux)))
.doOnEach(sig -> auditService.audit(accessContext, sig));
}

@Override
Expand All @@ -136,34 +152,6 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(
).doOnEach(sig -> audit(context, sig));
}

/**
* The format is [partition]::[offset] for specifying offsets
* or [partition]::[timestamp in millis] for specifying timestamps.
*/
@Nullable
private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
if (seekTo == null || seekTo.isEmpty()) {
if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
return null;
}
throw new ValidationException("seekTo should be set if seekType is " + seekType);
}
return seekTo.stream()
.map(p -> {
String[] split = p.split("::");
if (split.length != 2) {
throw new IllegalArgumentException(
"Wrong seekTo argument format. See API docs for details");
}

return Pair.of(
new TopicPartition(topic, Integer.parseInt(split[0])),
Long.parseLong(split[1])
);
})
.collect(toMap(Pair::getKey, Pair::getValue));
}

@Override
public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
String topicName,
Expand All @@ -190,7 +178,19 @@ public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterNam
);
}

@Override
public Mono<ResponseEntity<MessageFilterIdDTO>> registerFilter(String clusterName,
String topicName,
Mono<MessageFilterRegistrationDTO> registration,
ServerWebExchange exchange) {



final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.topicActions(topicName, MESSAGES_READ)
.build());
return validateAccess.then(registration)
.map(reg -> messagesService.registerMessageFilter(reg.getFilterCode()))
.map(id -> ResponseEntity.ok(new MessageFilterIdDTO().id(id)));
}
}
14 changes: 9 additions & 5 deletions api/src/main/java/io/kafbat/ui/emitter/AbstractEmitter.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kafbat.ui.emitter;

import io.kafbat.ui.model.TopicMessageEventDTO;
import jakarta.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
Expand All @@ -21,12 +22,14 @@ protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsum
return records;
}

protected boolean sendLimitReached() {
protected boolean isSendLimitReached() {
return messagesProcessing.limitReached();
}

protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
messagesProcessing.send(sink, records);
protected void send(FluxSink<TopicMessageEventDTO> sink,
Iterable<ConsumerRecord<Bytes, Bytes>> records,
@Nullable Cursor.Tracking cursor) {
messagesProcessing.send(sink, records, cursor);
}

protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
Expand All @@ -37,8 +40,9 @@ protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords
messagesProcessing.sentConsumingInfo(sink, records);
}

protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
messagesProcessing.sendFinishEvent(sink);
// cursor is null if target partitions were fully polled (no, need to do paging)
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
messagesProcessing.sendFinishEvents(sink, cursor);
sink.complete();
}
}
13 changes: 5 additions & 8 deletions api/src/main/java/io/kafbat/ui/emitter/BackwardEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@ public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
int messagesPerPage,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) {
PollingSettings pollingSettings,
Cursor.Tracking cursor) {
super(
consumerSupplier,
consumerPosition,
messagesPerPage,
new MessagesProcessing(
deserializer,
filter,
false,
messagesPerPage
),
pollingSettings
new MessagesProcessing(deserializer, filter, false, messagesPerPage),
pollingSettings,
cursor
);
}

Expand Down
9 changes: 8 additions & 1 deletion api/src/main/java/io/kafbat/ui/emitter/ConsumingStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.kafbat.ui.model.TopicMessageConsumingDTO;
import io.kafbat.ui.model.TopicMessageEventDTO;
import io.kafbat.ui.model.TopicMessageNextPageCursorDTO;
import javax.annotation.Nullable;
import reactor.core.publisher.FluxSink;

class ConsumingStats {
Expand All @@ -26,10 +28,15 @@ void incFilterApplyError() {
filterApplyErrors++;
}

void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink) {
void sendFinishEvent(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
sink.next(
new TopicMessageEventDTO()
.type(TopicMessageEventDTO.TypeEnum.DONE)
.cursor(
cursor != null
? new TopicMessageNextPageCursorDTO().id(cursor.registerCursor())
: null
)
.consuming(createConsumingStats())
);
}
Expand Down
Loading

0 comments on commit 86a8c88

Please sign in to comment.