Skip to content

Commit

Permalink
Drop messages v1
Browse files Browse the repository at this point in the history
  • Loading branch information
Haarolean committed Oct 24, 2024
1 parent 7be3325 commit a5df4b3
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,11 @@
import static io.kafbat.ui.model.rbac.permission.TopicAction.MESSAGES_READ;

import io.kafbat.ui.api.MessagesApi;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.model.ConsumerPosition;
import io.kafbat.ui.model.CreateTopicMessageDTO;
import io.kafbat.ui.model.MessageFilterIdDTO;
import io.kafbat.ui.model.MessageFilterRegistrationDTO;
import io.kafbat.ui.model.MessageFilterTypeDTO;
import io.kafbat.ui.model.PollingModeDTO;
import io.kafbat.ui.model.SeekDirectionDTO;
import io.kafbat.ui.model.SeekTypeDTO;
import io.kafbat.ui.model.SerdeUsageDTO;
import io.kafbat.ui.model.SmartFilterTestExecutionDTO;
import io.kafbat.ui.model.SmartFilterTestExecutionResultDTO;
Expand Down Expand Up @@ -73,25 +69,8 @@ public Mono<ResponseEntity<SmartFilterTestExecutionResultDTO>> executeSmartFilte
.map(ResponseEntity::ok);
}

@Deprecated
@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName,
String topicName,
SeekTypeDTO seekType,
List<String> seekTo,
Integer limit,
String q,
MessageFilterTypeDTO filterQueryType,
SeekDirectionDTO seekDirection,
String keySerde,
String valueSerde,
ServerWebExchange exchange) {
throw new ValidationException("Not supported");
}


@Override
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessagesV2(String clusterName, String topicName,
public Mono<ResponseEntity<Flux<TopicMessageEventDTO>>> getTopicMessages(String clusterName, String topicName,
PollingModeDTO mode,
List<Integer> partitions,
Integer limit,
Expand Down
4 changes: 2 additions & 2 deletions api/src/test/java/io/kafbat/ui/KafkaConsumerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void shouldDeleteRecords() {
}

long count = webTestClient.get()
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?mode=EARLIEST", LOCAL, topicName)
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages?mode=EARLIEST", LOCAL, topicName)
.accept(TEXT_EVENT_STREAM)
.exchange()
.expectStatus()
Expand All @@ -77,7 +77,7 @@ public void shouldDeleteRecords() {
.isOk();

count = webTestClient.get()
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages/v2?mode=EARLIEST", LOCAL, topicName)
.uri("/api/clusters/{clusterName}/topics/{topicName}/messages?mode=EARLIEST", LOCAL, topicName)
.exchange()
.expectStatus()
.isOk()
Expand Down
142 changes: 27 additions & 115 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -662,33 +662,45 @@ paths:
required: true
schema:
type: string
- name: seekType
- name: mode
in: query
description: Messages polling mode
schema:
$ref: "#/components/schemas/SeekType"
- name: seekTo
$ref: "#/components/schemas/PollingMode"
- name: partitions
in: query
schema:
type: array
description: List of target partitions (all partitions if not provided)
items:
type: string
description: The format is [partition]::[offset] for specifying offsets or [partition]::[timestamp in millis] for specifying timestamps
type: integer
- name: limit
in: query
description: Max number of messages can be returned
schema:
type: integer
- name: q
- name: stringFilter
in: query
description: query string to contains string filtration
schema:
type: string
- name: filterQueryType
- name: smartFilterId
in: query
description: filter id, that was registered beforehand
schema:
$ref: "#/components/schemas/MessageFilterType"
- name: seekDirection
type: string
- name: offset
in: query
description: message offset to read from / to
schema:
$ref: "#/components/schemas/SeekDirection"
type: integer
format: int64
- name: timestamp
in: query
description: timestamp (in ms) to read from / to
schema:
type: integer
format: int64
- name: keySerde
in: query
description: "Serde that should be used for deserialization. Will be chosen automatically if not set."
Expand All @@ -699,6 +711,11 @@ paths:
description: "Serde that should be used for deserialization. Will be chosen automatically if not set."
schema:
type: string
- name: cursor
in: query
description: "id of the cursor for pagination, if passed - all other query params ignored"
schema:
type: string
responses:
200:
description: OK
Expand Down Expand Up @@ -793,89 +810,6 @@ paths:
schema:
$ref: '#/components/schemas/MessageFilterId'


/api/clusters/{clusterName}/topics/{topicName}/messages/v2:
get:
tags:
- Messages
summary: getTopicMessagesV2
operationId: getTopicMessagesV2
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: topicName
in: path
required: true
schema:
type: string
- name: mode
in: query
description: Messages polling mode
schema:
$ref: "#/components/schemas/PollingMode"
- name: partitions
in: query
schema:
type: array
description: List of target partitions (all partitions if not provided)
items:
type: integer
- name: limit
in: query
description: Max number of messages can be returned
schema:
type: integer
- name: stringFilter
in: query
description: query string to contains string filtration
schema:
type: string
- name: smartFilterId
in: query
description: filter id, that was registered beforehand
schema:
type: string
- name: offset
in: query
description: message offset to read from / to
schema:
type: integer
format: int64
- name: timestamp
in: query
description: timestamp (in ms) to read from / to
schema:
type: integer
format: int64
- name: keySerde
in: query
description: "Serde that should be used for deserialization. Will be chosen automatically if not set."
schema:
type: string
- name: valueSerde
in: query
description: "Serde that should be used for deserialization. Will be chosen automatically if not set."
schema:
type: string
- name: cursor
in: query
description: "id of the cursor for pagination, if passed - all other query params ignored"
schema:
type: string
responses:
200:
description: OK
content:
text/event-stream:
schema:
type: array
items:
$ref: '#/components/schemas/TopicMessageEvent'


/api/clusters/{clusterName}/topics/{topicName}/activeproducers:
get:
tags:
Expand Down Expand Up @@ -3080,14 +3014,6 @@ components:
- offset
- timestamp

SeekType:
type: string
enum:
- BEGINNING
- OFFSET
- TIMESTAMP
- LATEST

MessageFilterRegistration:
type: object
properties:
Expand All @@ -3111,20 +3037,6 @@ components:
- EARLIEST
- TAILING

MessageFilterType:
type: string
enum:
- STRING_CONTAINS
- CEL_SCRIPT

SeekDirection:
type: string
enum:
- FORWARD
- BACKWARD
- TAILING
default: FORWARD

Partition:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/lib/hooks/api/topicMessages.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export const useTopicMessages = ({

const url = `${BASE_PARAMS.basePath}/api/clusters/${encodeURIComponent(
clusterName
)}/topics/${topicName}/messages/v2`;
)}/topics/${topicName}/messages`;

const requestParams = new URLSearchParams({
limit: searchParams.get(MessagesFilterKeys.limit) || MESSAGES_PER_PAGE,
Expand Down

0 comments on commit a5df4b3

Please sign in to comment.