diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java index 58ddd3796524e..0f82aa2a6310b 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java @@ -124,15 +124,17 @@ private static void randomizeConfiguration(MiniCluster miniCluster, Configuratio } // randomize ITTests for enabling state change log - if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) { - if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) { + if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) { + if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) { conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true); - miniCluster.overrideRestoreModeForChangelogStateBackend(); + } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) { + randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false); } - } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) { - boolean enabled = - randomize(conf, StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true, false); - if (enabled) { + } + + // randomize periodic materialization when enabling state change log + if (conf.get(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) { + if (!conf.contains(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED)) { // More situations about enabling periodic materialization should be tested randomize( conf, @@ -141,6 +143,8 @@ private static void randomizeConfiguration(MiniCluster miniCluster, Configuratio true, true, false); + } + if (!conf.contains(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL)) { randomize( conf, StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, @@ -148,8 +152,8 @@ private static void randomizeConfiguration(MiniCluster miniCluster, Configuratio Duration.ofMillis(500), Duration.ofSeconds(1), Duration.ofSeconds(5)); - miniCluster.overrideRestoreModeForChangelogStateBackend(); } + miniCluster.overrideRestoreModeForChangelogStateBackend(); } }