Skip to content

Commit

Permalink
[FLINK-34484][tests] Add UT/IT tests to verify localBackup and localR…
Browse files Browse the repository at this point in the history
…ecovery configs
  • Loading branch information
ljz2051 authored and fredia committed Mar 14, 2024
1 parent ed75795 commit 649e2b4
Show file tree
Hide file tree
Showing 8 changed files with 94 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ void testCreationFromConfigDefault() throws Exception {
}

@Test
void testLocalStateNoCreateDirWhenDisabledLocalRecovery() throws Exception {
void testLocalStateNoCreateDirWhenDisabledLocalBackupAndRecovery() throws Exception {
JobID jobID = new JobID();
JobVertexID jobVertexID = new JobVertexID();
AllocationID allocationID = new AllocationID();
Expand All @@ -165,9 +165,11 @@ void testLocalStateNoCreateDirWhenDisabledLocalRecovery() throws Exception {
};

boolean localRecoveryEnabled = false;
boolean localBackupEnabled = false;
TaskExecutorLocalStateStoresManager storesManager =
new TaskExecutorLocalStateStoresManager(
localRecoveryEnabled,
localBackupEnabled,
Reference.owned(rootDirs),
Executors.directExecutor());

Expand Down Expand Up @@ -196,6 +198,21 @@ void testLocalStateNoCreateDirWhenDisabledLocalRecovery() throws Exception {
*/
@Test
void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception {
testSubtaskStateStoreDirectoryCreateAndDelete(true, true);
}

@Test
void testStateStoreDirectoryCreateAndDeleteWithLocalRecoveryEnabled() throws Exception {
testSubtaskStateStoreDirectoryCreateAndDelete(true, false);
}

@Test
void testStateStoreDirectoryCreateAndDeleteWithLocalBackupEnabled() throws Exception {
testSubtaskStateStoreDirectoryCreateAndDelete(false, true);
}

private void testSubtaskStateStoreDirectoryCreateAndDelete(
boolean localRecoveryEnabled, boolean localBackupEnabled) throws Exception {

JobID jobID = new JobID();
JobVertexID jobVertexID = new JobVertexID();
Expand All @@ -209,7 +226,10 @@ void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception {
};
TaskExecutorLocalStateStoresManager storesManager =
new TaskExecutorLocalStateStoresManager(
true, Reference.owned(rootDirs), Executors.directExecutor());
localRecoveryEnabled,
localBackupEnabled,
Reference.owned(rootDirs),
Executors.directExecutor());

TaskLocalStateStore taskLocalStateStore =
storesManager.localStateStoreForSubtask(
Expand Down Expand Up @@ -305,7 +325,10 @@ void testOwnedLocalStateDirectoriesAreDeletedOnShutdown() throws IOException {

final TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager =
new TaskExecutorLocalStateStoresManager(
true, Reference.owned(localStateDirectories), Executors.directExecutor());
true,
true,
Reference.owned(localStateDirectories),
Executors.directExecutor());

for (File localStateDirectory : localStateDirectories) {
assertThat(localStateDirectory).exists();
Expand All @@ -327,6 +350,7 @@ void testBorrowedLocalStateDirectoriesAreNotDeletedOnShutdown() throws IOExcepti

final TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager =
new TaskExecutorLocalStateStoresManager(
true,
true,
Reference.borrowed(localStateDirectories),
Executors.directExecutor());
Expand All @@ -348,6 +372,7 @@ void testRetainLocalStateForAllocationsDeletesUnretainedAllocationDirectories()
final File localStateStore = TempDirUtils.newFolder(temporaryFolder.toPath());
final TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager =
new TaskExecutorLocalStateStoresManager(
true,
true,
Reference.owned(new File[] {localStateStore}),
Executors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,28 @@ void retrieveNullIfNoPersistedLocalState() {
assertThat(taskLocalStateStore.retrieveLocalState(0)).isNull();
}

@Test
void retrieveNullIfDisableLocalRecovery() {
LocalSnapshotDirectoryProvider directoryProvider =
new LocalSnapshotDirectoryProviderImpl(
allocationBaseDirs, jobID, jobVertexID, subtaskIdx);
LocalRecoveryConfig localRecoveryConfig =
new LocalRecoveryConfig(false, true, directoryProvider);
TaskLocalStateStoreImpl localStateStore =
new TaskLocalStateStoreImpl(
jobID,
allocationID,
jobVertexID,
subtaskIdx,
localRecoveryConfig,
Executors.directExecutor());

final TaskStateSnapshot taskStateSnapshot = createTaskStateSnapshot();
final long checkpointId = 1L;
localStateStore.storeLocalState(checkpointId, taskStateSnapshot);
assertThat(localStateStore.retrieveLocalState(checkpointId)).isNull();
}

@Test
void retrievePersistedLocalStateFromDisc() {
final TaskStateSnapshot taskStateSnapshot = createTaskStateSnapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ void testDeployedExecutionReporting() throws Exception {
setupResourceManagerGateway(initialSlotReportFuture);
final TaskExecutorLocalStateStoresManager localStateStoresManager =
new TaskExecutorLocalStateStoresManager(
false,
false,
Reference.owned(new File[] {TempDirUtils.newFolder(tempDir)}),
Executors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ private void internalTestPartitionRelease(

final TaskExecutorLocalStateStoresManager localStateStoresManager =
new TaskExecutorLocalStateStoresManager(
false,
false,
Reference.owned(new File[] {TempDirUtils.newFolder(tempDir)}),
Executors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ private TaskExecutor createTaskExecutor(
private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager()
throws IOException {
return new TaskExecutorLocalStateStoresManager(
false,
false,
Reference.owned(new File[] {TempDirUtils.newFolder(tempDir)}),
Executors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ void testShouldShutDownTaskManagerServicesInPostStop() throws Exception {

final TaskExecutorLocalStateStoresManager localStateStoresManager =
new TaskExecutorLocalStateStoresManager(
false,
false,
Reference.borrowed(ioManager.getSpillingDirectories()),
Executors.directExecutor());
Expand Down Expand Up @@ -2740,6 +2741,7 @@ void testReleaseInactiveSlots() throws Exception {
private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager()
throws IOException {
return new TaskExecutorLocalStateStoresManager(
false,
false,
Reference.owned(new File[] {TempDirUtils.newFolder(tempDir)}),
Executors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ private TaskSubmissionTestEnvironment(

TaskExecutorLocalStateStoresManager localStateStoresManager =
new TaskExecutorLocalStateStoresManager(
false,
false,
Reference.owned(new File[] {temporaryFolder.newFolder()}),
Executors.directExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.test.checkpointing;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.util.TestLogger;
Expand All @@ -30,8 +32,10 @@
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum;
import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.FILE;
Expand All @@ -52,16 +56,40 @@ public class LocalRecoveryITCase extends TestLogger {

@Parameterized.Parameter public StateBackendEnum backendEnum;

@Parameterized.Parameters(name = "statebackend type ={0}")
public static Collection<StateBackendEnum> parameter() {
return Arrays.asList(ROCKSDB_FULL, ROCKSDB_INCREMENTAL_ZK, FILE);
@Parameterized.Parameter(1)
public boolean localRecoveryEnabled;

@Parameterized.Parameter(2)
public boolean localBackupEnabled;

private static final List<StateBackendEnum> STATE_BACKEND_ENUMS =
Arrays.asList(ROCKSDB_FULL, ROCKSDB_INCREMENTAL_ZK, FILE);

private static final List<Tuple2<Boolean, Boolean>> LOCAL_BACKUP_AND_RECOVERY_CONFIGS =
Arrays.asList(Tuple2.of(true, true), Tuple2.of(true, false), Tuple2.of(false, true));

@Parameterized.Parameters(
name = "stateBackendType = {0}, localBackupEnabled = {1}, localRecoveryEnabled = {2}")
public static Collection<Object[]> parameter() {
List<Object[]> parameterList = new ArrayList<>();
for (StateBackendEnum stateBackend : STATE_BACKEND_ENUMS) {
for (Tuple2<Boolean, Boolean> backupAndRecoveryConfig :
LOCAL_BACKUP_AND_RECOVERY_CONFIGS) {
parameterList.add(
new Object[] {
stateBackend, backupAndRecoveryConfig.f0, backupAndRecoveryConfig.f1
});
}
}
return parameterList;
}

@Test
public final void executeTest() throws Exception {
EventTimeWindowCheckpointingITCase.tempFolder.create();
EventTimeWindowCheckpointingITCase windowChkITCase =
new EventTimeWindowCheckpointingITCaseInstance(backendEnum, true);
new EventTimeWindowCheckpointingITCaseInstance(
backendEnum, localBackupEnabled, localRecoveryEnabled);

executeTest(windowChkITCase);
}
Expand Down Expand Up @@ -97,13 +125,16 @@ private static class EventTimeWindowCheckpointingITCaseInstance
extends EventTimeWindowCheckpointingITCase {

private final StateBackendEnum backendEnum;
private final boolean localBackupEnable;
private final boolean localRecoveryEnabled;

public EventTimeWindowCheckpointingITCaseInstance(
StateBackendEnum backendEnum, boolean localRecoveryEnabled) {
StateBackendEnum backendEnum,
boolean localBackupEnable,
boolean localRecoveryEnabled) {
super(backendEnum, 2);

this.backendEnum = backendEnum;
this.localBackupEnable = localBackupEnable;
this.localRecoveryEnabled = localRecoveryEnabled;
}

Expand All @@ -115,9 +146,8 @@ protected StateBackendEnum getStateBackend() {
@Override
protected Configuration createClusterConfig() throws IOException {
Configuration config = super.createClusterConfig();

config.set(StateRecoveryOptions.LOCAL_RECOVERY, localRecoveryEnabled);

config.set(CheckpointingOptions.LOCAL_BACKUP_ENABLED, localBackupEnable);
return config;
}
}
Expand Down

0 comments on commit 649e2b4

Please sign in to comment.