diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index e91a5bc9a..5931602b2 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -35,22 +35,31 @@ public class ClustersProperties { public static class Cluster { String name; String bootstrapServers; + + TruststoreConfig ssl; + String schemaRegistry; SchemaRegistryAuth schemaRegistryAuth; KeystoreConfig schemaRegistrySsl; + String ksqldbServer; KsqldbServerAuth ksqldbServerAuth; KeystoreConfig ksqldbServerSsl; + List kafkaConnect; - MetricsConfigData metrics; - Map properties; - boolean readOnly = false; + List serde; String defaultKeySerde; String defaultValueSerde; - List masking; + + MetricsConfigData metrics; + Map properties; + boolean readOnly = false; + Long pollingThrottleRate; - TruststoreConfig ssl; + + List masking; + AuditProperties audit; } @@ -99,6 +108,16 @@ public static class SchemaRegistryAuth { public static class TruststoreConfig { String truststoreLocation; String truststorePassword; + boolean verifySsl = true; + } + + @Data + @NoArgsConstructor + @AllArgsConstructor + @ToString(exclude = {"keystorePassword"}) + public static class KeystoreConfig { + String keystoreLocation; + String keystorePassword; } @Data @@ -118,15 +137,6 @@ public static class KsqldbServerAuth { String password; } - @Data - @NoArgsConstructor - @AllArgsConstructor - @ToString(exclude = {"keystorePassword"}) - public static class KeystoreConfig { - String keystoreLocation; - String keystorePassword; - } - @Data public static class Masking { Type type; @@ -182,6 +192,7 @@ private void flattenClusterProperties() { } } + @SuppressWarnings("unchecked") private Map flattenClusterProperties(@Nullable String prefix, @Nullable Map propertiesMap) { Map flattened = new HashMap<>(); diff --git a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java index bc175b980..e3613c94e 100644 --- a/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java +++ b/api/src/main/java/io/kafbat/ui/service/AdminClientServiceImpl.java @@ -2,7 +2,7 @@ import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.model.KafkaCluster; -import io.kafbat.ui.util.SslPropertiesUtil; +import io.kafbat.ui.util.KafkaClientSslPropertiesUtil; import java.io.Closeable; import java.time.Instant; import java.util.Map; @@ -42,7 +42,7 @@ public Mono get(KafkaCluster cluster) { private Mono createAdminClient(KafkaCluster cluster) { return Mono.fromSupplier(() -> { Properties properties = new Properties(); - SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); + KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); properties.putAll(cluster.getProperties()); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.putIfAbsent(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, clientTimeout); diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index 452de4a59..d779dcfef 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -10,7 +10,7 @@ import io.kafbat.ui.model.SortOrderDTO; import io.kafbat.ui.service.rbac.AccessControlService; import io.kafbat.ui.util.ApplicationMetrics; -import io.kafbat.ui.util.SslPropertiesUtil; +import io.kafbat.ui.util.KafkaClientSslPropertiesUtil; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -254,7 +254,7 @@ public EnhancedConsumer createConsumer(KafkaCluster cluster) { public EnhancedConsumer createConsumer(KafkaCluster cluster, Map properties) { Properties props = new Properties(); - SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props); + KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), props); props.putAll(cluster.getProperties()); props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafbat-ui-consumer-" + System.currentTimeMillis()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); diff --git a/api/src/main/java/io/kafbat/ui/service/MessagesService.java b/api/src/main/java/io/kafbat/ui/service/MessagesService.java index 2f6192e11..c94472d56 100644 --- a/api/src/main/java/io/kafbat/ui/service/MessagesService.java +++ b/api/src/main/java/io/kafbat/ui/service/MessagesService.java @@ -23,7 +23,7 @@ import io.kafbat.ui.model.TopicMessageEventDTO; import io.kafbat.ui.serdes.ConsumerRecordDeserializer; import io.kafbat.ui.serdes.ProducerRecordCreator; -import io.kafbat.ui.util.SslPropertiesUtil; +import io.kafbat.ui.util.KafkaClientSslPropertiesUtil; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; @@ -199,7 +199,7 @@ private Mono sendMessageImpl(KafkaCluster cluster, public static KafkaProducer createProducer(KafkaCluster cluster, Map additionalProps) { Properties properties = new Properties(); - SslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); + KafkaClientSslPropertiesUtil.addKafkaSslProperties(cluster.getOriginalProperties().getSsl(), properties); properties.putAll(cluster.getProperties()); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers()); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); diff --git a/api/src/main/java/io/kafbat/ui/service/ksql/KsqlApiClient.java b/api/src/main/java/io/kafbat/ui/service/ksql/KsqlApiClient.java index 3a0b46c81..90192eb2d 100644 --- a/api/src/main/java/io/kafbat/ui/service/ksql/KsqlApiClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ksql/KsqlApiClient.java @@ -130,8 +130,8 @@ private Flux executeSelect(String ksql, Map s * Some version of ksqldb (?..0.24) can cut off json streaming without respect proper array ending like

* [{"header":{"queryId":"...","schema":"..."}}, ] * which will cause json parsing error and will be propagated to UI. - * This is a know issue(https://github.com/confluentinc/ksql/issues/8746), but we don't know when it will be fixed. - * To workaround this we need to check DecodingException err msg. + * This is a known issue(...), but we don't know when it will be fixed. + * To work around this we need to check DecodingException err msg. */ private boolean isUnexpectedJsonArrayEndCharException(Throwable th) { return th instanceof DecodingException diff --git a/api/src/main/java/io/kafbat/ui/util/KafkaClientSslPropertiesUtil.java b/api/src/main/java/io/kafbat/ui/util/KafkaClientSslPropertiesUtil.java new file mode 100644 index 000000000..324e2e4d0 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/util/KafkaClientSslPropertiesUtil.java @@ -0,0 +1,35 @@ +package io.kafbat.ui.util; + +import io.kafbat.ui.config.ClustersProperties; +import java.util.Properties; +import javax.annotation.Nullable; +import org.apache.kafka.common.config.SslConfigs; + +public final class KafkaClientSslPropertiesUtil { + + private KafkaClientSslPropertiesUtil() { + } + + public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig, + Properties sink) { + if (truststoreConfig == null) { + return; + } + + if (!truststoreConfig.isVerifySsl()) { + sink.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); + } + + if (truststoreConfig.getTruststoreLocation() == null) { + return; + } + + sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation()); + + if (truststoreConfig.getTruststorePassword() != null) { + sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword()); + } + + } + +} diff --git a/api/src/main/java/io/kafbat/ui/util/KafkaServicesValidation.java b/api/src/main/java/io/kafbat/ui/util/KafkaServicesValidation.java index 629d0f339..397fa3839 100644 --- a/api/src/main/java/io/kafbat/ui/util/KafkaServicesValidation.java +++ b/api/src/main/java/io/kafbat/ui/util/KafkaServicesValidation.java @@ -65,7 +65,7 @@ public static Mono validateClusterConnection(S @Nullable TruststoreConfig ssl) { Properties properties = new Properties(); - SslPropertiesUtil.addKafkaSslProperties(ssl, properties); + KafkaClientSslPropertiesUtil.addKafkaSslProperties(ssl, properties); properties.putAll(clusterProps); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // editing properties to make validation faster diff --git a/api/src/main/java/io/kafbat/ui/util/SslPropertiesUtil.java b/api/src/main/java/io/kafbat/ui/util/SslPropertiesUtil.java deleted file mode 100644 index fda959a2b..000000000 --- a/api/src/main/java/io/kafbat/ui/util/SslPropertiesUtil.java +++ /dev/null @@ -1,23 +0,0 @@ -package io.kafbat.ui.util; - -import io.kafbat.ui.config.ClustersProperties; -import java.util.Properties; -import javax.annotation.Nullable; -import org.apache.kafka.common.config.SslConfigs; - -public final class SslPropertiesUtil { - - private SslPropertiesUtil() { - } - - public static void addKafkaSslProperties(@Nullable ClustersProperties.TruststoreConfig truststoreConfig, - Properties sink) { - if (truststoreConfig != null && truststoreConfig.getTruststoreLocation() != null) { - sink.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.getTruststoreLocation()); - if (truststoreConfig.getTruststorePassword() != null) { - sink.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.getTruststorePassword()); - } - } - } - -} diff --git a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java index 5d364f6dc..1c289f54f 100644 --- a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java +++ b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java @@ -7,6 +7,7 @@ import io.kafbat.ui.exception.ValidationException; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import java.io.FileInputStream; import java.security.KeyStore; import java.util.function.Consumer; @@ -45,6 +46,10 @@ private static ObjectMapper defaultOM() { public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig, @Nullable ClustersProperties.KeystoreConfig keystoreConfig) { + if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) { + return configureNoSsl(); + } + return configureSsl( keystoreConfig != null ? keystoreConfig.getKeystoreLocation() : null, keystoreConfig != null ? keystoreConfig.getKeystorePassword() : null, @@ -97,6 +102,17 @@ private WebClientConfigurator configureSsl( return this; } + @SneakyThrows + public WebClientConfigurator configureNoSsl() { + var contextBuilder = SslContextBuilder.forClient(); + contextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE); + + SslContext context = contextBuilder.build(); + + httpClient = httpClient.secure(t -> t.sslContext(context)); + return this; + } + public WebClientConfigurator configureBasicAuth(@Nullable String username, @Nullable String password) { if (username != null && password != null) { builder.defaultHeaders(httpHeaders -> httpHeaders.setBasicAuth(username, password));