From a2df91a71007b9c758a57219d79ba8238d2c6f40 Mon Sep 17 00:00:00 2001 From: yeikel Date: Wed, 11 Dec 2024 22:27:57 -0500 Subject: [PATCH] BE: Expose Kafka Connect validation errors in the UI --- .../ui/client/RetryingKafkaConnectClient.java | 25 ++++++++++++++++--- .../kafbat/ui/KafkaConnectServiceTests.java | 20 +++++++++++---- etc/checkstyle/checkstyle.xml | 2 ++ 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java index f0b291af6..72ab7386a 100644 --- a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java +++ b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java @@ -1,5 +1,6 @@ package io.kafbat.ui.client; +import com.fasterxml.jackson.annotation.JsonProperty; import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.connect.ApiClient; import io.kafbat.ui.connect.api.KafkaConnectClientApi; @@ -14,9 +15,11 @@ import io.kafbat.ui.exception.KafkaConnectConflictReponseException; import io.kafbat.ui.exception.ValidationException; import io.kafbat.ui.util.WebClientConfigurator; +import jakarta.validation.constraints.NotNull; import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.Objects; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; @@ -58,10 +61,24 @@ private static Flux withRetryOnConflict(Flux publisher) { private static Mono withBadRequestErrorHandling(Mono publisher) { return publisher - .onErrorResume(WebClientResponseException.BadRequest.class, e -> - Mono.error(new ValidationException("Invalid configuration"))) - .onErrorResume(WebClientResponseException.InternalServerError.class, e -> - Mono.error(new ValidationException("Invalid configuration"))); + .onErrorResume(WebClientResponseException.BadRequest.class, + RetryingKafkaConnectClient::parseConnectErrorMessage) + .onErrorResume(WebClientResponseException.InternalServerError.class, + RetryingKafkaConnectClient::parseConnectErrorMessage); + } + + // Adapted from https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java + // Adding the connect runtime dependency for this single class seems excessive + private record ErrorMessage(@NotNull @JsonProperty("message") String message) { + } + + private static @NotNull Mono parseConnectErrorMessage(WebClientResponseException parseException) { + final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class); + return Mono.error(new ValidationException( + Objects.requireNonNull(errorMessage, + // see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java + "This should not happen according to the ConnectExceptionMapper") + .message())); } @Override diff --git a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java index 3588298f4..98d4804a9 100644 --- a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java +++ b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java @@ -4,6 +4,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; +import io.kafbat.ui.api.model.ErrorResponse; import io.kafbat.ui.model.ConnectorDTO; import io.kafbat.ui.model.ConnectorPluginConfigDTO; import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO; @@ -268,19 +269,28 @@ public void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() { @Test + @SuppressWarnings("checkstyle:linelength") public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() { webTestClient.put() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName) .bodyValue(Map.of( - "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", - "tasks.max", "invalid number", - "topics", "another-topic", - "file", "/tmp/test" + "connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector", + "tasks.max", "invalid number", + "topics", "another-topic", + "file", "/tmp/test" ) ) .exchange() - .expectStatus().isBadRequest(); + .expectStatus().isBadRequest() + .expectBody(ErrorResponse.class) + .value(response -> assertThat(response.getMessage()).isEqualTo( + """ + Connector configuration is invalid and contains the following 2 error(s): + Invalid value invalid number for configuration tasks.max: Not a number of type INT + Invalid value null for configuration tasks.max: Value must be non-null + You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`""" + )); webTestClient.get() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", diff --git a/etc/checkstyle/checkstyle.xml b/etc/checkstyle/checkstyle.xml index 745f1bc36..ee9ad03dc 100644 --- a/etc/checkstyle/checkstyle.xml +++ b/etc/checkstyle/checkstyle.xml @@ -47,6 +47,8 @@ + +