diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java index f0c20cfc0..e87989cf1 100644 --- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java +++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java @@ -61,7 +61,7 @@ public class KafkaSinkBuilder { private static final Logger LOG = LoggerFactory.getLogger(KafkaSinkBuilder.class); - private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Duration.ofHours(1); + private static final Duration DEFAULT_KAFKA_TRANSACTION_TIMEOUT = Duration.ofMinutes(15); private static final String[] warnKeys = new String[] { ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java index eeecc84df..28cf248f2 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilderTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.jupiter.api.Test; +import java.time.Duration; import java.util.Arrays; import java.util.Properties; import java.util.function.Consumer; @@ -87,6 +88,27 @@ public void testBootstrapServerSetting() { p -> assertThat(p).containsKeys(DEFAULT_KEYS)); } + @Test + public void testTransactionTimeoutSetting() { + validateProducerConfig( + getBasicBuilder(), + p -> { + assertThat(p.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) + .isEqualTo((int) Duration.ofMinutes(15).toMillis()); + }); + + Properties testConf = new Properties(); + testConf.put( + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) Duration.ofHours(1).toMillis()); + + validateProducerConfig( + getBasicBuilder().setKafkaProducerConfig(testConf), + p -> { + assertThat(p.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) + .isEqualTo((int) Duration.ofHours(1).toMillis()); + }); + } + private void validateProducerConfig( KafkaSinkBuilder builder, Consumer validator) { validator.accept(builder.build().getKafkaProducerConfig());