diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java index 5f87de95bb..608bda3cef 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java @@ -36,6 +36,8 @@ import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.BiPredicate; + import org.apache.commons.lang3.tuple.Pair; import org.apache.samza.SamzaException; import org.apache.samza.checkpoint.Checkpoint; @@ -51,6 +53,7 @@ import org.apache.samza.storage.TaskBackupManager; import org.apache.samza.storage.blobstore.diff.DirDiff; import org.apache.samza.storage.blobstore.index.DirIndex; +import org.apache.samza.storage.blobstore.index.FileIndex; import org.apache.samza.storage.blobstore.index.SnapshotIndex; import org.apache.samza.storage.blobstore.index.SnapshotMetadata; import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics; @@ -174,7 +177,7 @@ public CompletableFuture> upload(CheckpointId checkpointId, storeToSCMAndSnapshotIndexPairFutures = new HashMap<>(); // This map is used to return serialized State Checkpoint Markers to the caller Map> storeToSerializedSCMFuture = new HashMap<>(); - + BiPredicate areSameFile = DirDiffUtil.areSameFile(false); storesToBackup.forEach((storeName) -> { long storeUploadStartTime = System.nanoTime(); try { @@ -207,7 +210,7 @@ public CompletableFuture> upload(CheckpointId checkpointId, long dirDiffStartTime = System.nanoTime(); // get the diff between previous and current store directories - DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex, DirDiffUtil.areSameFile(false)); + DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex, areSameFile); metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() - dirDiffStartTime); DirDiff.Stats stats = DirDiff.getStats(dirDiff);