Skip to content

Commit

Permalink
BE: Expose Kafka Connect validation errors in the UI
Browse files Browse the repository at this point in the history
  • Loading branch information
yeikel committed Dec 12, 2024
1 parent 318bcc9 commit ab62e64
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kafbat.ui.client;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.connect.ApiClient;
import io.kafbat.ui.connect.api.KafkaConnectClientApi;
Expand All @@ -14,9 +15,11 @@
import io.kafbat.ui.exception.KafkaConnectConflictReponseException;
import io.kafbat.ui.exception.ValidationException;
import io.kafbat.ui.util.WebClientConfigurator;
import jakarta.validation.constraints.NotNull;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
Expand Down Expand Up @@ -58,10 +61,24 @@ private static <T> Flux<T> withRetryOnConflict(Flux<T> publisher) {

private static <T> Mono<T> withBadRequestErrorHandling(Mono<T> publisher) {
return publisher
.onErrorResume(WebClientResponseException.BadRequest.class, e ->
Mono.error(new ValidationException("Invalid configuration")))
.onErrorResume(WebClientResponseException.InternalServerError.class, e ->
Mono.error(new ValidationException("Invalid configuration")));
.onErrorResume(WebClientResponseException.BadRequest.class,
RetryingKafkaConnectClient::parseConnectErrorMessage)
.onErrorResume(WebClientResponseException.InternalServerError.class,
RetryingKafkaConnectClient::parseConnectErrorMessage);
}

// Adapted from https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
// Adding the connect runtime dependency for this single class seems excessive
private record ErrorMessage(@NotNull @JsonProperty("message") String message) {
}

private static <T> @NotNull Mono<T> parseConnectErrorMessage(WebClientResponseException parseException) {
final var errorMessage = parseException.getResponseBodyAs(ErrorMessage.class);
return Mono.error(new ValidationException(
Objects.requireNonNull(errorMessage,
// see https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
"This should not happen according to the ConnectExceptionMapper")
.message()));
}

@Override
Expand Down
20 changes: 15 additions & 5 deletions api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.kafbat.ui.api.model.ErrorResponse;
import io.kafbat.ui.model.ConnectorDTO;
import io.kafbat.ui.model.ConnectorPluginConfigDTO;
import io.kafbat.ui.model.ConnectorPluginConfigValidationResponseDTO;
Expand Down Expand Up @@ -268,19 +269,28 @@ public void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() {


@Test
@SuppressWarnings("checkstyle:LineLength")
public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() {
webTestClient.put()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
LOCAL, connectName, connectorName)
.bodyValue(Map.of(
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max", "invalid number",
"topics", "another-topic",
"file", "/tmp/test"
"connector.class", "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max", "invalid number",
"topics", "another-topic",
"file", "/tmp/test"
)
)
.exchange()
.expectStatus().isBadRequest();
.expectStatus().isBadRequest()
.expectBody(ErrorResponse.class)
.value(response -> assertThat(response.getMessage()).isEqualTo(
"""
Connector configuration is invalid and contains the following 2 error(s):
Invalid value invalid number for configuration tasks.max: Not a number of type INT
Invalid value null for configuration tasks.max: Value must be non-null
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"""
));

webTestClient.get()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config",
Expand Down
3 changes: 3 additions & 0 deletions etc/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
<property name="eachLine" value="true"/>
</module>

<!-- Make the @SuppressWarnings annotations available to Checkstyle -->
<module name="SuppressWarningsHolder"/>

<module name="LineLength">
<property name="fileExtensions" value="java"/>
<property name="max" value="120"/>
Expand Down

0 comments on commit ab62e64

Please sign in to comment.