diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java index bdb94466de940..67bf02cd31a51 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBIncrementalCheckpointUtils.java @@ -29,21 +29,28 @@ import org.rocksdb.LiveFileMetaData; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; /** Utils for RocksDB Incremental Checkpoint. */ public class RocksDBIncrementalCheckpointUtils { + private static final Logger logger = + LoggerFactory.getLogger(RocksDBIncrementalCheckpointUtils.class); + /** * Evaluates state handle's "score" regarding the target range when choosing the best state * handle to init the initial db for recovery, if the overlap fraction is less than @@ -112,7 +119,7 @@ public int compareTo(@Nullable Score other) { * @param targetKeyGroupRange the target key group range. * @param currentKeyGroupRange the key group range of the db instance. * @param keyGroupPrefixBytes Number of bytes required to prefix the key groups. - * @param useDeleteFilesInRange Whether to use deleteFilesInRange to clean up redundant files. + * @param useDeleteFilesInRange whether to call db.deleteFilesInRanges for the deleted ranges. */ public static void clipDBWithKeyGroupRange( @Nonnull RocksDB db, @@ -122,31 +129,45 @@ public static void clipDBWithKeyGroupRange( @Nonnegative int keyGroupPrefixBytes, boolean useDeleteFilesInRange) throws RocksDBException { - List deletedRanges = new ArrayList<>(4); + + List deleteFilesRanges = new ArrayList<>(4); if (currentKeyGroupRange.getStartKeyGroup() < targetKeyGroupRange.getStartKeyGroup()) { - final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes]; - final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes]; - CompositeKeySerializationUtils.serializeKeyGroup( - currentKeyGroupRange.getStartKeyGroup(), beginKeyGroupBytes); - CompositeKeySerializationUtils.serializeKeyGroup( - targetKeyGroupRange.getStartKeyGroup(), endKeyGroupBytes); - deletedRanges.add(beginKeyGroupBytes); - deletedRanges.add(endKeyGroupBytes); + prepareRangeDeletes( + keyGroupPrefixBytes, + currentKeyGroupRange.getStartKeyGroup(), + targetKeyGroupRange.getStartKeyGroup(), + deleteFilesRanges); } if (currentKeyGroupRange.getEndKeyGroup() > targetKeyGroupRange.getEndKeyGroup()) { - final byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes]; - final byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes]; - CompositeKeySerializationUtils.serializeKeyGroup( - targetKeyGroupRange.getEndKeyGroup() + 1, beginKeyGroupBytes); - CompositeKeySerializationUtils.serializeKeyGroup( - currentKeyGroupRange.getEndKeyGroup() + 1, endKeyGroupBytes); - deletedRanges.add(beginKeyGroupBytes); - deletedRanges.add(endKeyGroupBytes); + prepareRangeDeletes( + keyGroupPrefixBytes, + targetKeyGroupRange.getEndKeyGroup() + 1, + currentKeyGroupRange.getEndKeyGroup() + 1, + deleteFilesRanges); } - deleteRangeData(db, columnFamilyHandles, deletedRanges, useDeleteFilesInRange); + logger.info( + "Performing range delete for backend with target key-groups range {} with boundaries set {} - deleteFilesInRanges = {}.", + targetKeyGroupRange.prettyPrintInterval(), + deleteFilesRanges.stream().map(Arrays::toString).collect(Collectors.toList()), + useDeleteFilesInRange); + + deleteRangeData(db, columnFamilyHandles, deleteFilesRanges, useDeleteFilesInRange); + } + + private static void prepareRangeDeletes( + int keyGroupPrefixBytes, + int beginKeyGroup, + int endKeyGroup, + List deleteFilesRangesOut) { + byte[] beginKeyGroupBytes = new byte[keyGroupPrefixBytes]; + byte[] endKeyGroupBytes = new byte[keyGroupPrefixBytes]; + CompositeKeySerializationUtils.serializeKeyGroup(beginKeyGroup, beginKeyGroupBytes); + CompositeKeySerializationUtils.serializeKeyGroup(endKeyGroup, endKeyGroupBytes); + deleteFilesRangesOut.add(beginKeyGroupBytes); + deleteFilesRangesOut.add(endKeyGroupBytes); } /** @@ -156,7 +177,7 @@ public static void clipDBWithKeyGroupRange( * @param columnFamilyHandles the column family need to be clipped. * @param deleteRanges - pairs of deleted ranges (from1, to1, from2, to2, ...). For each pair * [from, to), the startKey ('from') is inclusive, the endKey ('to') is exclusive. - * @param useDeleteFilesInRange Whether to use deleteFilesInRange to clean up redundant files. + * @param useDeleteFilesInRange whether to use deleteFilesInRange to clean up redundant files. */ private static void deleteRangeData( RocksDB db, @@ -165,6 +186,11 @@ private static void deleteRangeData( boolean useDeleteFilesInRange) throws RocksDBException { + if (deleteRanges.isEmpty()) { + // nothing to do. + return; + } + Preconditions.checkArgument(deleteRanges.size() % 2 == 0); for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { // First delete the files in ranges