Commit b316d8c
[FLINK-33231] [source] Properly evict offsetsToCommit cache on checkpoint complete if no offsets exist

Prior to this fix, if the offsets to commit for a given checkpoint are empty,
which can be the case if no starting offsets have been retrieved from Kafka
yet, then on checkpoint completion the cache is not properly evicted up to
the given checkpoint.
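
For illustration, a minimal sketch of the failure mode, assuming a sorted map
keyed by checkpoint ID as the cache. The class and the String/Long stand-ins
(in place of Kafka's TopicPartition/OffsetAndMetadata) are hypothetical
simplifications, not the connector's actual code:

    import java.util.Collections;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    class OffsetsCacheSketch {
        // One entry per snapshotted checkpoint, keyed by checkpoint ID; the
        // value map may still be empty if no offsets were fetched from Kafka.
        private final SortedMap<Long, Map<String, Long>> offsetsToCommit = new TreeMap<>();

        void snapshotState(long checkpointId) {
            offsetsToCommit.put(checkpointId, Collections.emptyMap());
        }

        void notifyCheckpointComplete(long checkpointId) {
            Map<String, Long> offsets = offsetsToCommit.get(checkpointId);
            if (offsets == null) {
                return; // unknown or already-committed checkpoint
            }
            // Pre-fix behavior: only the commit-success callback pruned the
            // cache. An empty map meant no commit was issued, so no pruning
            // happened and completed checkpoints accumulated indefinitely.
        }

        int cacheSize() {
            return offsetsToCommit.size();
        }

        public static void main(String[] args) {
            OffsetsCacheSketch reader = new OffsetsCacheSketch();
            for (long cp = 1; cp <= 3; cp++) {
                reader.snapshotState(cp);
                reader.notifyCheckpointComplete(cp);
            }
            System.out.println(reader.cacheSize()); // prints 3: nothing was evicted
        }
    }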

This change fixes the issue: in notifyCheckpointComplete, if the offsets for
the completed checkpoint are empty, we short-circuit the method and skip the
commit attempt, and we always evict the cache up to and including the
completed checkpoint.
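
Applied to the sketch above, the fix replaces the body of
notifyCheckpointComplete with a shortcut for the empty case and routes all
eviction through one helper (again simplified stand-in code; the connector's
actual change is in the diff below):

    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> offsets = offsetsToCommit.get(checkpointId);
        if (offsets == null) {
            return; // already committed and evicted earlier
        }
        if (offsets.isEmpty()) {
            // Shortcut: nothing to commit, but still evict the cache up to
            // and including the completed checkpoint.
            removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
            return;
        }
        // Non-empty case: commit the offsets; the commit-success callback
        // also calls removeAllOffsetsToCommitUpToCheckpoint(checkpointId).
    }

    private void removeAllOffsetsToCommitUpToCheckpoint(long checkpointId) {
        while (!offsetsToCommit.isEmpty() && offsetsToCommit.firstKey() <= checkpointId) {
            offsetsToCommit.remove(offsetsToCommit.firstKey());
        }
    }

With this in place, the main-method check in the sketch above prints 0
instead of 3.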

(cherry picked from commit b0f15f2)
tzulitai authored and MartijnVisser committed Oct 11, 2023
1 parent 20c854c commit b316d8c
Showing 2 changed files with 35 additions and 8 deletions.
KafkaSourceReader.java

@@ -133,9 +133,13 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
         Map<TopicPartition, OffsetAndMetadata> committedPartitions =
                 offsetsToCommit.get(checkpointId);
         if (committedPartitions == null) {
-            LOG.debug(
-                    "Offsets for checkpoint {} either do not exist or have already been committed.",
-                    checkpointId);
+            LOG.debug("Offsets for checkpoint {} have already been committed.", checkpointId);
             return;
         }
 
+        if (committedPartitions.isEmpty()) {
+            LOG.debug("There are no offsets to commit for checkpoint {}.", checkpointId);
+            removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
+            return;
+        }
+
@@ -168,14 +172,17 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
                                         entry ->
                                                 committedPartitions.containsKey(
                                                         entry.getKey()));
-                        while (!offsetsToCommit.isEmpty()
-                                && offsetsToCommit.firstKey() <= checkpointId) {
-                            offsetsToCommit.remove(offsetsToCommit.firstKey());
-                        }
+                        removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
                     }
                 });
     }
 
+    private void removeAllOffsetsToCommitUpToCheckpoint(long checkpointId) {
+        while (!offsetsToCommit.isEmpty() && offsetsToCommit.firstKey() <= checkpointId) {
+            offsetsToCommit.remove(offsetsToCommit.firstKey());
+        }
+    }
+
     @Override
     protected KafkaPartitionSplitState initializedState(KafkaPartitionSplit split) {
         return new KafkaPartitionSplitState(split);
KafkaSourceReaderTest.java

@@ -59,6 +59,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -183,7 +184,26 @@ void testCommitEmptyOffsets() throws Exception {
                         (KafkaSourceReader<Integer>)
                                 createReader(Boundedness.CONTINUOUS_UNBOUNDED, groupId)) {
             reader.snapshotState(100L);
-            reader.notifyCheckpointComplete(100L);
+            reader.snapshotState(101L);
+            reader.snapshotState(102L);
+
+            // After each snapshot, a new entry should have been added to the offsets-to-commit
+            // cache for the checkpoint
+            final Map<Long, Map<TopicPartition, OffsetAndMetadata>> expectedOffsetsToCommit =
+                    new HashMap<>();
+            expectedOffsetsToCommit.put(100L, new HashMap<>());
+            expectedOffsetsToCommit.put(101L, new HashMap<>());
+            expectedOffsetsToCommit.put(102L, new HashMap<>());
+            assertThat(reader.getOffsetsToCommit()).isEqualTo(expectedOffsetsToCommit);
+
+            // only notify up to checkpoint 101L; all offsets prior to 101L should be evicted from
+            // cache, leaving only 102L
+            reader.notifyCheckpointComplete(101L);
+
+            final Map<Long, Map<TopicPartition, OffsetAndMetadata>>
+                    expectedOffsetsToCommitAfterNotify = new HashMap<>();
+            expectedOffsetsToCommitAfterNotify.put(102L, new HashMap<>());
+            assertThat(reader.getOffsetsToCommit()).isEqualTo(expectedOffsetsToCommitAfterNotify);
         }
         // Verify the committed offsets.
         try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) {
