[FLINK-25509][connector/kafka] Add RecordEvaluator to dynamically stop source based on de-serialized records #76

Open · wants to merge 1 commit into main
DynamicKafkaSource.java
@@ -29,6 +29,7 @@
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumState;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumStateSerializer;
@@ -42,6 +43,8 @@
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import javax.annotation.Nullable;

import java.util.Properties;

/**
@@ -87,6 +90,7 @@ public class DynamicKafkaSource<T>
private final OffsetsInitializer stoppingOffsetsInitializer;
private final Properties properties;
private final Boundedness boundedness;
@Nullable private final RecordEvaluator<T> eofRecordEvaluator;

DynamicKafkaSource(
KafkaStreamSubscriber kafkaStreamSubscriber,
@@ -95,14 +99,16 @@ public class DynamicKafkaSource<T>
OffsetsInitializer startingOffsetsInitializer,
OffsetsInitializer stoppingOffsetsInitializer,
Properties properties,
Boundedness boundedness) {
Boundedness boundedness,
@Nullable RecordEvaluator<T> eofRecordEvaluator) {
this.kafkaStreamSubscriber = kafkaStreamSubscriber;
this.deserializationSchema = deserializationSchema;
this.properties = properties;
this.kafkaMetadataService = kafkaMetadataService;
this.startingOffsetsInitializer = startingOffsetsInitializer;
this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
this.boundedness = boundedness;
this.eofRecordEvaluator = eofRecordEvaluator;
}

/**
@@ -134,7 +140,8 @@ public Boundedness getBoundedness() {
@Override
public SourceReader<T, DynamicKafkaSourceSplit> createReader(
SourceReaderContext readerContext) {
return new DynamicKafkaSourceReader<>(readerContext, deserializationSchema, properties);
return new DynamicKafkaSourceReader<>(
readerContext, deserializationSchema, properties, eofRecordEvaluator);
}

/**
DynamicKafkaSourceBuilder.java
@@ -20,6 +20,7 @@

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber;
@@ -52,6 +53,7 @@ public class DynamicKafkaSourceBuilder<T> {
private OffsetsInitializer stoppingOffsetsInitializer;
private Boundedness boundedness;
private final Properties props;
private RecordEvaluator<T> eofRecordEvaluator;

DynamicKafkaSourceBuilder() {
this.kafkaStreamSubscriber = null;
@@ -140,6 +142,18 @@ public DynamicKafkaSourceBuilder<T> setDeserializer(
return this;
}

/**
* Set the optional {@link RecordEvaluator} that determines, based on each de-serialized record,
* whether the containing split has reached end of stream. This will be applied to all clusters.
*
* @param eofRecordEvaluator the {@link RecordEvaluator}.
* @return the builder.
*/
public DynamicKafkaSourceBuilder<T> setEofRecordEvaluator(
RecordEvaluator<T> eofRecordEvaluator) {
this.eofRecordEvaluator = eofRecordEvaluator;
return this;
}
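For context, a `RecordEvaluator` is a serializable, single-method predicate over de-serialized records. Below is a minimal sketch of an evaluator that could be handed to this builder, assuming String-valued records and a hypothetical "END_OF_STREAM" marker value (neither assumption comes from this PR):

```java
import org.apache.flink.connector.base.source.reader.RecordEvaluator;

/** Sketch only: treats a hypothetical "END_OF_STREAM" marker value as the end of a split. */
public class EndMarkerEvaluator implements RecordEvaluator<String> {
    @Override
    public boolean isEndOfStream(String record) {
        // Once this returns true, no further records from the affected split are emitted.
        return "END_OF_STREAM".equals(record);
    }
}
```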

/**
* Set the starting offsets of the stream. This will be applied to all clusters.
*
@@ -217,7 +231,8 @@ public DynamicKafkaSource<T> build() {
startingOffsetsInitializer,
stoppingOffsetsInitializer,
props,
boundedness);
boundedness,
eofRecordEvaluator);
}

// Below are utility methods, code and structure are mostly copied over from KafkaSourceBuilder
DynamicKafkaSourceReader.java
@@ -26,6 +26,7 @@
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
@@ -54,6 +55,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -95,11 +98,13 @@ public class DynamicKafkaSourceReader<T> implements SourceReader<T, DynamicKafka
private boolean isActivelyConsumingSplits;
private boolean isNoMoreSplits;
private AtomicBoolean restartingReaders;
@Nullable private final RecordEvaluator<T> eofRecordEvaluator;

public DynamicKafkaSourceReader(
SourceReaderContext readerContext,
KafkaRecordDeserializationSchema<T> deserializationSchema,
Properties properties) {
Properties properties,
@Nullable RecordEvaluator<T> eofRecordEvaluator) {
this.readerContext = readerContext;
this.clusterReaderMap = new TreeMap<>();
this.deserializationSchema = deserializationSchema;
@@ -116,6 +121,7 @@ public DynamicKafkaSourceReader(
this.isActivelyConsumingSplits = false;
this.restartingReaders = new AtomicBoolean();
this.clustersProperties = new HashMap<>();
this.eofRecordEvaluator = eofRecordEvaluator;
}

/**
@@ -448,7 +454,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
}
});

KafkaRecordEmitter<T> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
KafkaRecordEmitter<T> recordEmitter =
new KafkaRecordEmitter<>(deserializationSchema, eofRecordEvaluator);
return new KafkaSourceReader<>(
elementsQueue,
new KafkaSourceFetcherManager(
@@ -463,7 +470,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
recordEmitter,
toConfiguration(readerSpecificProperties),
readerContext,
kafkaSourceReaderMetrics);
kafkaSourceReaderMetrics,
eofRecordEvaluator);
}

/**
KafkaSource.java
@@ -31,6 +31,7 @@
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
@@ -102,11 +103,13 @@ public class KafkaSource<OUT>
private final Properties props;
// Client rackId callback
private final SerializableSupplier<String> rackIdSupplier;
@Nullable private final RecordEvaluator<OUT> eofRecordEvaluator;

KafkaSource(
KafkaSubscriber subscriber,
OffsetsInitializer startingOffsetsInitializer,
@Nullable OffsetsInitializer stoppingOffsetsInitializer,
@Nullable RecordEvaluator<OUT> eofRecordEvaluator,
Boundedness boundedness,
KafkaRecordDeserializationSchema<OUT> deserializationSchema,
Properties props,
@@ -118,6 +121,7 @@ public class KafkaSource<OUT>
this.deserializationSchema = deserializationSchema;
this.props = props;
this.rackIdSupplier = rackIdSupplier;
this.eofRecordEvaluator = eofRecordEvaluator;
}

/**
@@ -171,7 +175,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
Optional.ofNullable(rackIdSupplier)
.map(Supplier::get)
.orElse(null));
KafkaRecordEmitter<OUT> recordEmitter = new KafkaRecordEmitter<>(deserializationSchema);
KafkaRecordEmitter<OUT> recordEmitter =
new KafkaRecordEmitter<>(deserializationSchema, eofRecordEvaluator);

return new KafkaSourceReader<>(
elementsQueue,
@@ -180,7 +185,8 @@
recordEmitter,
toConfiguration(props),
readerContext,
kafkaSourceReaderMetrics);
kafkaSourceReaderMetrics,
eofRecordEvaluator);
}

@Internal
@@ -251,4 +257,10 @@ KafkaSubscriber getKafkaSubscriber() {
OffsetsInitializer getStoppingOffsetsInitializer() {
return stoppingOffsetsInitializer;
}

@VisibleForTesting
@Nullable
RecordEvaluator<OUT> getEofRecordEvaluator() {
return eofRecordEvaluator;
}
}
KafkaSourceBuilder.java
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.base.source.reader.RecordEvaluator;
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializerValidator;
@@ -107,6 +108,7 @@ public class KafkaSourceBuilder<OUT> {
protected Properties props;
// Client rackId supplier
private SerializableSupplier<String> rackIdSupplier;
private RecordEvaluator<OUT> eofRecordEvaluator;

KafkaSourceBuilder() {
this.subscriber = null;
@@ -353,6 +355,26 @@ public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
return this;
}

/**
* Sets the optional {@link RecordEvaluator eofRecordEvaluator} for KafkaSource.
*
* <p>When the evaluator is specified, it is invoked for each de-serialized record to determine
* whether the corresponding split has reached end of stream. If a record is matched by the
* evaluator, the source does not emit that record or any subsequent records from the same
* split.
*
* <p>Note that the evaluator works jointly with the stopping offsets specified via {@link
* #setBounded(OffsetsInitializer)} or {@link #setUnbounded(OffsetsInitializer)}. The source
* stops consuming from a split as soon as either condition is met.
*
* @param eofRecordEvaluator a {@link RecordEvaluator recordEvaluator}
* @return this KafkaSourceBuilder.
*/
public KafkaSourceBuilder<OUT> setEofRecordEvaluator(RecordEvaluator<OUT> eofRecordEvaluator) {
mas-chen (Contributor) commented:
When this condition is met, do we expect the Flink job to go into FINISHED state? If so, I think we need to only support this in bounded mode, as unbounded mode can't reach FINISHED state. This is because the KafkaSource and DynamicKafkaSource only send "notifyNoMoreSplits" for the bounded case.

The PR author replied:
Thanks for this comment, @mas-chen.
Yes, we should make the source FINISHED. I will fix this part later.

The PR author added:
Hi, @mas-chen.

I read through the parts about the stoppingOffsetsInitializer. The eofRecordEvaluator actually does the same job as the stoppingOffsetsInitializer: both decide when to stop reading, and we want to support using them together in the same source.

However, I found that the builder already has the method KafkaSourceBuilder<OUT> setUnbounded(OffsetsInitializer stoppingOffsetsInitializer), which supports setting a stoppingOffsetsInitializer on an unbounded source. I would like to keep the same behavior as stoppingOffsetsInitializer and not add that limitation to eofRecordEvaluator. What do you think?

Looking forward to your opinion.

mas-chen (Contributor) replied:
I see, then we should be fine. The job won't terminate (and I guess this is desired in some situations, just learned this), but the source will terminate eventually.

this.eofRecordEvaluator = eofRecordEvaluator;
return this;
}
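To make the interplay described above concrete, here is a minimal usage sketch for the builder; the bootstrap servers, topic, group id, and the "END_OF_STREAM" marker check are hypothetical placeholders rather than values taken from this PR:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")   // hypothetical broker address
                .setTopics("input-topic")                // hypothetical topic
                .setGroupId("example-group")             // hypothetical consumer group
                .setStartingOffsets(OffsetsInitializer.earliest())
                // The stopping offsets and the evaluator work jointly; the split finishes
                // as soon as either condition is met.
                .setUnbounded(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setEofRecordEvaluator(record -> "END_OF_STREAM".equals(record))
                .build();
```

As noted in the review thread above, an unbounded source configured this way does not drive the job to FINISHED on its own, but the reader stops consuming a split once the evaluator matches a record.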

/**
* Sets the client id prefix of this KafkaSource.
*
@@ -435,6 +457,7 @@ public KafkaSource<OUT> build() {
subscriber,
startingOffsetsInitializer,
stoppingOffsetsInitializer,
eofRecordEvaluator,
boundedness,
deserializationSchema,
props,
KafkaPartitionSplitReader.java
@@ -25,6 +25,7 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsRemoval;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
@@ -76,6 +77,9 @@ public class KafkaPartitionSplitReader
// Tracking empty splits that have not been added to finished splits in fetch()
private final Set<String> emptySplits = new HashSet<>();

// Tracking removed splits that have not been added to finished splits in fetch()
private final Set<String> removedSplits = new HashSet<>();

public KafkaPartitionSplitReader(
Properties props,
SourceReaderContext context,
@@ -116,7 +120,7 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
KafkaPartitionSplitRecords recordsBySplits =
new KafkaPartitionSplitRecords(
ConsumerRecords.empty(), kafkaSourceReaderMetrics);
markEmptySplitsAsFinished(recordsBySplits);
markSplitsAsFinished(recordsBySplits);
return recordsBySplits;
}
KafkaPartitionSplitRecords recordsBySplits =
@@ -148,7 +152,7 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, trackTp);
});

markEmptySplitsAsFinished(recordsBySplits);
markSplitsAsFinished(recordsBySplits);

// Unassign the partitions that has finished.
if (!finishedPartitions.isEmpty()) {
@@ -162,25 +166,55 @@ public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOExce
return recordsBySplits;
}

private void markEmptySplitsAsFinished(KafkaPartitionSplitRecords recordsBySplits) {
private void markSplitsAsFinished(KafkaPartitionSplitRecords recordsBySplits) {
// Some splits are discovered as empty when handling split additions. These splits should be
// added to finished splits to clean up states in split fetcher and source reader.
markSplitsAsFinished(emptySplits, recordsBySplits);

// Some splits are removed when handling split changes. These splits should be
// added to finished splits to clean up states in split fetcher and source reader.
markSplitsAsFinished(removedSplits, recordsBySplits);
}

private void markSplitsAsFinished(
Set<String> splits, KafkaPartitionSplitRecords recordsBySplits) {
// Some splits are discovered as empty when handling split additions. These splits should be
// added to finished splits to clean up states in split fetcher and source reader.
if (!emptySplits.isEmpty()) {
recordsBySplits.finishedSplits.addAll(emptySplits);
emptySplits.clear();
if (!splits.isEmpty()) {
recordsBySplits.finishedSplits.addAll(splits);
splits.clear();
}
}

@Override
public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange) {
// Get all the partition assignments and stopping offsets.
if (!(splitsChange instanceof SplitsAddition)) {
if (splitsChange instanceof SplitsAddition) {
// Get all the partition assignments and stopping offsets.
handleSplitsAddition(splitsChange);
} else if (splitsChange instanceof SplitsRemoval) {
handleSplitsRemoval(splitsChange);
} else {
throw new UnsupportedOperationException(
String.format(
"The SplitChange type of %s is not supported.",
splitsChange.getClass()));
}
}

private void handleSplitsRemoval(SplitsChange<KafkaPartitionSplit> splitsRemoval) {
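// Remember the removed splits so that the next fetch() reports them as finished, and
// release their lag metrics and partition assignments immediately.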
removedSplits.addAll(
splitsRemoval.splits().stream()
.map(KafkaPartitionSplit::splitId)
.collect(Collectors.toSet()));
List<TopicPartition> finishedPartitions =
splitsRemoval.splits().stream()
.map(KafkaPartitionSplit::getTopicPartition)
.collect(Collectors.toList());
finishedPartitions.forEach(kafkaSourceReaderMetrics::removeRecordsLagMetric);
unassignPartitions(finishedPartitions);
}

private void handleSplitsAddition(SplitsChange<KafkaPartitionSplit> splitsAddition) {
// Assignment.
List<TopicPartition> newPartitionAssignments = new ArrayList<>();
// Starting offsets.
@@ -192,7 +226,7 @@ public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange)
Set<TopicPartition> partitionsStoppingAtCommitted = new HashSet<>();

// Parse the starting and stopping offsets.
splitsChange
splitsAddition
.splits()
.forEach(
s -> {
@@ -223,7 +257,7 @@ public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange)
// After acquiring the starting and stopping offsets, remove the empty splits if necessary.
removeEmptySplits();

maybeLogSplitChangesHandlingResult(splitsChange);
maybeLogSplitChangesHandlingResult(splitsAddition);
}

@Override