From 3c04316da8b2f1e2e2f602c17146fbf5220fe390 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Wed, 28 Feb 2024 15:23:11 +0100 Subject: [PATCH] [BP-1.19][FLINK-34274][runtime] Implicitly disable resource wait timeout for AdaptiveSchedulerTest (#24400) --- .../adaptive/AdaptiveSchedulerBuilder.java | 7 +++++++ .../adaptive/AdaptiveSchedulerTest.java | 18 +++++++++--------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java index b814144ad9632..fca3c7a854833 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java @@ -52,6 +52,7 @@ import java.util.Collection; import java.util.Collections; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; /** Builder for {@link AdaptiveScheduler}. */ public class AdaptiveSchedulerBuilder { @@ -117,6 +118,12 @@ public AdaptiveSchedulerBuilder setJobMasterConfiguration( return this; } + public AdaptiveSchedulerBuilder withConfigurationOverride( + Function modifyFn) { + this.jobMasterConfiguration = modifyFn.apply(jobMasterConfiguration); + return this; + } + public AdaptiveSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) { this.userCodeLoader = userCodeLoader; return this; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index e854cd7d571b9..979d16a29a037 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -1216,6 +1216,13 @@ void testInitialRequirementLowerBoundBeyondAvailableSlotsCausesImmediateFailure( final AdaptiveScheduler scheduler = prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool) + .withConfigurationOverride( + conf -> { + conf.set( + JobManagerOptions.RESOURCE_WAIT_TIMEOUT, + Duration.ofMillis(1)); + return conf; + }) .setJobResourceRequirements(initialJobResourceRequirements) .build(); @@ -1256,14 +1263,6 @@ void testRequirementLowerBoundDecreaseAfterResourceScarcityBelowAvailableSlots() startJobWithSlotsMatchingParallelism( scheduler, declarativeSlotPool, taskManagerGateway, availableSlots); - // at this point we'd ideally check that the job is stuck in WaitingForResources, but we - // can't differentiate between waiting due to the minimum requirements not being fulfilled - // and the resource timeout not being elapsed - // We just continue here, as the following tests validate that the lower bound can prevent - // a job from running: - // - #testInitialRequirementLowerBoundBeyondAvailableSlotsCausesImmediateFailure() - // - #testRequirementLowerBoundIncreaseBeyondCurrentParallelismAttemptsImmediateRescale() - // unlock job by decreasing the parallelism JobResourceRequirements newJobResourceRequirements = createRequirementsWithLowerAndUpperParallelism(availableSlots, PARALLELISM); @@ -1275,7 +1274,8 @@ void testRequirementLowerBoundDecreaseAfterResourceScarcityBelowAvailableSlots() private static Configuration createConfigurationWithNoTimeouts() { return new Configuration() - .set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L)) + .set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(-1L)) + .set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(1L)) .set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ofMillis(1L)); }