Commit 41d8672

[FLINK-36068][runtime] Support scheduling based on adaptive job graph.

JunRuiLee committed Nov 16, 2024
1 parent c715d60 commit 41d8672

Showing 40 changed files with 1,743 additions and 207 deletions.
File: DefaultExecutionGraph.java

@@ -67,11 +67,13 @@
import org.apache.flink.runtime.operators.coordination.CoordinatorStore;
import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
+ import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
import org.apache.flink.runtime.scheduler.InternalFailuresListener;
import org.apache.flink.runtime.scheduler.SsgNetworkMemoryCalculationUtils;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology;
+ import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
@@ -107,11 +109,13 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
+ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -251,7 +255,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionGraphAccessor {

private final VertexAttemptNumberStore initialAttemptCounts;

-     private final VertexParallelismStore parallelismStore;
+     private VertexParallelismStore parallelismStore;

// ------ Fields that are relevant to the execution and need to be cleared before archiving
// -------
@@ -306,6 +310,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionGraphAccessor {

private final List<JobStatusChangedListener> jobStatusChangedListeners;

+     private final StreamGraphSchedulingContext streamGraphSchedulingContext;

// --------------------------------------------------------------------------------------------
// Constructors
// --------------------------------------------------------------------------------------------
@@ -332,7 +338,8 @@ public DefaultExecutionGraph(
List<JobStatusHook> jobStatusHooks,
MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory,
-             List<JobStatusChangedListener> jobStatusChangedListeners) {
+             List<JobStatusChangedListener> jobStatusChangedListeners,
+             StreamGraphSchedulingContext streamGraphSchedulingContext) {

this.jobType = jobType;
this.executionGraphId = new ExecutionGraphID();
@@ -405,6 +412,8 @@ public DefaultExecutionGraph(

this.jobStatusChangedListeners = checkNotNull(jobStatusChangedListeners);

+         this.streamGraphSchedulingContext = checkNotNull(streamGraphSchedulingContext);

LOG.info(
"Created execution graph {} for job {}.",
executionGraphId,
@@ -835,7 +844,7 @@ public void setInternalTaskFailuresListener(

@Override
public void notifyNewlyInitializedJobVertices(List<ExecutionJobVertex> vertices) {
-         executionTopology.notifyExecutionGraphUpdated(this, vertices);
+         executionTopology.notifyExecutionGraphUpdatedWithInitializedJobVertices(this, vertices);
}

@Override
@@ -864,6 +873,39 @@ public void attachJobGraph(
partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology());
}

+     @Override
+     public void addNewJobVertices(
+             List<JobVertex> topologicallySortedNewlyJobVertices,
+             JobManagerJobMetricGroup jobManagerJobMetricGroup,
+             VertexParallelismStore newVerticesParallelismStore)
+             throws JobException {
+         // sanity check
+         Set<JobVertexID> existingKeys = new HashSet<>(this.tasks.keySet());
+         Set<JobVertexID> newKeys =
+                 topologicallySortedNewlyJobVertices.stream()
+                         .map(JobVertex::getID)
+                         .collect(Collectors.toSet());
+         newKeys.retainAll(existingKeys);
+         if (!newKeys.isEmpty()) {
+             throw new IllegalArgumentException(
+                     "Unexpected JobVertices that have already been added: " + newKeys);
+         }

+         DefaultVertexParallelismStore vertexParallelismStore = new DefaultVertexParallelismStore();
+         vertexParallelismStore.mergeParallelismStore(this.parallelismStore);
+         vertexParallelismStore.mergeParallelismStore(newVerticesParallelismStore);
+         this.parallelismStore = vertexParallelismStore;

+         attachJobVertices(topologicallySortedNewlyJobVertices, jobManagerJobMetricGroup);

+         List<JobVertex> topologicallySortedJobVertices =
+                 IterableUtils.toStream(getVerticesTopologically())
+                         .map(ExecutionJobVertex::getJobVertex)
+                         .collect(Collectors.toList());
+         executionTopology.notifyExecutionGraphUpdatedWithNewlyJobVertices(
+                 topologicallySortedJobVertices);
+     }
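The method above is the core of the change: it checks that none of the incoming vertices are already attached, swaps in a merged parallelism store (which is why the parallelismStore field loses its final modifier), attaches the new vertices without initializing them, and re-notifies the execution topology with the full topological order. Below is a minimal caller sketch; addNewJobVertices and the parameter types come from this diff, while the surrounding method and variable names are hypothetical.

    // Hypothetical caller, e.g. inside an adaptive scheduler (sketch only).
    void attachNewlyGeneratedVertices(
            ExecutionGraph executionGraph,
            List<JobVertex> sortedNewVertices,       // topologically sorted, not yet attached
            VertexParallelismStore newVerticesStore, // parallelism info for the new vertices only
            JobManagerJobMetricGroup metricGroup)
            throws JobException {
        // Merges newVerticesStore into the graph's own store, attaches the
        // vertices (without initializing them), and updates the topology.
        executionGraph.addNewJobVertices(sortedNewVertices, metricGroup, newVerticesStore);
    }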

/** Attach job vertices without initializing them. */
private void attachJobVertices(
List<JobVertex> topologicallySorted, JobManagerJobMetricGroup jobManagerJobMetricGroup)
@@ -926,7 +968,9 @@ public void initializeJobVertex(
executionHistorySizeLimit,
rpcTimeout,
createTimestamp,
-                 this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()));
+                 this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()),
+                 streamGraphSchedulingContext::getParallelism,
+                 streamGraphSchedulingContext::getMaxParallelismOrDefault);

ejv.connectToPredecessors(this.intermediateResults);

@@ -1196,7 +1240,8 @@ public void initFailureCause(Throwable t, long timestamp) {
public void jobVertexFinished() {
assertRunningInJobMasterMainThread();
final int numFinished = ++numFinishedJobVertices;
-         if (numFinished == numJobVerticesTotal) {
+         if (numFinished == numJobVerticesTotal
+                 && streamGraphSchedulingContext.getPendingOperatorCount() == 0) {
FutureUtils.assertNoException(
waitForAllExecutionsTermination().thenAccept(ignored -> jobFinished()));
}
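With an adaptive job graph the set of job vertices can keep growing while the job runs, so "all currently known vertices finished" no longer implies the job is done. The added guard also requires that the stream graph scheduling context report no pending operators, i.e. no operators that have not yet been turned into job vertices (that reading of getPendingOperatorCount is inferred from this usage). In sketch form:

    // Both conditions must now hold before the job transitions to FINISHED.
    boolean allKnownVerticesFinished = numFinished == numJobVerticesTotal;
    boolean noPendingOperators =
            streamGraphSchedulingContext.getPendingOperatorCount() == 0;
    if (allKnownVerticesFinished && noPendingOperators) {
        // safe to finish the job
    }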
File: DefaultExecutionGraphBuilder.java

@@ -46,6 +46,7 @@
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
+ import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLoader;
@@ -98,7 +99,8 @@ public static DefaultExecutionGraph buildGraph(
ExecutionJobVertex.Factory executionJobVertexFactory,
MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
boolean nonFinishedHybridPartitionShouldBeUnknown,
-             JobManagerJobMetricGroup jobManagerJobMetricGroup)
+             JobManagerJobMetricGroup jobManagerJobMetricGroup,
+             StreamGraphSchedulingContext streamGraphSchedulingContext)
throws JobExecutionException, JobException {

checkNotNull(jobGraph, "job graph cannot be null");
@@ -170,7 +172,8 @@ public static DefaultExecutionGraph buildGraph(
jobGraph.getJobStatusHooks(),
markPartitionFinishedStrategy,
taskDeploymentDescriptorFactory,
-                         jobStatusChangedListeners);
+                         jobStatusChangedListeners,
+                         streamGraphSchedulingContext);

// set the basic properties

@@ -182,42 +185,8 @@ public static DefaultExecutionGraph buildGraph(
executionGraph.setJsonPlan("{}");
}

-         // initialize the vertices that have a master initialization hook
-         // file output formats create directories here, input formats create splits
-
-         final long initMasterStart = System.nanoTime();
-         log.info("Running initialization on master for job {} ({}).", jobName, jobId);
-
-         for (JobVertex vertex : jobGraph.getVertices()) {
-             String executableClass = vertex.getInvokableClassName();
-             if (executableClass == null || executableClass.isEmpty()) {
-                 throw new JobSubmissionException(
-                         jobId,
-                         "The vertex "
-                                 + vertex.getID()
-                                 + " ("
-                                 + vertex.getName()
-                                 + ") has no invokable class.");
-             }
-
-             try {
-                 vertex.initializeOnMaster(
-                         new SimpleInitializeOnMasterContext(
-                                 classLoader,
-                                 vertexParallelismStore
-                                         .getParallelismInfo(vertex.getID())
-                                         .getParallelism()));
-             } catch (Throwable t) {
-                 throw new JobExecutionException(
-                         jobId,
-                         "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),
-                         t);
-             }
-         }
-
-         log.info(
-                 "Successfully ran initialization on master in {} ms.",
-                 (System.nanoTime() - initMasterStart) / 1_000_000);
+         initJobVerticesOnMaster(
+                 jobGraph.getVertices(), classLoader, log, vertexParallelismStore, jobName, jobId);

// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
@@ -356,6 +325,52 @@ public static DefaultExecutionGraph buildGraph(
return executionGraph;
}

+     public static void initJobVerticesOnMaster(
+             Iterable<JobVertex> jobVertices,
+             ClassLoader classLoader,
+             Logger log,
+             VertexParallelismStore vertexParallelismStore,
+             String jobName,
+             JobID jobId)
+             throws JobExecutionException {
+         // initialize the vertices that have a master initialization hook
+         // file output formats create directories here, input formats create splits

+         final long initMasterStart = System.nanoTime();
+         log.info("Running initialization on master for job {} ({}).", jobName, jobId);

+         for (JobVertex vertex : jobVertices) {
+             String executableClass = vertex.getInvokableClassName();
+             if (executableClass == null || executableClass.isEmpty()) {
+                 throw new JobSubmissionException(
+                         jobId,
+                         "The vertex "
+                                 + vertex.getID()
+                                 + " ("
+                                 + vertex.getName()
+                                 + ") has no invokable class.");
+             }

+             try {
+                 vertex.initializeOnMaster(
+                         new SimpleInitializeOnMasterContext(
+                                 classLoader,
+                                 vertexParallelismStore
+                                         .getParallelismInfo(vertex.getID())
+                                         .getParallelism()));
+             } catch (Throwable t) {
+                 throw new JobExecutionException(
+                         jobId,
+                         "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),
+                         t);
+             }
+         }

+         log.info(
+                 "Successfully ran initialization on master in {} ms.",
+                 (System.nanoTime() - initMasterStart) / 1_000_000);
+     }
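Hoisting the master-side initialization into a standalone static method means it can be re-run later for vertices added after submission, not just once for the vertices in the initial job graph. A sketch of that intended reuse; every name other than initJobVerticesOnMaster itself is assumed:

    // Hypothetical: run the master hooks (directory creation, split generation)
    // for late-added vertices only, using a store that covers them.
    DefaultExecutionGraphBuilder.initJobVerticesOnMaster(
            newlyAddedJobVertices,        // Iterable<JobVertex>, the late additions
            userCodeClassLoader,
            log,
            mergedVertexParallelismStore, // must contain info for the new vertices
            jobName,
            jobId);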

public static boolean isCheckpointingEnabled(JobGraph jobGraph) {
return jobGraph.getCheckpointingSettings() != null;
}
File: ExecutionGraph.java

@@ -40,6 +40,7 @@
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.scheduler.InternalFailuresListener;
+ import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
@@ -248,6 +249,23 @@ void initializeJobVertex(
*/
void notifyNewlyInitializedJobVertices(List<ExecutionJobVertex> vertices);

+     /**
+      * Adds new job vertices to the execution graph based on the provided list of topologically
+      * sorted job vertices.
+      *
+      * @param topologicallySortedNewlyJobVertices a list of job vertices that are to be added,
+      *     defined in topological order.
+      * @param jobManagerJobMetricGroup the metric group associated with the job manager for
+      *     monitoring and metrics collection.
+      * @param newVerticesParallelismStore a store that maintains parallelism information for the
+      *     newly added job vertices.
+      */
+     void addNewJobVertices(
+             List<JobVertex> topologicallySortedNewlyJobVertices,
+             JobManagerJobMetricGroup jobManagerJobMetricGroup,
+             VertexParallelismStore newVerticesParallelismStore)
+             throws JobException;

Optional<String> findVertexWithAttempt(final ExecutionAttemptID attemptId);

Optional<AccessExecution> findExecution(final ExecutionAttemptID attemptId);
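addNewJobVertices expects a store that covers only the new vertices; DefaultExecutionGraph then merges it into its existing store. Below is a sketch of populating such a store, assuming the pre-existing DefaultVertexParallelismStore.setParallelismInfo and the DefaultVertexParallelismInfo(parallelism, maxParallelism, rescaleValidator) constructor from the Flink runtime; treat those exact signatures as an assumption:

    // Parallelism info for the newly created vertices only; the execution
    // graph merges this into its own store.
    DefaultVertexParallelismStore newStore = new DefaultVertexParallelismStore();
    for (JobVertex vertex : newVertices) {
        newStore.setParallelismInfo(
                vertex.getID(),
                new DefaultVertexParallelismInfo(
                        vertex.getParallelism(),
                        vertex.getMaxParallelism(),
                        // validator applied if max parallelism is rescaled later
                        rescaledMax -> Optional.empty()));
    }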
File: ExecutionJobVertex.java

@@ -185,12 +185,34 @@ public ExecutionJobVertex(
}
}

+     @VisibleForTesting
+     protected void initialize(
+             int executionHistorySizeLimit,
+             Duration timeout,
+             long createTimestamp,
+             SubtaskAttemptNumberStore initialAttemptCounts)
+             throws JobException {
+         initialize(
+                 executionHistorySizeLimit,
+                 timeout,
+                 createTimestamp,
+                 initialAttemptCounts,
+                 i -> {
+                     throw new UnsupportedOperationException();
+                 },
+                 i -> {
+                     throw new UnsupportedOperationException();
+                 });
+     }

protected void initialize(
int executionHistorySizeLimit,
Duration timeout,
long createTimestamp,
-             SubtaskAttemptNumberStore initialAttemptCounts)
+             SubtaskAttemptNumberStore initialAttemptCounts,
+             Function<Integer, Integer> streamNodeParallelismRetriever,
+             Function<Integer, Integer> streamNodeMaxParallelismRetriever)
throws JobException {

checkState(parallelismInfo.getParallelism() > 0);
checkState(!isInitialized());
@@ -211,7 +233,9 @@ protected void initialize(
result,
this,
this.parallelismInfo.getParallelism(),
-                     result.getResultType());
+                     result.getResultType(),
+                     streamNodeParallelismRetriever,
+                     streamNodeMaxParallelismRetriever);
}

// create all task vertices
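IntermediateResult now receives two Function<Integer, Integer> retrievers (apparently keyed by stream node id), so consumer parallelism that has not been decided yet can be looked up lazily from the stream graph instead of being frozen at initialization time. The production wiring, taken from the DefaultExecutionGraph hunk above, and the test-only fallback differ only in those two arguments:

    // Production (DefaultExecutionGraph.initializeJobVertex): lazy lookups
    // into the stream graph scheduling context.
    ejv.initialize(
            executionHistorySizeLimit,
            rpcTimeout,
            createTimestamp,
            attemptCounts,
            streamGraphSchedulingContext::getParallelism,
            streamGraphSchedulingContext::getMaxParallelismOrDefault);

    // Tests: the @VisibleForTesting overload passes retrievers that throw
    // UnsupportedOperationException, so any unexpected lazy lookup fails fast.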
[Diffs for the remaining 36 changed files are not shown in this excerpt.]