Skip to content

Commit

Permalink
[BP-1.19][FLINK-34274][runtime] Implicitly disable resource wait time…
Browse files Browse the repository at this point in the history
…out for AdaptiveSchedulerTest (#24400)
  • Loading branch information
XComp authored Feb 28, 2024
1 parent 62d1b8f commit 3c04316
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;

/** Builder for {@link AdaptiveScheduler}. */
public class AdaptiveSchedulerBuilder {
Expand Down Expand Up @@ -117,6 +118,12 @@ public AdaptiveSchedulerBuilder setJobMasterConfiguration(
return this;
}

public AdaptiveSchedulerBuilder withConfigurationOverride(
Function<Configuration, Configuration> modifyFn) {
this.jobMasterConfiguration = modifyFn.apply(jobMasterConfiguration);
return this;
}

public AdaptiveSchedulerBuilder setUserCodeLoader(final ClassLoader userCodeLoader) {
this.userCodeLoader = userCodeLoader;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,13 @@ void testInitialRequirementLowerBoundBeyondAvailableSlotsCausesImmediateFailure(

final AdaptiveScheduler scheduler =
prepareSchedulerWithNoTimeouts(jobGraph, declarativeSlotPool)
.withConfigurationOverride(
conf -> {
conf.set(
JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
Duration.ofMillis(1));
return conf;
})
.setJobResourceRequirements(initialJobResourceRequirements)
.build();

Expand Down Expand Up @@ -1256,14 +1263,6 @@ void testRequirementLowerBoundDecreaseAfterResourceScarcityBelowAvailableSlots()
startJobWithSlotsMatchingParallelism(
scheduler, declarativeSlotPool, taskManagerGateway, availableSlots);

// at this point we'd ideally check that the job is stuck in WaitingForResources, but we
// can't differentiate between waiting due to the minimum requirements not being fulfilled
// and the resource timeout not being elapsed
// We just continue here, as the following tests validate that the lower bound can prevent
// a job from running:
// - #testInitialRequirementLowerBoundBeyondAvailableSlotsCausesImmediateFailure()
// - #testRequirementLowerBoundIncreaseBeyondCurrentParallelismAttemptsImmediateRescale()

// unlock job by decreasing the parallelism
JobResourceRequirements newJobResourceRequirements =
createRequirementsWithLowerAndUpperParallelism(availableSlots, PARALLELISM);
Expand All @@ -1275,7 +1274,8 @@ void testRequirementLowerBoundDecreaseAfterResourceScarcityBelowAvailableSlots()

private static Configuration createConfigurationWithNoTimeouts() {
return new Configuration()
.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(1L))
.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT, Duration.ofMillis(-1L))
.set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT, Duration.ofMillis(1L))
.set(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN, Duration.ofMillis(1L));
}

Expand Down

0 comments on commit 3c04316

Please sign in to comment.