Skip to content

Commit

Permalink
Merge branch 'main' into feature/delete_cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
Haarolean authored Mar 16, 2024
2 parents 963a3d3 + 407d678 commit ed969dd
Show file tree
Hide file tree
Showing 125 changed files with 3,973 additions and 3,449 deletions.
2 changes: 1 addition & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@
- [ ] New and existing unit tests pass locally with my changes
- [ ] Any dependent changes have been merged

Check out [Contributing](https://github.com/kafbat/kafka-ui/blob/main/CONTRIBUTING.md) and [Code of Conduct](https://github.com/kafbat/kafka-ui/blob/main/CODE-OF-CONDUCT.md)
Check out [Contributing](https://github.com/kafbat/kafka-ui/blob/main/.github/CONTRIBUTING.md) and [Code of Conduct](https://github.com/kafbat/kafka-ui/blob/main/.github/CODE-OF-CONDUCT.md)

**A picture of a cute animal (not mandatory but encouraged)**
12 changes: 10 additions & 2 deletions .github/workflows/frontend_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,45 @@ jobs:
NODE_ENV: dev
runs-on: ubuntu-latest
steps:

- uses: actions/checkout@v4
with:
# Disabling shallow clone is recommended for improving relevancy of reporting
fetch-depth: 0
ref: ${{ github.event.pull_request.head.sha }}
token: ${{ github.token }}

- uses: pnpm/[email protected]
with:
version: 8.6.12

- name: Install node
uses: actions/[email protected]
with:
node-version: "18.17.1"
cache: "pnpm"
cache-dependency-path: "./frontend/pnpm-lock.yaml"

- name: Install Node dependencies
run: |
cd frontend/
pnpm install --frozen-lockfile
- name: Generate sources
- name: Compile
run: |
cd frontend/
pnpm gen:sources
pnpm compile
- name: Linter
run: |
cd frontend/
pnpm lint:CI
- name: Tests
run: |
cd frontend/
pnpm test:CI
- name: SonarCloud Scan
if: false # TODO remove when public
uses: sonarsource/sonarcloud-github-action@master
Expand Down
6 changes: 5 additions & 1 deletion .github/workflows/release-serde-api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,8 @@ jobs:
- name: Publish to Maven Central
run: |
mvn source:jar javadoc:jar package gpg:sign -Dgpg.passphrase=${{ secrets.GPG_PASSPHRASE }} -Dserver.username=${{ secrets.NEXUS_USERNAME }} -Dserver.password=${{ secrets.NEXUS_PASSWORD }} nexus-staging:deploy -pl serde-api -s settings.xml
mvn source:jar javadoc:jar package gpg:sign \
-Dgpg.passphrase=${{ secrets.GPG_PASSPHRASE }} \
-Dserver.username=${{ secrets.NEXUS_USERNAME }} \
-Dserver.password=${{ secrets.NEXUS_PASSWORD }} \
central-publishing:publish -pl serde-api -s settings.xml
51 changes: 35 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,32 +1,46 @@
![logo](documentation/images/logo.png) Kafbat UI 
------------------
#### Versatile, fast and lightweight web UI for managing Apache Kafka® clusters. Built by developers, for developers.
<br/>
<div align="center">
<img src="documentation/images/logo_new.png" alt="logo"/>
<h3>Kafbat UI</h3>

[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://github.com/kafbat/kafka-ui/blob/main/LICENSE)
![Price free](documentation/images/free-open-source.svg)
[![Release version](https://img.shields.io/github/v/release/kafbat/kafka-ui)](https://github.com/kafbat/kafka-ui/releases)
[![Chat with us](https://img.shields.io/discord/897805035122077716)](https://discord.gg/4DWzD7pGE5)
[![Docker pulls](https://img.shields.io/docker/pulls/kafbat/kafka-ui)](https://hub.docker.com/r/kafbat/kafka-ui)
Versatile, fast and lightweight web UI for managing Apache Kafka® clusters.
</div>

<div align="center">
<a href="https://github.com/kafbat/kafka-ui/blob/main/LICENSE"><img src="https://img.shields.io/badge/License-Apache%202.0-blue.svg" alt="License"/></a>
<img src="documentation/images/free-open-source.svg" alt="price free"/>
<a href="https://github.com/kafbat/kafka-ui/releases"><img src="https://img.shields.io/github/v/release/kafbat/kafka-ui" alt="latest release version"/></a>
<a href="https://discord.gg/4DWzD7pGE5"><img src="https://img.shields.io/discord/897805035122077716" alt="discord online number count"/></a>
<a href="https://github.com/sponsors/kafbat"><img src="https://img.shields.io/github/sponsors/kafbat?style=flat&logo=githubsponsors&logoColor=%23EA4AAA&label=Support%20us" alt="" /></a>
</div>

<p align="center">
<a href="https://ui.docs.kafbat.io/">DOCS</a> •
<a href="https://ui.docs.kafbat.io/configuration/quick-start">QUICK START</a> •
<a href="https://discord.gg/4DWzD7pGE5">COMMUNITY DISCORD</a>
<a href="https://ui.docs.kafbat.io/">Documentation</a> •
<a href="https://ui.docs.kafbat.io/configuration/quick-start">Quick Start</a> •
<a href="https://discord.gg/4DWzD7pGE5">Community</a>
<br/>
<a href="https://aws.amazon.com/marketplace/pp/{replaceMe}">AWS Marketplace</a> •
<a href="https://www.producthunt.com/products/ui-for-apache-kafka/reviews/new">ProductHunt</a>
</p>

<p align="center">
<img src="https://repobeats.axiom.co/api/embed/2e8a7c2d711af9daddd34f9791143e7554c35d0f.svg" />
<img src="https://repobeats.axiom.co/api/embed/88d2bd9887380c7d86e2f986725d9af52ebad7f4.svg" alt="stats"/>
</p>

#### Kafbat UI is a free, open-source web UI to monitor and manage Apache Kafka clusters.

Kafbat UI is a simple tool that makes your data flows observable, helps find and troubleshoot issues faster and deliver optimal performance. Its lightweight dashboard makes it easy to track key metrics of your Kafka clusters - Brokers, Topics, Partitions, Production, and Consumption.

![Interface](documentation/images/Interface.gif)
<i>
Kafbat UI, developed by <b>Kafbat</b>*, proudly carries forward the legacy of the UI Apache Kafka project.
Our dedication is reflected in the continuous evolution of the project, ensuring adherence to its foundational vision while adapting to meet modern demands.
We extend our gratitude to Provectus for their past support in groundbreaking work, which serves as a cornerstone for our ongoing innovation and dedication.

<b>*</b> - The <b>Kafbat</b> team comprises key contributors from the project's inception, bringing a wealth of experience and insight to this renewed endeavor.
</i>

# Interface

![Interface](https://raw.githubusercontent.com/kafbat/kafka-ui/images/overview.gif)

# Features
* **Multi-Cluster Management** — monitor and manage all your clusters in one place
Expand Down Expand Up @@ -108,7 +122,7 @@ services:
- ~/kui/config.yml:/etc/kafkaui/dynamic_config.yaml
```

Please refer to our [configuration](https://ui.docs.kafbat.io/configuration/quick-start) page to proceed with further app configuration.
Please refer to our [configuration](https://ui.docs.kafbat.io/configuration/configuration-file) page to proceed with further app configuration.

## Some useful configuration related links

Expand All @@ -134,8 +148,13 @@ Info endpoint (build info) is located at `/actuator/info`.

# Configuration options

All of the environment variables/config properties could be found [here](https://ui.docs.kafbat.io/configuration/misc-configuration-properties).
All the environment variables/config properties could be found [here](https://ui.docs.kafbat.io/configuration/misc-configuration-properties).

# Contributing

Please refer to [contributing guide](https://ui.docs.kafbat.io/development/contributing), we'll guide you from there.

# Support

As we're fully independent, team members contribute in their free time.
Your support is crucial for us, if you wish to sponsor us, take a look [here](https://github.com/sponsors/kafbat)
110 changes: 55 additions & 55 deletions api/src/main/java/io/kafbat/ui/controller/MessagesController.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package io.kafbat.ui.controller;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_DELETE;
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_PRODUCE;
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_READ;
import static java.util.stream.Collectors.toMap;

import io.kafbat.ui.api.MessagesApi;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.model.ConsumerPosition;
import io.kafbat.ui.model.CreateTopicMessageDTO;
import io.kafbat.ui.model.MessageFilterIdDTO;
import io.kafbat.ui.model.MessageFilterRegistrationDTO;
import io.kafbat.ui.model.MessageFilterTypeDTO;
import io.kafbat.ui.model.PollingModeDTO;
import io.kafbat.ui.model.SeekDirectionDTO;
import io.kafbat.ui.model.SeekTypeDTO;
import io.kafbat.ui.model.SerdeUsageDTO;
Expand All @@ -24,14 +27,10 @@
import io.kafbat.ui.service.DeserializationService;
import io.kafbat.ui.service.MessagesService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.TopicPartition;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.server.ServerWebExchange;
Expand Down Expand Up @@ -74,6 +73,7 @@ public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilte
.map(ResponseEntity::ok);
}

@Deprecated
@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
String topicName,
Expand All @@ -86,6 +86,23 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
String keySerde,
String valueSerde,
ServerWebExchange exchange) {
throw new ValidationException("Not supported");
}


@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
PollingModeDTO mode,
List<Integer> partitions,
Integer limit,
String stringFilter,
String smartFilterId,
Long offset,
Long timestamp,
String keySerde,
String valueSerde,
String cursor,
ServerWebExchange exchange) {
var contextBuilder = AccessContext.builder()
.cluster(clusterName)
.topicActions(topicName, MESSAGES_READ)
Expand All @@ -95,27 +112,26 @@ public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String
contextBuilder.auditActions(AuditAction.VIEW);
}

seekType = seekType != null ? seekType : SeekTypeDTO.BEGINNING;
seekDirection = seekDirection != null ? seekDirection : SeekDirectionDTO.FORWARD;
filterQueryType = filterQueryType != null ? filterQueryType : MessageFilterTypeDTO.STRING_CONTAINS;

var positions = new ConsumerPosition(
seekType,
topicName,
parseSeekTo(topicName, seekType, seekTo)
);
Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> job = Mono.just(
ResponseEntity.ok(
messagesService.loadMessages(
getCluster(clusterName), topicName, positions, q, filterQueryType,
limit, seekDirection, keySerde, valueSerde)
)
);

var context = contextBuilder.build();
return validateAccess(context)
.then(job)
.doOnEach(sig -> audit(context, sig));
var accessContext = contextBuilder.build();

Flux<TopicMessageEventDTO> messagesFlux;
if (cursor != null) {
messagesFlux = messagesService.loadMessages(getCluster(clusterName), topicName, cursor);
} else {
messagesFlux = messagesService.loadMessages(
getCluster(clusterName),
topicName,
ConsumerPosition.create(checkNotNull(mode), checkNotNull(topicName), partitions, timestamp, offset),
stringFilter,
smartFilterId,
limit,
keySerde,
valueSerde
);
}
return accessControlService.validateAccess(accessContext)
.then(Mono.just(ResponseEntity.ok(messagesFlux)))
.doOnEach(sig -> auditService.audit(accessContext, sig));
}

@Override
Expand All @@ -136,34 +152,6 @@ public Mono<ResponseEntity<Void>> sendTopicMessages(
).doOnEach(sig -> audit(context, sig));
}

/**
* The format is [partition]::[offset] for specifying offsets
* or [partition]::[timestamp in millis] for specifying timestamps.
*/
@Nullable
private Map<TopicPartition, Long> parseSeekTo(String topic, SeekTypeDTO seekType, List<String> seekTo) {
if (seekTo == null || seekTo.isEmpty()) {
if (seekType == SeekTypeDTO.LATEST || seekType == SeekTypeDTO.BEGINNING) {
return null;
}
throw new ValidationException("seekTo should be set if seekType is " + seekType);
}
return seekTo.stream()
.map(p -> {
String[] split = p.split("::");
if (split.length != 2) {
throw new IllegalArgumentException(
"Wrong seekTo argument format. See API docs for details");
}

return Pair.of(
new TopicPartition(topic, Integer.parseInt(split[0])),
Long.parseLong(split[1])
);
})
.collect(toMap(Pair::getKey, Pair::getValue));
}

@Override
public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterName,
String topicName,
Expand All @@ -190,7 +178,19 @@ public Mono<ResponseEntity<TopicSerdeSuggestionDTO>> getSerdes(String clusterNam
);
}

@Override
public Mono<ResponseEntity<MessageFilterIdDTO>> registerFilter(String clusterName,
String topicName,
Mono<MessageFilterRegistrationDTO> registration,
ServerWebExchange exchange) {



final Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
.cluster(clusterName)
.topicActions(topicName, MESSAGES_READ)
.build());
return validateAccess.then(registration)
.map(reg -> messagesService.registerMessageFilter(reg.getFilterCode()))
.map(id -> ResponseEntity.ok(new MessageFilterIdDTO().id(id)));
}
}
14 changes: 9 additions & 5 deletions api/src/main/java/io/kafbat/ui/emitter/AbstractEmitter.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kafbat.ui.emitter;

import io.kafbat.ui.model.TopicMessageEventDTO;
import jakarta.annotation.Nullable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import reactor.core.publisher.FluxSink;
Expand All @@ -21,12 +22,14 @@ protected PolledRecords poll(FluxSink<TopicMessageEventDTO> sink, EnhancedConsum
return records;
}

protected boolean sendLimitReached() {
protected boolean isSendLimitReached() {
return messagesProcessing.limitReached();
}

protected void send(FluxSink<TopicMessageEventDTO> sink, Iterable<ConsumerRecord<Bytes, Bytes>> records) {
messagesProcessing.send(sink, records);
protected void send(FluxSink<TopicMessageEventDTO> sink,
Iterable<ConsumerRecord<Bytes, Bytes>> records,
@Nullable Cursor.Tracking cursor) {
messagesProcessing.send(sink, records, cursor);
}

protected void sendPhase(FluxSink<TopicMessageEventDTO> sink, String name) {
Expand All @@ -37,8 +40,9 @@ protected void sendConsuming(FluxSink<TopicMessageEventDTO> sink, PolledRecords
messagesProcessing.sentConsumingInfo(sink, records);
}

protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink) {
messagesProcessing.sendFinishEvent(sink);
// cursor is null if target partitions were fully polled (no, need to do paging)
protected void sendFinishStatsAndCompleteSink(FluxSink<TopicMessageEventDTO> sink, @Nullable Cursor.Tracking cursor) {
messagesProcessing.sendFinishEvents(sink, cursor);
sink.complete();
}
}
13 changes: 5 additions & 8 deletions api/src/main/java/io/kafbat/ui/emitter/BackwardEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,15 @@ public BackwardEmitter(Supplier<EnhancedConsumer> consumerSupplier,
int messagesPerPage,
ConsumerRecordDeserializer deserializer,
Predicate<TopicMessageDTO> filter,
PollingSettings pollingSettings) {
PollingSettings pollingSettings,
Cursor.Tracking cursor) {
super(
consumerSupplier,
consumerPosition,
messagesPerPage,
new MessagesProcessing(
deserializer,
filter,
false,
messagesPerPage
),
pollingSettings
new MessagesProcessing(deserializer, filter, false, messagesPerPage),
pollingSettings,
cursor
);
}

Expand Down
Loading

0 comments on commit ed969dd

Please sign in to comment.