[FLINK-33017] Remove dependency on shaded guava
The bump in shaded Guava in Flink 1.18 changed import paths and caused
the class loader to fail when loading ManagedMemoryUtils.

Looking at the root cause of the issue: shading was used as a technique
to avoid dependency hell. Since flink-connector-kafka has to work with
both Flink 1.17 and Flink 1.18, which use different Guava versions (and
hence different import paths), shading did not really solve the problem
it was introduced for in the first place.
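
Concretely, the failure comes down to the shaded package name changing
between the two Flink versions, roughly as follows (the 1.18 package name
is an assumption; it tracks the bundled Guava major version):

// Import path with the flink-shaded-guava shipped in Flink 1.17:
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
// Import path after the bump in Flink 1.18 (assumed package name):
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;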

There are several options to work around the problem. First, we could
introduce our own shading for Guava. Second, we could check whether the
dependency on Guava is necessary at all and, if not, remove it
completely.

This patch takes the latter route and removes the dependency on Guava
from this connector.
Gerrrr authored and tzulitai committed Sep 12, 2023
1 parent 59ac738 commit a81cbeb
Showing 15 changed files with 103 additions and 97 deletions.
@@ -29,8 +29,6 @@
import org.apache.flink.test.util.JobSubmission;
import org.apache.flink.util.TestLoggerExtension;

import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
@@ -56,6 +54,7 @@

import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -137,7 +136,7 @@ public void testKafka() throws Exception {
// create the required topics
final short replicationFactor = 1;
admin.createTopics(
Lists.newArrayList(
Arrays.asList(
new NewTopic(inputTopic, 1, replicationFactor),
new NewTopic(outputTopic, 1, replicationFactor)))
.all()
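
The first change above swaps Guava's list factory for plain java.util; a
minimal sketch of the pattern (class and method names are illustrative
only):

import java.util.Arrays;
import java.util.List;

import org.apache.kafka.clients.admin.NewTopic;

class TopicListExample {
    static List<NewTopic> topics(String inputTopic, String outputTopic, short replicationFactor) {
        // Guava's Lists.newArrayList(a, b) returns a mutable ArrayList.
        // Arrays.asList(a, b) returns a fixed-size list, which is sufficient
        // here because the list is only handed to admin.createTopics().
        return Arrays.asList(
                new NewTopic(inputTopic, 1, replicationFactor),
                new NewTopic(outputTopic, 1, replicationFactor));
    }
}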
@@ -33,10 +33,6 @@
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricMutableWrapper;
import org.apache.flink.util.FlinkRuntimeException;

import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.shaded.guava30.com.google.common.io.Closer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -51,6 +47,7 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
@@ -106,9 +103,10 @@ class KafkaWriter<IN>
// producer pool only used for exactly once
private final Deque<FlinkKafkaInternalProducer<byte[], byte[]>> producerPool =
new ArrayDeque<>();
private final Closer closer = Closer.create();
private long lastCheckpointId;

private final Deque<AutoCloseable> producerCloseables = new ArrayDeque<>();

private boolean closed = false;
private long lastSync = System.currentTimeMillis();

@@ -180,7 +178,7 @@ class KafkaWriter<IN>
} else if (deliveryGuarantee == DeliveryGuarantee.AT_LEAST_ONCE
|| deliveryGuarantee == DeliveryGuarantee.NONE) {
this.currentProducer = new FlinkKafkaInternalProducer<>(this.kafkaProducerConfig, null);
closer.register(this.currentProducer);
producerCloseables.add(this.currentProducer);
initKafkaMetrics(this.currentProducer);
} else {
throw new UnsupportedOperationException(
@@ -239,21 +237,18 @@ public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException
currentProducer = getTransactionalProducer(checkpointId + 1);
currentProducer.beginTransaction();
}
return ImmutableList.of(kafkaWriterState);
return Collections.singletonList(kafkaWriterState);
}

@Override
public void close() throws Exception {
closed = true;
LOG.debug("Closing writer with {}", currentProducer);
closeAll(
this::abortCurrentProducer,
closer,
producerPool::clear,
() -> {
checkState(currentProducer.isClosed());
currentProducer = null;
});
closeAll(this::abortCurrentProducer, producerPool::clear);
closeAll(producerCloseables);
checkState(
currentProducer.isClosed(), "Could not close current producer " + currentProducer);
currentProducer = null;

// Rethrow exception for the case in which close is called before writer() and flush().
checkAsyncException();
@@ -282,7 +277,8 @@ FlinkKafkaInternalProducer<byte[], byte[]> getCurrentProducer() {

void abortLingeringTransactions(
Collection<KafkaWriterState> recoveredStates, long startCheckpointId) {
List<String> prefixesToAbort = Lists.newArrayList(transactionalIdPrefix);
List<String> prefixesToAbort = new ArrayList<>();
prefixesToAbort.add(transactionalIdPrefix);

final Optional<KafkaWriterState> lastStateOpt = recoveredStates.stream().findFirst();
if (lastStateOpt.isPresent()) {
@@ -340,7 +336,7 @@ private FlinkKafkaInternalProducer<byte[], byte[]> getOrCreateTransactionalProducer(
FlinkKafkaInternalProducer<byte[], byte[]> producer = producerPool.poll();
if (producer == null) {
producer = new FlinkKafkaInternalProducer<>(kafkaProducerConfig, transactionalId);
closer.register(producer);
producerCloseables.add(producer);
producer.initTransactions();
initKafkaMetrics(producer);
} else {
@@ -455,6 +451,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
asyncProducerException = decorateException(metadata, exception, producer);
}

// Checking for exceptions from previous writes
mailboxExecutor.submit(
() -> {
// Checking for exceptions from previous writes
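
In KafkaWriter, Guava's Closer previously registered every producer so they
could all be closed on shutdown; the patch tracks them in a
Deque<AutoCloseable> instead and hands that deque to closeAll (presumably
Flink's own org.apache.flink.util.IOUtils.closeAll). A self-contained sketch
of the idea, with illustrative names:

import java.util.ArrayDeque;
import java.util.Deque;

class CloseableRegistry {
    private final Deque<AutoCloseable> closeables = new ArrayDeque<>();

    void register(AutoCloseable c) {
        closeables.add(c);
    }

    // Closes everything, keeping the first failure and attaching the rest as
    // suppressed exceptions - the behavior Guava's Closer provided.
    void closeAll() throws Exception {
        Exception first = null;
        for (AutoCloseable c : closeables) {
            try {
                c.close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}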
@@ -52,8 +52,6 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TemporaryClassLoaderContext;

import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;

import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
@@ -1200,8 +1198,10 @@ public void initializeState(FunctionInitializationContext context) throws Exception
if (semantic != FlinkKafkaProducer.Semantic.EXACTLY_ONCE) {
nextTransactionalIdHint = null;
} else {
ArrayList<FlinkKafkaProducer.NextTransactionalIdHint> transactionalIdHints =
Lists.newArrayList(nextTransactionalIdHintState.get());
List<FlinkKafkaProducer.NextTransactionalIdHint> transactionalIdHints =
new ArrayList<>();
nextTransactionalIdHintState.get().forEach(transactionalIdHints::add);

if (transactionalIdHints.size() > 1) {
throw new IllegalStateException(
"There should be at most one next transactional id hint written by the first subtask");
@@ -1444,8 +1444,9 @@ private void migrateNextTransactionalIdHindState(FunctionInitializationContext context)
context.getOperatorStateStore()
.getUnionListState(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR_V2);

ArrayList<NextTransactionalIdHint> oldTransactionalIdHints =
Lists.newArrayList(oldNextTransactionalIdHintState.get());
List<NextTransactionalIdHint> oldTransactionalIdHints = new ArrayList<>();
oldNextTransactionalIdHintState.get().forEach(oldTransactionalIdHints::add);

if (!oldTransactionalIdHints.isEmpty()) {
nextTransactionalIdHintState.addAll(oldTransactionalIdHints);
// clear old state
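
The FlinkKafkaProducer changes replace Lists.newArrayList(Iterable) with a
manual copy of the restored state into an ArrayList. A minimal sketch of
that helper pattern (the generic method is illustrative, not part of the
connector):

import java.util.ArrayList;
import java.util.List;

class IterableCopyExample {
    // Copies an Iterable (e.g. the result of ListState#get()) into a List,
    // replacing Guava's Lists.newArrayList(iterable).
    static <T> List<T> toList(Iterable<T> iterable) {
        List<T> result = new ArrayList<>();
        iterable.forEach(result::add);
        return result;
    }
}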
@@ -22,8 +22,6 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
@@ -51,10 +49,12 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

/** Internal flink kafka producer. */
@PublicEvolving
@@ -169,7 +169,9 @@ public void close(Duration duration) {
LOG.debug(
"Closed internal KafkaProducer {}. Stacktrace: {}",
System.identityHashCode(this),
Joiner.on("\n").join(Thread.currentThread().getStackTrace()));
Arrays.stream(Thread.currentThread().getStackTrace())
.map(StackTraceElement::toString)
.collect(Collectors.joining("\n")));
}
closed = true;
}
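
Guava's Joiner.on("\n").join(...) is replaced with the Streams API for
logging the current stack trace; a standalone sketch of the equivalent
formatting (class and method names are illustrative):

import java.util.Arrays;
import java.util.stream.Collectors;

class StackTraceFormatExample {
    // Formats the current stack trace one frame per line - the
    // java.util.stream equivalent of Joiner.on("\n").join(stackTrace).
    static String currentStackTrace() {
        return Arrays.stream(Thread.currentThread().getStackTrace())
                .map(StackTraceElement::toString)
                .collect(Collectors.joining("\n"));
    }
}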
@@ -19,8 +19,6 @@

import org.apache.flink.util.TestLoggerExtension;

import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -43,6 +41,7 @@
import org.testcontainers.junit.jupiter.Testcontainers;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.function.Consumer;
@@ -173,7 +172,7 @@ private static Properties getProperties() {
}

private static List<Consumer<FlinkKafkaInternalProducer<?, ?>>> provideTransactionsFinalizer() {
return Lists.newArrayList(
return Arrays.asList(
FlinkKafkaInternalProducer::commitTransaction,
FlinkKafkaInternalProducer::abortTransaction);
}
@@ -23,9 +23,6 @@
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.serialization.Deserializer;
@@ -34,6 +31,7 @@
import org.junit.Before;
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -162,7 +160,7 @@ public void testSerializeRecordWithKey() {

@Test
public void testKafkaKeySerializerWrapperWithoutConfigurable() throws Exception {
final Map<String, String> config = ImmutableMap.of("simpleKey", "simpleValue");
final Map<String, String> config = Collections.singletonMap("simpleKey", "simpleValue");
final KafkaRecordSerializationSchema<String> schema =
KafkaRecordSerializationSchema.builder()
.setTopic(DEFAULT_TOPIC)
@@ -179,7 +177,7 @@ public void testKafkaKeySerializerWrapperWithoutConfigurable() throws Exception

@Test
public void testKafkaValueSerializerWrapperWithoutConfigurable() throws Exception {
final Map<String, String> config = ImmutableMap.of("simpleKey", "simpleValue");
final Map<String, String> config = Collections.singletonMap("simpleKey", "simpleValue");
final KafkaRecordSerializationSchema<String> schema =
KafkaRecordSerializationSchema.builder()
.setTopic(DEFAULT_TOPIC)
@@ -193,7 +191,7 @@ public void testKafkaValueSerializerWrapperWithoutConfigurable() throws Exception

@Test
public void testSerializeRecordWithKafkaSerializer() throws Exception {
final Map<String, String> config = ImmutableMap.of("configKey", "configValue");
final Map<String, String> config = Collections.singletonMap("configKey", "configValue");
final KafkaRecordSerializationSchema<String> schema =
KafkaRecordSerializationSchema.builder()
.setTopic(DEFAULT_TOPIC)
@@ -261,7 +259,7 @@ private static void assertOnlyOneSerializerAllowed(
KafkaRecordSerializationSchemaBuilder<String>,
KafkaRecordSerializationSchemaBuilder<String>>>
valueSerializationSetter() {
return ImmutableList.of(
return Arrays.asList(
(b) -> b.setKafkaValueSerializer(StringSerializer.class),
(b) -> b.setValueSerializationSchema(new SimpleStringSchema()),
(b) ->
@@ -274,7 +272,7 @@ private static void assertOnlyOneSerializerAllowed(
KafkaRecordSerializationSchemaBuilder<String>,
KafkaRecordSerializationSchemaBuilder<String>>>
keySerializationSetter() {
return ImmutableList.of(
return Arrays.asList(
(b) -> b.setKafkaKeySerializer(StringSerializer.class),
(b) -> b.setKeySerializationSchema(new SimpleStringSchema()),
(b) ->
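
The test changes replace Guava's immutable-collection factories with their
JDK counterparts; a short sketch of the substitutions (class and method
names are illustrative):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class JdkCollectionFactoriesExample {
    // ImmutableMap.of("k", "v") -> Collections.singletonMap("k", "v")
    static Map<String, String> singleEntryConfig() {
        return Collections.singletonMap("simpleKey", "simpleValue");
    }

    // ImmutableList.of(a, b) -> Arrays.asList(a, b);
    // ImmutableList.of()     -> Collections.emptyList()
    static List<String> twoElementList() {
        return Arrays.asList("first", "second");
    }
}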
@@ -60,8 +60,6 @@
import org.apache.flink.util.DockerImageVersions;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
@@ -88,6 +86,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -624,7 +623,11 @@ private void checkProducerLeak() throws InterruptedException {
}

private String format(Map.Entry<Thread, StackTraceElement[]> leak) {
return leak.getKey().getName() + ":\n" + Joiner.on("\n").join(leak.getValue());
String stackTrace =
Arrays.stream(leak.getValue())
.map(StackTraceElement::toString)
.collect(Collectors.joining("\n"));
return leak.getKey().getName() + ":\n" + stackTrace;
}

private boolean findAliveKafkaThread(Map.Entry<Thread, StackTraceElement[]> threadStackTrace) {
@@ -34,8 +34,6 @@
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.UserCodeClassLoader;

import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -60,6 +58,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
@@ -545,7 +544,7 @@ private KafkaWriter<Integer> createWriterWithConfiguration(
new SinkInitContext(sinkWriterMetricGroup, timeService, metadataConsumer),
new DummyRecordSerializer(),
new DummySchemaContext(),
ImmutableList.of());
Collections.emptyList());
}

private KafkaWriter<Integer> createWriterWithConfiguration(
@@ -557,7 +556,7 @@ private KafkaWriter<Integer> createWriterWithConfiguration(
sinkInitContext,
new DummyRecordSerializer(),
new DummySchemaContext(),
ImmutableList.of());
Collections.emptyList());
}

private static Properties getKafkaClientConfiguration() {