Skip to content

Commit

Permalink
[hotfix] Fix configuration through TernaryBoolean in EmbeddedRocksDBS…
Browse files Browse the repository at this point in the history
…tateBackend.
  • Loading branch information
StefanRRichter committed Feb 28, 2024
1 parent b906922 commit 5be8b3e
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 14 deletions.
18 changes: 18 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/TernaryBoolean.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.util;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -71,6 +73,22 @@ public Boolean getAsBoolean() {
return this == UNDEFINED ? null : (this == TRUE ? Boolean.TRUE : Boolean.FALSE);
}

/**
* Merges an existing value with a config, accepting the config's value only if the existing
* value is undefined.
*
* @param original the value to merge with the config.
* @param configOption the config option to merge with from the config.
* @param config the config to merge with.
*/
public static TernaryBoolean mergeTernaryBooleanWithConfig(
TernaryBoolean original, ConfigOption<Boolean> configOption, ReadableConfig config) {
if (original != TernaryBoolean.UNDEFINED) {
return original;
}
return TernaryBoolean.fromBoxedBoolean(config.getOptional(configOption).orElse(null));
}

// ------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,21 +321,20 @@ private EmbeddedRocksDBStateBackend(
"Overlap fraction threshold of restoring should be between 0 and 1");

incrementalRestoreAsyncCompactAfterRescale =
original.incrementalRestoreAsyncCompactAfterRescale == TernaryBoolean.UNDEFINED
? TernaryBoolean.fromBoxedBoolean(
config.get(INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE))
: original.incrementalRestoreAsyncCompactAfterRescale;
TernaryBoolean.mergeTernaryBooleanWithConfig(
original.incrementalRestoreAsyncCompactAfterRescale,
INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE,
config);

useIngestDbRestoreMode =
original.useIngestDbRestoreMode == TernaryBoolean.UNDEFINED
? TernaryBoolean.fromBoxedBoolean(config.get(USE_INGEST_DB_RESTORE_MODE))
: TernaryBoolean.fromBoolean(original.getUseIngestDbRestoreMode());
TernaryBoolean.mergeTernaryBooleanWithConfig(
original.useIngestDbRestoreMode, USE_INGEST_DB_RESTORE_MODE, config);

rescalingUseDeleteFilesInRange =
original.rescalingUseDeleteFilesInRange == TernaryBoolean.UNDEFINED
? TernaryBoolean.fromBoxedBoolean(
config.get(USE_DELETE_FILES_IN_RANGE_DURING_RESCALING))
: original.rescalingUseDeleteFilesInRange;
TernaryBoolean.mergeTernaryBooleanWithConfig(
original.rescalingUseDeleteFilesInRange,
USE_DELETE_FILES_IN_RANGE_DURING_RESCALING,
config);

this.rocksDBMemoryFactory = original.rocksDBMemoryFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
Expand Down Expand Up @@ -92,9 +93,12 @@
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;

import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.reset;
Expand Down Expand Up @@ -186,14 +190,19 @@ protected ConfigurableStateBackend getStateBackend() throws IOException {
dbPath = TempDirUtils.newFolder(tempFolder).getAbsolutePath();
EmbeddedRocksDBStateBackend backend =
new EmbeddedRocksDBStateBackend(enableIncrementalCheckpointing);
Configuration configuration = createBackendConfig();
backend = backend.configure(configuration, Thread.currentThread().getContextClassLoader());
backend.setDbStoragePath(dbPath);
return backend;
}

private Configuration createBackendConfig() {
Configuration configuration = new Configuration();
configuration.set(USE_INGEST_DB_RESTORE_MODE, useIngestDB);
configuration.set(
RocksDBOptions.TIMER_SERVICE_FACTORY,
EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
backend = backend.configure(configuration, Thread.currentThread().getContextClassLoader());
backend.setDbStoragePath(dbPath);
return backend;
return configuration;
}

@Override
Expand Down Expand Up @@ -656,6 +665,48 @@ public void testMapStateClear() throws Exception {
assertThatThrownBy(state::clear).isInstanceOf(FlinkRuntimeException.class);
}

/** Test for all configs that use {@link org.apache.flink.util.TernaryBoolean}. */
@TestTemplate
public void testConfigureTernaryBooleanConfigs() throws Exception {
ConfigurableStateBackend stateBackend = getStateBackend();
if (!(stateBackend instanceof EmbeddedRocksDBStateBackend)) {
return;
}
EmbeddedRocksDBStateBackend rocksDBStateBackend =
(EmbeddedRocksDBStateBackend) stateBackend;
Configuration baseConfig = createBackendConfig();
Configuration testConfig = new Configuration();
testConfig.setBoolean(
USE_INGEST_DB_RESTORE_MODE, !USE_INGEST_DB_RESTORE_MODE.defaultValue());
testConfig.setBoolean(
INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE,
!INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue());
testConfig.setBoolean(
USE_DELETE_FILES_IN_RANGE_DURING_RESCALING,
!USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue());
EmbeddedRocksDBStateBackend configuredBackend =
rocksDBStateBackend.configure(
testConfig, Thread.currentThread().getContextClassLoader());

checkBooleanWithBaseConf(
baseConfig,
USE_INGEST_DB_RESTORE_MODE,
configuredBackend.getUseIngestDbRestoreMode());
checkBooleanWithBaseConf(
baseConfig,
USE_DELETE_FILES_IN_RANGE_DURING_RESCALING,
configuredBackend.isRescalingUseDeleteFilesInRange());
checkBooleanWithBaseConf(
baseConfig,
INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE,
configuredBackend.getIncrementalRestoreAsyncCompactAfterRescale());
}

private void checkBooleanWithBaseConf(
Configuration testConfig, ConfigOption<Boolean> option, boolean value) {
assertEquals(testConfig.getOptional(option).orElse(!option.defaultValue()), value);
}

private void runStateUpdates() throws Exception {
for (int i = 50; i < 150; ++i) {
if (i % 10 == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,31 @@ public void testConfigureUseIngestDB() {
assertTrue(rocksDBStateBackend.getUseIngestDbRestoreMode());
}

@Test
public void testDefaultUseDeleteFilesInRange() {
EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
assertEquals(
RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING
.defaultValue(),
rocksDBStateBackend.isRescalingUseDeleteFilesInRange());
}

@Test
public void testConfigureUseFilesInRange() {
EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
Configuration configuration = new Configuration();
configuration.set(
RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING,
!RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING
.defaultValue());
rocksDBStateBackend =
rocksDBStateBackend.configure(configuration, getClass().getClassLoader());
assertEquals(
!RocksDBConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING
.defaultValue(),
rocksDBStateBackend.isRescalingUseDeleteFilesInRange());
}

@Test
public void testDefaultIncrementalRestoreInstanceBufferSize() {
EmbeddedRocksDBStateBackend rocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
Expand Down

0 comments on commit 5be8b3e

Please sign in to comment.