diff --git a/.dev/dev_arm64.yaml b/.dev/dev_arm64.yaml index 220140d3d..432e72b39 100644 --- a/.dev/dev_arm64.yaml +++ b/.dev/dev_arm64.yaml @@ -1,5 +1,5 @@ # This is a compose file designed for arm64/Apple Silicon systems -# To adapt this to x86 please find and replace ".arm64" with empty +# To adapt this to x86 please find and replace "" with empty # ARM64 supported images for kafka can be found here # https://hub.docker.com/r/confluentinc/cp-kafka/tags?page=1&name=arm64 @@ -9,36 +9,37 @@ name: "kafbat-ui-dev" services: - kafbat-ui: - container_name: kafbat-ui - image: ghcr.io/kafbat/kafka-ui:latest - ports: - - 8080:8080 - depends_on: - - kafka0 - - schema-registry0 - - kafka-connect0 - - ksqldb0 - environment: - KAFKA_CLUSTERS_0_NAME: local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092 - KAFKA_CLUSTERS_0_METRICS_PORT: 9997 - KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085 - KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first - KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083 - KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb0:8088 - DYNAMIC_CONFIG_ENABLED: 'true' - KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true' - KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true' + #kafbat-ui: + # container_name: kafbat-ui + # image: ghcr.io/kafbat/kafka-ui:latest + # ports: + # - 8080:8080 + # depends_on: + # - kafka0 + # - schema-registry0 + # - kafka-connect0 + # - ksqldb0 + # environment: + # KAFKA_CLUSTERS_0_NAME: local + # KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092 + # KAFKA_CLUSTERS_0_METRICS_PORT: 9997 + # KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry0:8085 + # KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first + # KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083 + # KAFKA_CLUSTERS_0_KSQLDBSERVER: http://ksqldb0:8088 + # DYNAMIC_CONFIG_ENABLED: 'true' + # KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true' + # KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true' kafka0: - image: confluentinc/cp-kafka:7.6.0.arm64 + image: confluentinc/cp-kafka:7.6.0 user: "0:0" hostname: kafka0 container_name: kafka0 ports: - 9092:9092 - 9997:9997 + - 29092:29092 environment: KAFKA_BROKER_ID: 1 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT @@ -55,12 +56,12 @@ services: KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs' KAFKA_JMX_PORT: 9997 -# KAFKA_JMX_HOSTNAME: localhost # uncomment this line and comment the next one if running with kafka-ui as a jar + KAFKA_JMX_HOSTNAME: localhost # uncomment this line and comment the next one if running with kafka-ui as a jar KAFKA_JMX_OPTS: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=kafka0 -Dcom.sun.management.jmxremote.rmi.port=9997 CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' schema-registry0: - image: confluentinc/cp-schema-registry:7.6.0.arm64 + image: confluentinc/cp-schema-registry:7.6.0 ports: - 8085:8085 depends_on: @@ -76,7 +77,7 @@ services: SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas kafka-connect0: - image: confluentinc/cp-kafka-connect:7.6.0.arm64 + image: confluentinc/cp-kafka-connect:7.6.0 ports: - 8083:8083 depends_on: @@ -101,7 +102,7 @@ services: CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors" ksqldb0: - image: confluentinc/cp-ksqldb-server:7.6.0.arm64 + image: confluentinc/cp-ksqldb-server:7.6.0 depends_on: - kafka0 - kafka-connect0 @@ -119,7 +120,7 @@ services: KSQL_CACHE_MAX_BYTES_BUFFERING: 0 kafka-init-topics: - image: confluentinc/cp-kafka:7.6.0.arm64 + image: confluentinc/cp-kafka:7.6.0 volumes: - ../documentation/compose/data/message.json:/data/message.json depends_on: diff --git a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java index f0b291af6..6dfd7387d 100644 --- a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java +++ b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java @@ -1,5 +1,7 @@ package io.kafbat.ui.client; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.connect.ApiClient; import io.kafbat.ui.connect.api.KafkaConnectClientApi; @@ -17,6 +19,7 @@ import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.Objects; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.springframework.http.ResponseEntity; @@ -56,10 +59,21 @@ private static Flux withRetryOnConflict(Flux publisher) { return publisher.retryWhen(conflictCodeRetry()); } + // Adapted from https://github.com/apache/kafka/blob/a0a501952b6d61f6f273bdb8f842346b51e9dfce/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java#L35 + private record ErrorMessage(@JsonProperty("message") String message) { + } + private static Mono withBadRequestErrorHandling(Mono publisher) { return publisher - .onErrorResume(WebClientResponseException.BadRequest.class, e -> - Mono.error(new ValidationException("Invalid configuration"))) + .onErrorResume(WebClientResponseException.BadRequest.class, e -> { + final var errorMessage = e.getResponseBodyAs(ErrorMessage.class); + + if (errorMessage != null && errorMessage.message() != null) { + return Mono.error(new ValidationException(errorMessage.message())); + } + + return Mono.error(new ValidationException("Invalid configuration")); + }) .onErrorResume(WebClientResponseException.InternalServerError.class, e -> Mono.error(new ValidationException("Invalid configuration"))); }