diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index bf7df41b339b3d..ae43f71f990cd2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -67,11 +67,13 @@ import org.apache.flink.runtime.operators.coordination.CoordinatorStore; import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl; import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore; import org.apache.flink.runtime.scheduler.InternalFailuresListener; import org.apache.flink.runtime.scheduler.SsgNetworkMemoryCalculationUtils; import org.apache.flink.runtime.scheduler.VertexParallelismInformation; import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; @@ -107,11 +109,13 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -251,7 +255,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG private final VertexAttemptNumberStore initialAttemptCounts; - private final VertexParallelismStore parallelismStore; + private VertexParallelismStore parallelismStore; // ------ Fields that are relevant to the execution and need to be cleared before archiving // ------- @@ -306,6 +310,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG private final List jobStatusChangedListeners; + private final StreamGraphSchedulingContext streamGraphSchedulingContext; + // -------------------------------------------------------------------------------------------- // Constructors // -------------------------------------------------------------------------------------------- @@ -332,7 +338,8 @@ public DefaultExecutionGraph( List jobStatusHooks, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory, - List jobStatusChangedListeners) { + List jobStatusChangedListeners, + StreamGraphSchedulingContext streamGraphSchedulingContext) { this.jobType = jobType; this.executionGraphId = new ExecutionGraphID(); @@ -405,6 +412,8 @@ public DefaultExecutionGraph( this.jobStatusChangedListeners = checkNotNull(jobStatusChangedListeners); + this.streamGraphSchedulingContext = checkNotNull(streamGraphSchedulingContext); + LOG.info( "Created execution graph {} for job {}.", executionGraphId, @@ -835,7 +844,7 @@ public void setInternalTaskFailuresListener( @Override public void notifyNewlyInitializedJobVertices(List vertices) { - executionTopology.notifyExecutionGraphUpdated(this, vertices); + 
executionTopology.notifyExecutionGraphUpdatedWithInitializedJobVertices(this, vertices); } @Override @@ -864,6 +873,39 @@ public void attachJobGraph( partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology()); } + @Override + public void addNewJobVertices( + List topologicallySortedNewlyJobVertices, + JobManagerJobMetricGroup jobManagerJobMetricGroup, + VertexParallelismStore newVerticesParallelismStore) + throws JobException { + // sanity check + Set existingKeys = new HashSet<>(this.tasks.keySet()); + Set newKeys = + topologicallySortedNewlyJobVertices.stream() + .map(JobVertex::getID) + .collect(Collectors.toSet()); + newKeys.retainAll(existingKeys); + if (!newKeys.isEmpty()) { + throw new IllegalArgumentException( + "Unexpected JobVertices that have already been added: " + newKeys); + } + + DefaultVertexParallelismStore vertexParallelismStore = new DefaultVertexParallelismStore(); + vertexParallelismStore.mergeParallelismStore(this.parallelismStore); + vertexParallelismStore.mergeParallelismStore(newVerticesParallelismStore); + this.parallelismStore = vertexParallelismStore; + + attachJobVertices(topologicallySortedNewlyJobVertices, jobManagerJobMetricGroup); + + List topologicallySortedJobVertices = + IterableUtils.toStream(getVerticesTopologically()) + .map(ExecutionJobVertex::getJobVertex) + .collect(Collectors.toList()); + executionTopology.notifyExecutionGraphUpdatedWithNewlyJobVertices( + topologicallySortedJobVertices); + } + /** Attach job vertices without initializing them. */ private void attachJobVertices( List topologicallySorted, JobManagerJobMetricGroup jobManagerJobMetricGroup) @@ -926,7 +968,9 @@ public void initializeJobVertex( executionHistorySizeLimit, rpcTimeout, createTimestamp, - this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId())); + this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()), + streamGraphSchedulingContext::getParallelism, + streamGraphSchedulingContext::getMaxParallelismOrDefault); ejv.connectToPredecessors(this.intermediateResults); @@ -1196,7 +1240,8 @@ public void initFailureCause(Throwable t, long timestamp) { public void jobVertexFinished() { assertRunningInJobMasterMainThread(); final int numFinished = ++numFinishedJobVertices; - if (numFinished == numJobVerticesTotal) { + if (numFinished == numJobVerticesTotal + && streamGraphSchedulingContext.getPendingOperatorCount() == 0) { FutureUtils.assertNoException( waitForAllExecutionsTermination().thenAccept(ignored -> jobFinished())); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java index ca5e6351cca28a..71fbb3fe845387 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.scheduler.VertexParallelismStore; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageLoader; @@ -98,7 +99,8 @@ public static DefaultExecutionGraph buildGraph( 
ExecutionJobVertex.Factory executionJobVertexFactory, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, boolean nonFinishedHybridPartitionShouldBeUnknown, - JobManagerJobMetricGroup jobManagerJobMetricGroup) + JobManagerJobMetricGroup jobManagerJobMetricGroup, + StreamGraphSchedulingContext streamGraphSchedulingContext) throws JobExecutionException, JobException { checkNotNull(jobGraph, "job graph cannot be null"); @@ -170,7 +172,8 @@ public static DefaultExecutionGraph buildGraph( jobGraph.getJobStatusHooks(), markPartitionFinishedStrategy, taskDeploymentDescriptorFactory, - jobStatusChangedListeners); + jobStatusChangedListeners, + streamGraphSchedulingContext); // set the basic properties @@ -182,42 +185,8 @@ public static DefaultExecutionGraph buildGraph( executionGraph.setJsonPlan("{}"); } - // initialize the vertices that have a master initialization hook - // file output formats create directories here, input formats create splits - - final long initMasterStart = System.nanoTime(); - log.info("Running initialization on master for job {} ({}).", jobName, jobId); - - for (JobVertex vertex : jobGraph.getVertices()) { - String executableClass = vertex.getInvokableClassName(); - if (executableClass == null || executableClass.isEmpty()) { - throw new JobSubmissionException( - jobId, - "The vertex " - + vertex.getID() - + " (" - + vertex.getName() - + ") has no invokable class."); - } - - try { - vertex.initializeOnMaster( - new SimpleInitializeOnMasterContext( - classLoader, - vertexParallelismStore - .getParallelismInfo(vertex.getID()) - .getParallelism())); - } catch (Throwable t) { - throw new JobExecutionException( - jobId, - "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), - t); - } - } - - log.info( - "Successfully ran initialization on master in {} ms.", - (System.nanoTime() - initMasterStart) / 1_000_000); + initJobVerticesOnMaster( + jobGraph.getVertices(), classLoader, log, vertexParallelismStore, jobName, jobId); // topologically sort the job vertices and attach the graph to the existing one List sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); @@ -356,6 +325,52 @@ public static DefaultExecutionGraph buildGraph( return executionGraph; } + public static void initJobVerticesOnMaster( + Iterable jobVertices, + ClassLoader classLoader, + Logger log, + VertexParallelismStore vertexParallelismStore, + String jobName, + JobID jobId) + throws JobExecutionException { + // initialize the vertices that have a master initialization hook + // file output formats create directories here, input formats create splits + + final long initMasterStart = System.nanoTime(); + log.info("Running initialization on master for job {} ({}).", jobName, jobId); + + for (JobVertex vertex : jobVertices) { + String executableClass = vertex.getInvokableClassName(); + if (executableClass == null || executableClass.isEmpty()) { + throw new JobSubmissionException( + jobId, + "The vertex " + + vertex.getID() + + " (" + + vertex.getName() + + ") has no invokable class."); + } + + try { + vertex.initializeOnMaster( + new SimpleInitializeOnMasterContext( + classLoader, + vertexParallelismStore + .getParallelismInfo(vertex.getID()) + .getParallelism())); + } catch (Throwable t) { + throw new JobExecutionException( + jobId, + "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), + t); + } + } + + log.info( + "Successfully ran initialization on master in {} ms.", + (System.nanoTime() - initMasterStart) / 1_000_000); + } + public static boolean 
isCheckpointingEnabled(JobGraph jobGraph) { return jobGraph.getCheckpointingSettings() != null; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 2f39f939fffbaa..9f48b908d374ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.scheduler.InternalFailuresListener; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackend; @@ -248,6 +249,23 @@ void initializeJobVertex( */ void notifyNewlyInitializedJobVertices(List vertices); + /** + * Adds new job vertices to the execution graph based on the provided list of topologically + * sorted job vertices. + * + * @param topologicallySortedNewlyJobVertices a list of job vertices that are to be added, + * defined in topological order. + * @param jobManagerJobMetricGroup the metric group associated with the job manager for + * monitoring and metrics collection. + * @param newVerticesParallelismStore a store that maintains parallelism information for the + * newly added job vertices. + */ + void addNewJobVertices( + List topologicallySortedNewlyJobVertices, + JobManagerJobMetricGroup jobManagerJobMetricGroup, + VertexParallelismStore newVerticesParallelismStore) + throws JobException; + Optional findVertexWithAttempt(final ExecutionAttemptID attemptId); Optional findExecution(final ExecutionAttemptID attemptId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 2521914c872dce..e035ebb91efcf2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -185,12 +185,34 @@ public ExecutionJobVertex( } } + @VisibleForTesting protected void initialize( int executionHistorySizeLimit, Duration timeout, long createTimestamp, SubtaskAttemptNumberStore initialAttemptCounts) throws JobException { + initialize( + executionHistorySizeLimit, + timeout, + createTimestamp, + initialAttemptCounts, + i -> { + throw new UnsupportedOperationException(); + }, + i -> { + throw new UnsupportedOperationException(); + }); + } + + protected void initialize( + int executionHistorySizeLimit, + Duration timeout, + long createTimestamp, + SubtaskAttemptNumberStore initialAttemptCounts, + Function streamNodeParallelismRetriever, + Function streamNodeMaxParallelismRetriever) + throws JobException { checkState(parallelismInfo.getParallelism() > 0); checkState(!isInitialized()); @@ -211,7 +233,9 @@ protected void initialize( result, this, this.parallelismInfo.getParallelism(), - result.getResultType()); + result.getResultType(), + streamNodeParallelismRetriever, + streamNodeMaxParallelismRetriever); } // create all task vertices diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java index dc7dac2e375c42..042ea658002bde 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java @@ -32,12 +32,15 @@ import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.streaming.api.graph.StreamEdge; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -71,14 +74,16 @@ public class IntermediateResult { private final Map shuffleDescriptorCache; - /** All consumer job vertex ids of this dataset. */ - private final List consumerVertices = new ArrayList<>(); + private final Function consumerStreamNodeParallelismRetriever; + private final Function consumerStreamNodeMaxParallelismRetriever; public IntermediateResult( IntermediateDataSet intermediateDataSet, ExecutionJobVertex producer, int numParallelProducers, - ResultPartitionType resultType) { + ResultPartitionType resultType, + Function consumerStreamNodeParallelismRetriever, + Function consumerStreamNodeMaxParallelismRetriever) { this.intermediateDataSet = checkNotNull(intermediateDataSet); this.id = checkNotNull(intermediateDataSet.getId()); @@ -102,9 +107,15 @@ public IntermediateResult( this.shuffleDescriptorCache = new HashMap<>(); - intermediateDataSet - .getConsumers() - .forEach(jobEdge -> consumerVertices.add(jobEdge.getTarget().getID())); + this.consumerStreamNodeParallelismRetriever = + checkNotNull(consumerStreamNodeParallelismRetriever); + + this.consumerStreamNodeMaxParallelismRetriever = + checkNotNull(consumerStreamNodeMaxParallelismRetriever); + } + + public boolean isAllConsumerVerticesCreated() { + return intermediateDataSet.isAllConsumerVerticesCreated(); } public void setPartition(int partitionNumber, IntermediateResultPartition partition) { @@ -135,7 +146,9 @@ public IntermediateResultPartition[] getPartitions() { } public List getConsumerVertices() { - return consumerVertices; + return intermediateDataSet.getConsumers().stream() + .map(jobEdge -> jobEdge.getTarget().getID()) + .collect(Collectors.toList()); } /** @@ -182,13 +195,23 @@ int getNumParallelProducers() { */ int getConsumersParallelism() { List consumers = intermediateDataSet.getConsumers(); - checkState(!consumers.isEmpty()); - InternalExecutionGraphAccessor graph = getProducer().getGraph(); - int consumersParallelism = - graph.getJobVertex(consumers.get(0).getTarget().getID()).getParallelism(); - if (consumers.size() == 1) { - return consumersParallelism; + Set consumerParallelisms; + if (consumers.isEmpty()) { + List consumerStreamEdges = intermediateDataSet.getOutputStreamEdges(); + checkState(!consumerStreamEdges.isEmpty()); + consumerParallelisms = + consumerStreamEdges.stream() + .map(StreamEdge::getTargetId) + .map(consumerStreamNodeParallelismRetriever) + .collect(Collectors.toSet()); + } else { + InternalExecutionGraphAccessor graph = getProducer().getGraph(); + consumerParallelisms = + getConsumerVertices().stream() + .map(graph::getJobVertex) + 
.map(ExecutionJobVertex::getParallelism) + .collect(Collectors.toSet()); } // sanity check, all consumer vertices must have the same parallelism: @@ -196,32 +219,35 @@ int getConsumersParallelism() { // graph), the parallelisms will all be -1 (parallelism not decided yet) // 2. for vertices that are initially assigned a parallelism, the parallelisms must be the // same, which is guaranteed at compilation phase - for (JobVertexID jobVertexID : consumerVertices) { - checkState( - consumersParallelism == graph.getJobVertex(jobVertexID).getParallelism(), - "Consumers must have the same parallelism."); - } - return consumersParallelism; + checkState(consumerParallelisms.size() == 1); + return consumerParallelisms.iterator().next(); } int getConsumersMaxParallelism() { List consumers = intermediateDataSet.getConsumers(); - checkState(!consumers.isEmpty()); - InternalExecutionGraphAccessor graph = getProducer().getGraph(); - int consumersMaxParallelism = - graph.getJobVertex(consumers.get(0).getTarget().getID()).getMaxParallelism(); - if (consumers.size() == 1) { - return consumersMaxParallelism; + Set consumerMaxParallelisms; + if (consumers.isEmpty()) { + List consumerStreamEdges = intermediateDataSet.getOutputStreamEdges(); + checkState(!consumerStreamEdges.isEmpty()); + consumerMaxParallelisms = + consumerStreamEdges.stream() + .map(StreamEdge::getTargetId) + .map(consumerStreamNodeMaxParallelismRetriever) + .collect(Collectors.toSet()); + } else { + InternalExecutionGraphAccessor graph = getProducer().getGraph(); + consumerMaxParallelisms = + getConsumerVertices().stream() + .map(graph::getJobVertex) + .map(ExecutionJobVertex::getMaxParallelism) + .collect(Collectors.toSet()); } - // sanity check, all consumer vertices must have the same max parallelism - for (JobVertexID jobVertexID : consumerVertices) { - checkState( - consumersMaxParallelism == graph.getJobVertex(jobVertexID).getMaxParallelism(), - "Consumers must have the same max parallelism."); - } - return consumersMaxParallelism; + checkState( + consumerMaxParallelisms.size() == 1, + "Consumers must have the same max parallelism."); + return consumerMaxParallelisms.iterator().next(); } public DistributionPattern getConsumingDistributionPattern() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java index 5f0c07165fea80..14cbe3cf7e16ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java @@ -88,13 +88,18 @@ public boolean canBeReleased() { != edgeManager.getNumberOfConsumedPartitionGroupsById(partitionId)) { return false; } + + // for dynamic graph, if any consumer vertex has not been initialized or has not yet been + // transformed into a job vertex, this result partition cannot be released + if (!totalResult.isAllConsumerVerticesCreated()) { + return false; + } for (JobVertexID jobVertexId : totalResult.getConsumerVertices()) { - // for dynamic graph, if any consumer vertex is still not initialized, this result - // partition can not be released if (!producer.getExecutionGraphAccessor().getJobVertex(jobVertexId).isInitialized()) { return false; } } + return true; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java index fd756a242f593b..3c908f72dfce72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java @@ -76,6 +76,10 @@ public List getConsumers() { return this.consumers; } + public boolean isAllConsumerVerticesCreated() { + return outputStreamEdges.isEmpty() || outputStreamEdges.size() == consumers.size(); + } + public boolean isBroadcast() { return isBroadcast; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/ExecutionJobVertexFinishedEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/ExecutionJobVertexFinishedEvent.java new file mode 100644 index 00000000000000..c7f7529d576e28 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/ExecutionJobVertexFinishedEvent.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.event; + +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo; + +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** This class is used to record the completion info of {@link ExecutionJobVertex}. 
*/ +public class ExecutionJobVertexFinishedEvent implements JobEvent { + + private final JobVertexID vertexId; + + private final Map resultInfo; + + public ExecutionJobVertexFinishedEvent( + JobVertexID vertexId, Map resultInfo) { + this.vertexId = checkNotNull(vertexId); + this.resultInfo = checkNotNull(resultInfo); + } + + public JobVertexID getVertexId() { + return vertexId; + } + + public Map getResultInfo() { + return resultInfo; + } + + @Override + public String toString() { + return "ExecutionJobVertexFinishedEvent{" + + "vertexId=" + + vertexId + + ", resultInfos=" + + resultInfo + + '}'; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/JobEvents.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/JobEvents.java index 593b759f4d1cac..45def8abdcde29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/JobEvents.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/JobEvents.java @@ -33,6 +33,7 @@ public class JobEvents { jobEventTypeIdMapping.put(ExecutionVertexFinishedEvent.class, 1); jobEventTypeIdMapping.put(ExecutionVertexResetEvent.class, 2); jobEventTypeIdMapping.put(ExecutionJobVertexInitializedEvent.class, 3); + jobEventTypeIdMapping.put(ExecutionJobVertexFinishedEvent.class, 4); } public static int getTypeID(Class clazz) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java index 7a97a05c12ed8f..a83af3821fe897 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.slf4j.Logger; @@ -137,6 +138,7 @@ public ExecutionGraph createAndRestoreExecutionGraph( VertexParallelismStore vertexParallelismStore, ExecutionStateUpdateListener executionStateUpdateListener, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, + StreamGraphSchedulingContext streamGraphSchedulingContext, Logger log) throws Exception { ExecutionDeploymentListener executionDeploymentListener = @@ -175,7 +177,8 @@ public ExecutionGraph createAndRestoreExecutionGraph( executionJobVertexFactory, markPartitionFinishedStrategy, nonFinishedHybridPartitionShouldBeUnknown, - jobManagerJobMetricGroup); + jobManagerJobMetricGroup, + streamGraphSchedulingContext); final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index b1b6c2dddd9e74..3ebf387e3d5c25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; @@ -136,7 +137,8 @@ protected DefaultScheduler( final ShuffleMaster shuffleMaster, final Duration rpcTimeout, final VertexParallelismStore vertexParallelismStore, - final ExecutionDeployer.Factory executionDeployerFactory) + final ExecutionDeployer.Factory executionDeployerFactory, + StreamGraphSchedulingContext streamGraphSchedulingContext) throws Exception { super( @@ -152,7 +154,8 @@ protected DefaultScheduler( mainThreadExecutor, jobStatusListener, executionGraphFactory, - vertexParallelismStore); + vertexParallelismStore, + streamGraphSchedulingContext); this.log = log; @@ -255,6 +258,10 @@ protected void onTaskFinished(final Execution execution, final IOMetrics ioMetri schedulingStrategy.onExecutionStateChange(executionVertexId, ExecutionState.FINISHED); } + protected ClassLoader getUserCodeLoader() { + return userCodeLoader; + } + @Override protected void onTaskFailed(final Execution execution) { checkState(execution.getState() == ExecutionState.FAILED); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java index e1eccf509bb9df..20878b082681ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.scheduler.adaptivebatch.DummyStreamGraphSchedulingContext; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.streaming.api.graph.ExecutionPlan; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -170,7 +171,8 @@ public SchedulerNG createInstance( shuffleMaster, rpcTimeout, computeVertexParallelismStore(jobGraph), - new DefaultExecutionDeployer.Factory()); + new DefaultExecutionDeployer.Factory(), + DummyStreamGraphSchedulingContext.INSTANCE); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java index 19cc64c536cb5b..905d105c8e4f88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -87,4 +88,14 @@ public VertexParallelismInformation getParallelismInfo(JobVertexID vertexId) { "No parallelism information set for vertex %s", vertexId))); } + + @Override + public Map getAllParallelismInfo() { + return 
Collections.unmodifiableMap(vertexToParallelismInfo); } + + @Override + public void mergeParallelismStore(VertexParallelismStore parallelismStore) { + parallelismStore.getAllParallelismInfo().forEach(this::setParallelismInfo); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java index 0d3bf5c0926cb7..cb5d6c719a0cdc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy; import org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.slf4j.Logger; @@ -52,6 +53,8 @@ public interface ExecutionGraphFactory { * @param executionStateUpdateListener listener for state transitions of the individual * executions * @param log log to use for logging + * @param streamGraphSchedulingContext the stream graph scheduling context that retrieves + * execution context details for adaptive batch jobs * @return restored {@link ExecutionGraph} * @throws Exception if the {@link ExecutionGraph} could not be created and restored */ @@ -67,6 +70,7 @@ ExecutionGraph createAndRestoreExecutionGraph( VertexParallelismStore vertexParallelismStore, ExecutionStateUpdateListener executionStateUpdateListener, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, + StreamGraphSchedulingContext streamGraphSchedulingContext, Logger log) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java index fc222474bf1216..ca68b2ec8e0216 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java @@ -29,4 +29,11 @@ public interface MutableVertexParallelismStore extends VertexParallelismStore { * @param info parallelism information for the given vertex */ void setParallelismInfo(JobVertexID vertexId, VertexParallelismInformation info); + + /** + * Merges the given parallelism store into the current store. + * + * @param parallelismStore The parallelism store to be merged.
+ */ + void mergeParallelismStore(VertexParallelismStore parallelismStore); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 2b001cc072e2ba..dcc3f4eb8a701a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -89,6 +89,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.metrics.DeploymentStateTimeMetrics; @@ -194,7 +195,8 @@ public SchedulerBase( final ComponentMainThreadExecutor mainThreadExecutor, final JobStatusListener jobStatusListener, final ExecutionGraphFactory executionGraphFactory, - final VertexParallelismStore vertexParallelismStore) + final VertexParallelismStore vertexParallelismStore, + StreamGraphSchedulingContext streamGraphSchedulingContext) throws Exception { this.log = checkNotNull(log); @@ -240,7 +242,8 @@ public SchedulerBase( initializationTimestamp, mainThreadExecutor, jobStatusListener, - vertexParallelismStore); + vertexParallelismStore, + streamGraphSchedulingContext); this.schedulingTopology = executionGraph.getSchedulingTopology(); @@ -299,8 +302,12 @@ private static int normalizeParallelism(int parallelism) { * @return the computed max parallelism */ public static int getDefaultMaxParallelism(JobVertex vertex) { + return getDefaultMaxParallelism(vertex.getParallelism()); + } + + public static int getDefaultMaxParallelism(int parallelism) { return KeyGroupRangeAssignment.computeDefaultMaxParallelism( - normalizeParallelism(vertex.getParallelism())); + normalizeParallelism(parallelism)); } public static VertexParallelismStore computeVertexParallelismStore( @@ -387,7 +394,8 @@ private ExecutionGraph createAndRestoreExecutionGraph( long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener, - VertexParallelismStore vertexParallelismStore) + VertexParallelismStore vertexParallelismStore, + StreamGraphSchedulingContext streamGraphSchedulingContext) throws Exception { final ExecutionGraph newExecutionGraph = @@ -404,6 +412,7 @@ private ExecutionGraph createAndRestoreExecutionGraph( vertexParallelismStore, deploymentStateTimeMetrics, getMarkPartitionFinishedStrategy(), + streamGraphSchedulingContext, log); newExecutionGraph.setInternalTaskFailuresListener( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java index 8bd5adbcf314ec..d65589b8d93f82 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java @@ -20,6 +20,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; +import java.util.Map; + /** * Contains the max parallelism per vertex, along with metadata about how these maxes were * calculated. 
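For illustration, the new getAllParallelismInfo/mergeParallelismStore pair composes as follows when an execution graph absorbs newly created vertices, mirroring DefaultExecutionGraph#addNewJobVertices above. A minimal sketch; existingStore, newVerticesStore, and LOG are hypothetical names, and the merge simply copies every entry via setParallelismInfo, so the store merged last wins on JobVertexID conflicts:

    // hypothetical composition of the two stores, as done in addNewJobVertices
    DefaultVertexParallelismStore merged = new DefaultVertexParallelismStore();
    merged.mergeParallelismStore(existingStore); // info of vertices already attached
    merged.mergeParallelismStore(newVerticesStore); // info of the newly added vertices
    // getAllParallelismInfo() exposes an unmodifiable view of the backing map
    merged.getAllParallelismInfo()
            .forEach((vertexId, info) -> LOG.info("{} -> {}", vertexId, info.getParallelism()));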
@@ -33,4 +35,11 @@ public interface VertexParallelismStore { * @throws IllegalStateException if there is no parallelism information for the given vertex */ VertexParallelismInformation getParallelismInfo(JobVertexID vertexId); + + /** + * Gets a map of all vertex parallelism information. + * + * @return A map containing JobVertexID and corresponding VertexParallelismInformation. + */ + Map getAllParallelismInfo(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java index b99c49fbd52a03..c83bf96f24a4e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java @@ -85,8 +85,7 @@ public class DefaultExecutionTopology implements SchedulingTopology { private final Supplier> sortedExecutionVertexIds; - private final Map - logicalPipelinedRegionsByJobVertexId; + private Map logicalPipelinedRegionsByJobVertexId; /** Listeners that will be notified whenever the scheduling topology is updated. */ private final List schedulingTopologyListeners = new ArrayList<>(); @@ -163,12 +162,9 @@ public EdgeManager getEdgeManager() { return edgeManager; } - private static Map - computeLogicalPipelinedRegionsByJobVertexId(final ExecutionGraph executionGraph) { - List topologicallySortedJobVertices = - IterableUtils.toStream(executionGraph.getVerticesTopologically()) - .map(ExecutionJobVertex::getJobVertex) - .collect(Collectors.toList()); + public static Map + computeLogicalPipelinedRegionsByJobVertexId( + final List topologicallySortedJobVertices) { Iterable logicalPipelinedRegions = DefaultLogicalTopology.fromTopologicallySortedJobVertices( @@ -186,7 +182,14 @@ public EdgeManager getEdgeManager() { return logicalPipelinedRegionsByJobVertexId; } - public void notifyExecutionGraphUpdated( + public void notifyExecutionGraphUpdatedWithNewlyJobVertices( + List topologicallySortedJobVertices) { + this.logicalPipelinedRegionsByJobVertexId = + DefaultExecutionTopology.computeLogicalPipelinedRegionsByJobVertexId( + topologicallySortedJobVertices); + } + + public void notifyExecutionGraphUpdatedWithInitializedJobVertices( final DefaultExecutionGraph executionGraph, final List newlyInitializedJobVertices) { @@ -245,9 +248,12 @@ public static DefaultExecutionTopology fromExecutionGraph( .map(ExecutionVertex::getID) .collect(Collectors.toList()), edgeManager, - computeLogicalPipelinedRegionsByJobVertexId(executionGraph)); + computeLogicalPipelinedRegionsByJobVertexId( + IterableUtils.toStream(executionGraph.getVerticesTopologically()) + .map(ExecutionJobVertex::getJobVertex) + .collect(Collectors.toList()))); - schedulingTopology.notifyExecutionGraphUpdated( + schedulingTopology.notifyExecutionGraphUpdatedWithInitializedJobVertices( executionGraph, IterableUtils.toStream(executionGraph.getVerticesTopologically()) .filter(ExecutionJobVertex::isInitialized) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 040312dba9761f..fe099509a9b3f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ 
-113,6 +113,7 @@ import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots; import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; +import org.apache.flink.runtime.scheduler.adaptivebatch.DummyStreamGraphSchedulingContext; import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.metrics.DeploymentStateTimeMetrics; @@ -1436,6 +1437,7 @@ private ExecutionGraph createExecutionGraphAndRestoreState( // supports must be pipelined result partition, mark partition finish is // no need. rp -> false, + DummyStreamGraphSchedulingContext.INSTANCE, LOG); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java index 204ed3428bfebf..f9d6054b488adc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; +import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -55,14 +56,14 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobEdge; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.forwardgroup.JobVertexForwardGroup; +import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult; import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology; import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalVertex; +import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.scheduler.DefaultExecutionDeployer; import org.apache.flink.runtime.scheduler.DefaultScheduler; @@ -111,14 +112,12 @@ * This scheduler decides the parallelism of JobVertex according to the data volume it consumes. A * dynamically built up ExecutionGraph is used for this purpose. 
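The StreamGraphSchedulingContext threaded through this patch is consumed via the method references streamGraphSchedulingContext::getParallelism and ::getMaxParallelismOrDefault in DefaultExecutionGraph#initializeJobVertex, and via getPendingOperatorCount() in jobVertexFinished. From those call sites its contract is roughly the sketch below; this is inferred from usage, not the authoritative interface definition:

    public interface StreamGraphSchedulingContext {
        // parallelism currently decided for the stream node with the given id
        int getParallelism(int streamNodeId);
        // max parallelism of the stream node, or a default when none has been set
        int getMaxParallelismOrDefault(int streamNodeId);
        // operators not yet translated into job vertices; per jobVertexFinished above,
        // the job may only finish once all vertices are done and this drops to zero
        int getPendingOperatorCount();
    }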
*/ -public class AdaptiveBatchScheduler extends DefaultScheduler { +public class AdaptiveBatchScheduler extends DefaultScheduler implements JobGraphUpdateListener { - private final DefaultLogicalTopology logicalTopology; + private DefaultLogicalTopology logicalTopology; private final VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider; - private final Map forwardGroupsByJobVertexId; - private final Map blockingResultInfos; private final HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint; @@ -138,9 +137,13 @@ public class AdaptiveBatchScheduler extends DefaultScheduler { private final BatchJobRecoveryHandler jobRecoveryHandler; + private final AdaptiveExecutionHandler adaptiveExecutionHandler; + + private final int defaultMaxParallelism; + public AdaptiveBatchScheduler( final Logger log, - final JobGraph jobGraph, + final AdaptiveExecutionHandler adaptiveExecutionHandler, final Executor ioExecutor, final Configuration jobMasterConfiguration, final Consumer startUpAction, @@ -166,13 +169,13 @@ public AdaptiveBatchScheduler( final int defaultMaxParallelism, final BlocklistOperations blocklistOperations, final HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint, - final Map forwardGroupsByJobVertexId, - final BatchJobRecoveryHandler jobRecoveryHandler) + final BatchJobRecoveryHandler jobRecoveryHandler, + final StreamGraphSchedulingContext streamGraphSchedulingContext) throws Exception { super( log, - jobGraph, + adaptiveExecutionHandler.getJobGraph(), ioExecutor, jobMasterConfiguration, startUpAction, @@ -195,16 +198,21 @@ public AdaptiveBatchScheduler( shuffleMaster, rpcTimeout, computeVertexParallelismStoreForDynamicGraph( - jobGraph.getVertices(), defaultMaxParallelism), - new DefaultExecutionDeployer.Factory()); + adaptiveExecutionHandler.getJobGraph().getVertices(), + defaultMaxParallelism), + new DefaultExecutionDeployer.Factory(), + streamGraphSchedulingContext); - this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph); + this.adaptiveExecutionHandler = checkNotNull(adaptiveExecutionHandler); + adaptiveExecutionHandler.registerJobGraphUpdateListener(this); + + this.defaultMaxParallelism = defaultMaxParallelism; + + this.logicalTopology = DefaultLogicalTopology.fromJobGraph(getJobGraph()); this.vertexParallelismAndInputInfosDecider = checkNotNull(vertexParallelismAndInputInfosDecider); - this.forwardGroupsByJobVertexId = checkNotNull(forwardGroupsByJobVertexId); - this.blockingResultInfos = new HashMap<>(); this.hybridPartitionDataConsumeConstraint = hybridPartitionDataConsumeConstraint; @@ -241,6 +249,33 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler( } } + @Override + public void onNewJobVerticesAdded(List newVertices, int pendingOperatorsCount) + throws Exception { + log.info("Received newly created job vertices: [{}]", newVertices); + + VertexParallelismStore vertexParallelismStore = + computeVertexParallelismStoreForDynamicGraph(newVertices, defaultMaxParallelism); + // 1. init vertex on master + DefaultExecutionGraphBuilder.initJobVerticesOnMaster( + newVertices, + getUserCodeLoader(), + log, + vertexParallelismStore, + getJobGraph().getName(), + getJobGraph().getJobID()); + + // 2. attach newly added job vertices + getExecutionGraph() + .addNewJobVertices(newVertices, jobManagerJobMetricGroup, vertexParallelismStore); + + // 3. update logical topology + logicalTopology = DefaultLogicalTopology.fromJobGraph(getJobGraph()); + + // 4. 
update json plan + getExecutionGraph().setJsonPlan(JsonPlanGenerator.generatePlan(getJobGraph())); + } + @Override protected void startSchedulingInternal() { speculativeExecutionHandler.init( @@ -356,6 +391,8 @@ protected void onTaskFinished(final Execution execution, final IOMetrics ioMetri checkNotNull(ioMetrics); updateResultPartitionBytesMetrics(ioMetrics.getResultPartitionBytes()); + notifyJobVertexFinishedIfPossible(execution.getVertex().getJobVertex()); + ExecutionVertexVersion currentVersion = executionVertexVersioner.getExecutionVertexVersion(execution.getVertex().getID()); tryComputeSourceParallelismThenRunAsync( @@ -568,6 +605,17 @@ static CompletableFuture mergeDynamicParallelismFutures( (a, b) -> a.thenCombine(b, Math::max)); } + private void notifyJobVertexFinishedIfPossible(ExecutionJobVertex jobVertex) { + Optional> producedResultsInfo = + getProducedResultsInfo(jobVertex); + + producedResultsInfo.ifPresent( + resultInfo -> + adaptiveExecutionHandler.handleJobEvent( + new ExecutionJobVertexFinishedEvent( + jobVertex.getJobVertexId(), resultInfo))); + } + @VisibleForTesting public void initializeVerticesIfPossible() { final List newlyInitializedJobVertices = new ArrayList<>(); @@ -626,8 +674,8 @@ public void initializeVerticesIfPossible() { private ParallelismAndInputInfos tryDecideParallelismAndInputInfos( final ExecutionJobVertex jobVertex, List inputs) { int vertexInitialParallelism = jobVertex.getParallelism(); - JobVertexForwardGroup forwardGroup = - forwardGroupsByJobVertexId.get(jobVertex.getJobVertexId()); + ForwardGroup forwardGroup = + adaptiveExecutionHandler.getForwardGroupByJobVertexId(jobVertex.getJobVertexId()); if (!jobVertex.isParallelismDecided() && forwardGroup != null) { checkState(!forwardGroup.isParallelismDecided()); } @@ -666,39 +714,38 @@ private ParallelismAndInputInfos tryDecideParallelismAndInputInfos( checkState(parallelismAndInputInfos.getParallelism() == vertexInitialParallelism); } - if (forwardGroup != null && !forwardGroup.isParallelismDecided()) { - forwardGroup.setParallelism(parallelismAndInputInfos.getParallelism()); - - // When the parallelism for a forward group is determined, we ensure that the - // parallelism for all job vertices within that group is also set. - // This approach ensures that each forward edge produces single subpartition. - // - // This setting is crucial because the Sink V2 committer relies on the interplay - // between the CommittableSummary and the CommittableWithLineage, which are sent by - // the upstream Sink V2 Writer. The committer expects to receive CommittableSummary - // before CommittableWithLineage. - // - // If the number of subpartitions produced by a forward edge is greater than one, - // the ordering of these elements received by the committer cannot be assured, which - // would break the assumption that CommittableSummary is received before - // CommittableWithLineage. 
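Condensing the notification path added earlier in this file: once every execution of a job vertex has finished, the vertex's produced blocking results are packaged into an ExecutionJobVertexFinishedEvent and handed to the adaptive execution handler. A sketch of that producer side, with names as in getProducedResultsInfo below and error handling elided:

    // collect the BlockingResultInfo of every result the finished vertex produced
    Map<IntermediateDataSetID, BlockingResultInfo> produced = new HashMap<>();
    for (DefaultLogicalResult result :
            logicalTopology.getVertex(jobVertex.getJobVertexId()).getProducedResults()) {
        produced.put(result.getId(), checkNotNull(blockingResultInfos.get(result.getId())));
    }
    // hand the completion info to the handler, which may evolve the StreamGraph
    adaptiveExecutionHandler.handleJobEvent(
            new ExecutionJobVertexFinishedEvent(jobVertex.getJobVertexId(), produced));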
- for (JobVertexID jobVertexId : forwardGroup.getJobVertexIds()) { - ExecutionJobVertex executionJobVertex = getExecutionJobVertex(jobVertexId); - if (!executionJobVertex.isParallelismDecided()) { - log.info( - "Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.", - executionJobVertex.getName(), - executionJobVertex.getJobVertexId(), - parallelismAndInputInfos.getParallelism()); - changeJobVertexParallelism( - executionJobVertex, parallelismAndInputInfos.getParallelism()); - } else { - checkState( - parallelismAndInputInfos.getParallelism() - == executionJobVertex.getParallelism()); - } - } - } + // When the parallelism for a forward group is determined, we ensure that the + // parallelism for all job vertices or stream nodes within that group is also set. + // This approach ensures that each forward edge produces a single subpartition. + // + // This setting is crucial because the Sink V2 committer relies on the interplay + // between the CommittableSummary and the CommittableWithLineage, which are sent by + // the upstream Sink V2 Writer. The committer expects to receive CommittableSummary + // before CommittableWithLineage. + // + // If the number of subpartitions produced by a forward edge is greater than one, + // the ordering of these elements received by the committer cannot be assured, which + // would break the assumption that CommittableSummary is received before + // CommittableWithLineage. + adaptiveExecutionHandler.updateForwardGroupParallelism( + jobVertex.getJobVertexId(), + parallelismAndInputInfos.getParallelism(), + (jobVertexId, newParallelism) -> { + ExecutionJobVertex executionJobVertex = getExecutionJobVertex(jobVertexId); + if (!executionJobVertex.isParallelismDecided()) { + log.info( + "Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.", + executionJobVertex.getName(), + executionJobVertex.getJobVertexId(), + parallelismAndInputInfos.getParallelism()); + changeJobVertexParallelism( + executionJobVertex, parallelismAndInputInfos.getParallelism()); + } else { + checkState( + parallelismAndInputInfos.getParallelism() + == executionJobVertex.getParallelism()); + } + }); return parallelismAndInputInfos; } @@ -804,6 +851,26 @@ private Optional> tryGetConsumedResultsInfo( return Optional.of(consumableResultInfo); } + private Optional> getProducedResultsInfo( + final ExecutionJobVertex jobVertex) { + if (!jobVertex.isFinished()) { + return Optional.empty(); + } + + Map producedResultInfo = new HashMap<>(); + + DefaultLogicalVertex logicalVertex = logicalTopology.getVertex(jobVertex.getJobVertexId()); + Iterable producedResults = logicalVertex.getProducedResults(); + + for (DefaultLogicalResult producedResult : producedResults) { + BlockingResultInfo resultInfo = + checkNotNull(blockingResultInfos.get(producedResult.getId())); + producedResultInfo.put(producedResult.getId(), resultInfo); + } + + return Optional.of(producedResultInfo); + } + private boolean canInitialize(final ExecutionJobVertex jobVertex) { if (jobVertex.isInitialized() || !jobVertex.isParallelismDecided()) { return false; } @@ -858,16 +925,18 @@ public static VertexParallelismStore computeVertexParallelismStoreForDynamicGrap // global default max parallelism.
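In effect, the forward-group propagation that previously lived inline in tryDecideParallelismAndInputInfos is reduced to a single callback-based call: the handler decides which members of the forward group the update applies to, while the scheduler supplies the per-vertex mutation. A condensed sketch of the call site shown above (the actual code also logs and verifies vertices whose parallelism is already decided):

    adaptiveExecutionHandler.updateForwardGroupParallelism(
            jobVertex.getJobVertexId(),
            parallelismAndInputInfos.getParallelism(),
            (vertexId, newParallelism) ->
                    changeJobVertexParallelism(getExecutionJobVertex(vertexId), newParallelism));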
return computeVertexParallelismStore( vertices, - v -> { - if (v.getParallelism() > 0) { - return getDefaultMaxParallelism(v); - } else { - return defaultMaxParallelism; - } - }, + v -> computeMaxParallelism(v.getParallelism(), defaultMaxParallelism), Function.identity()); } + public static int computeMaxParallelism(int parallelism, int defaultMaxParallelism) { + if (parallelism > 0) { + return getDefaultMaxParallelism(parallelism); + } else { + return defaultMaxParallelism; + } + } + private static void resetDynamicParallelism(Iterable vertices) { for (JobVertex vertex : vertices) { if (vertex.isDynamicParallelism()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java index 91a578ea995697..f1ef35f9d01f63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java @@ -44,9 +44,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil; -import org.apache.flink.runtime.jobgraph.forwardgroup.JobVertexForwardGroup; import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; import org.apache.flink.runtime.jobmaster.event.FileSystemJobEventStore; import org.apache.flink.runtime.jobmaster.event.JobEventManager; @@ -75,7 +72,10 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.util.SlotSelectionStrategyUtils; import org.apache.flink.streaming.api.graph.ExecutionPlan; +import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.ScheduledExecutor; @@ -86,7 +86,6 @@ import java.time.Duration; import java.util.Collection; -import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -123,12 +122,13 @@ public SchedulerNG createInstance( Collection failureEnrichers, BlocklistOperations blocklistOperations) throws Exception { - JobGraph jobGraph; + ExecutionConfig executionConfig; if (executionPlan instanceof JobGraph) { - jobGraph = (JobGraph) executionPlan; + executionConfig = + executionPlan.getSerializedExecutionConfig().deserializeValue(userCodeLoader); } else if (executionPlan instanceof StreamGraph) { - jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader); + executionConfig = ((StreamGraph) executionPlan).getExecutionConfig(); } else { throw new FlinkException( "Unsupported execution plan " + executionPlan.getClass().getCanonicalName()); @@ -145,20 +145,17 @@ public SchedulerNG createInstance( final ExecutionSlotAllocatorFactory allocatorFactory = createExecutionSlotAllocatorFactory(jobMasterConfiguration, slotPool); - ExecutionConfig executionConfig = - jobGraph.getSerializedExecutionConfig().deserializeValue(userCodeLoader); - final RestartBackoffTimeStrategy 
restartBackoffTimeStrategy = RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory( - jobGraph.getJobConfiguration(), + executionPlan.getJobConfiguration(), jobMasterConfiguration, - jobGraph.isCheckpointingEnabled()) + executionPlan.isCheckpointingEnabled()) .create(); log.info( "Using restart back off time strategy {} for {} ({}).", restartBackoffTimeStrategy, - jobGraph.getName(), - jobGraph.getJobID()); + executionPlan.getName(), + executionPlan.getJobID()); final boolean isJobRecoveryEnabled = jobMasterConfiguration.get(BatchExecutionOptions.JOB_RECOVERY_ENABLED) @@ -167,7 +164,7 @@ public SchedulerNG createInstance( BatchJobRecoveryHandler jobRecoveryHandler; if (isJobRecoveryEnabled) { FileSystemJobEventStore jobEventStore = - new FileSystemJobEventStore(jobGraph.getJobID(), jobMasterConfiguration); + new FileSystemJobEventStore(executionPlan.getJobID(), jobMasterConfiguration); JobEventManager jobEventManager = new JobEventManager(jobEventStore); jobRecoveryHandler = new DefaultBatchJobRecoveryHandler(jobEventManager, jobMasterConfiguration); @@ -177,7 +174,7 @@ public SchedulerNG createInstance( return createScheduler( log, - jobGraph, + executionPlan, executionConfig, ioExecutor, jobMasterConfiguration, @@ -208,7 +205,7 @@ public SchedulerNG createInstance( @VisibleForTesting public static AdaptiveBatchScheduler createScheduler( Logger log, - JobGraph jobGraph, + ExecutionPlan executionPlan, ExecutionConfig executionConfig, Executor ioExecutor, Configuration jobMasterConfiguration, @@ -235,9 +232,9 @@ public static AdaptiveBatchScheduler createScheduler( throws Exception { checkState( - jobGraph.getJobType() == JobType.BATCH, + executionPlan.getJobType() == JobType.BATCH, "Adaptive batch scheduler only supports batch jobs"); - checkAllExchangesAreSupported(jobGraph); + checkAllExchangesAreSupported(executionPlan); final boolean enableSpeculativeExecution = jobMasterConfiguration.get(BatchExecutionOptions.SPECULATIVE_ENABLED); @@ -269,13 +266,13 @@ public static AdaptiveBatchScheduler createScheduler( int defaultMaxParallelism = getDefaultMaxParallelism(jobMasterConfiguration, executionConfig); - final Map forwardGroupsByJobVertexId = - ForwardGroupComputeUtil.computeForwardGroupsAndCheckParallelism( - jobGraph.getVerticesSortedTopologicallyFromSources()); + AdaptiveExecutionHandler adaptiveExecutionHandler = + AdaptiveExecutionHandlerFactory.create( + executionPlan, userCodeLoader, futureExecutor); return new AdaptiveBatchScheduler( log, - jobGraph, + adaptiveExecutionHandler, ioExecutor, jobMasterConfiguration, componentMainThreadExecutor -> {}, @@ -301,8 +298,8 @@ public static AdaptiveBatchScheduler createScheduler( defaultMaxParallelism, blocklistOperations, hybridPartitionDataConsumeConstraint, - forwardGroupsByJobVertexId, - jobRecoveryHandler); + jobRecoveryHandler, + adaptiveExecutionHandler.createStreamGraphSchedulingContext(defaultMaxParallelism)); } public static InputConsumableDecider.Factory loadInputConsumableDeciderFactory( @@ -368,20 +365,31 @@ private static ExecutionJobVertex.Factory createExecutionJobVertexFactory( } } - private static void checkAllExchangesAreSupported(final JobGraph jobGraph) { - for (JobVertex jobVertex : jobGraph.getVertices()) { - for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) { - checkState( - dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition() - || dataSet.getResultType().isHybridResultPartition(), - String.format( - "At the moment, adaptive batch scheduler requires batch 
workloads " - + "to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. " - + "To do that, you need to configure '%s' to '%s' or '%s/%s'. ", - ExecutionOptions.BATCH_SHUFFLE_MODE.key(), - BatchShuffleMode.ALL_EXCHANGES_BLOCKING, - BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, - BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE)); + private static void checkAllExchangesAreSupported(final ExecutionPlan executionPlan) { + String errMsg = + String.format( + "At the moment, adaptive batch scheduler requires batch workloads " + + "to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. " + + "To do that, you need to configure '%s' to '%s' or '%s/%s'. ", + ExecutionOptions.BATCH_SHUFFLE_MODE.key(), + BatchShuffleMode.ALL_EXCHANGES_BLOCKING, + BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, + BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE); + if (executionPlan instanceof JobGraph) { + for (JobVertex jobVertex : ((JobGraph) executionPlan).getVertices()) { + for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) { + checkState( + dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition() + || dataSet.getResultType().isHybridResultPartition(), + errMsg); + } + } + } else { + for (StreamNode streamNode : ((StreamGraph) executionPlan).getStreamNodes()) { + for (StreamEdge edge : streamNode.getOutEdges()) { + checkState( + !edge.getExchangeMode().equals(StreamExchangeMode.PIPELINED), errMsg); + } } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandler.java new file mode 100644 index 00000000000000..05f0761081d0b9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandler.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup; +import org.apache.flink.runtime.jobmaster.event.JobEvent; + +import java.util.function.BiConsumer; + +/** + * The {@code AdaptiveExecutionHandler} interface defines the operations for handling the adaptive + * execution of batch jobs. This includes acquiring the current job graph and dynamically adjusting + * the job topology in response to job events. + */ +public interface AdaptiveExecutionHandler { + + /** + * Returns the {@code JobGraph} representing the batch job. + * + * @return the JobGraph. 
+ */ + JobGraph getJobGraph(); + + /** + * Handles the provided {@code JobEvent}, attempting dynamic modifications to the {@code + * StreamGraph} based on the specifics of the job event. + * + * @param jobEvent The job event to handle. This event contains the necessary information that + * might trigger adjustments to the StreamGraph. + */ + void handleJobEvent(JobEvent jobEvent); + + /** + * Registers a listener to receive updates when the {@code JobGraph} is modified. + * + * @param listener the listener to register for JobGraph updates. + */ + void registerJobGraphUpdateListener(JobGraphUpdateListener listener); + + /** + * Retrieves the {@code ForwardGroup} for a given JobVertex ID. + * + * @param jobVertexId the ID of the JobVertex. + * @return the corresponding ForwardGroup. + */ + ForwardGroup getForwardGroupByJobVertexId(JobVertexID jobVertexId); + + /** + * Updates the parallelism of the forward group associated with a specific JobVertex id. This + * update also applies to the parallelism of all StreamNodes or JobVertices that belong to this + * forward group. + * + * @param jobVertexId the ID of the JobVertex. + * @param newParallelism the new parallelism level to set. + * @param jobVertexParallelismUpdater a BiConsumer that updates the execution JobVertex + * parallelism. + */ + void updateForwardGroupParallelism( + JobVertexID jobVertexId, + int newParallelism, + BiConsumer jobVertexParallelismUpdater); + + /** + * Creates an instance of {@code StreamGraphSchedulingContext}. + * + * @param defaultMaxParallelism the default value for maximum parallelism. + * @return an instance of {@code StreamGraphSchedulingContext}. + */ + StreamGraphSchedulingContext createStreamGraphSchedulingContext(int defaultMaxParallelism); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java new file mode 100644 index 00000000000000..49fdd3e89c9cfb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.streaming.api.graph.ExecutionPlan; +import org.apache.flink.streaming.api.graph.StreamGraph; + +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A factory class for creating instances of {@link AdaptiveExecutionHandler}. 
This factory provides + * a method to create an appropriate handler based on the type of the execution plan. + */ +public class AdaptiveExecutionHandlerFactory { + + /** + * Creates an instance of {@link AdaptiveExecutionHandler} based on the provided execution plan. + * + * @param executionPlan The execution plan, which can be either a {@link JobGraph} or a {@link + * StreamGraph}. + * @param userClassLoader The class loader for the user code. + * @param serializationExecutor The executor used for serialization tasks. + * @return An instance of {@link AdaptiveExecutionHandler}. + * @throws IllegalArgumentException if the execution plan is neither a {@link JobGraph} nor a + * {@link StreamGraph}. + */ + public static AdaptiveExecutionHandler create( + ExecutionPlan executionPlan, + ClassLoader userClassLoader, + Executor serializationExecutor) { + if (executionPlan instanceof JobGraph) { + return new DummyAdaptiveExecutionHandler((JobGraph) executionPlan); + } else { + checkState(executionPlan instanceof StreamGraph, "Unsupported execution plan."); + return new DefaultAdaptiveExecutionHandler( + userClassLoader, (StreamGraph) executionPlan, serializationExecutor); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandler.java new file mode 100644 index 00000000000000..004ae2e0082fe8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandler.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup; +import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup; +import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent; +import org.apache.flink.runtime.jobmaster.event.JobEvent; +import org.apache.flink.streaming.api.graph.AdaptiveGraphManager; +import org.apache.flink.streaming.api.graph.StreamGraph; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; + +/** Default implementation of {@link AdaptiveExecutionHandler}. 
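+ *
+ * <p>This implementation wraps an {@link org.apache.flink.streaming.api.graph.AdaptiveGraphManager}
+ * and generates the {@link JobGraph} incrementally while the job runs. A sketch of the
+ * event-driven flow (identifiers are placeholders):
+ *
+ * <pre>{@code
+ * handler.handleJobEvent(new ExecutionJobVertexFinishedEvent(finishedVertexId, resultInfo));
+ * //   -> adaptiveGraphManager.onJobVertexFinished(finishedVertexId)
+ * //   -> listener.onNewJobVerticesAdded(newlyCreatedJobVertices, pendingOperatorsCount)
+ * }</pre>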
*/ +public class DefaultAdaptiveExecutionHandler implements AdaptiveExecutionHandler { + + private final Logger log = LoggerFactory.getLogger(DefaultAdaptiveExecutionHandler.class); + + private final List jobGraphUpdateListeners = new ArrayList<>(); + + private final AdaptiveGraphManager adaptiveGraphManager; + + public DefaultAdaptiveExecutionHandler( + ClassLoader userClassloader, StreamGraph streamGraph, Executor serializationExecutor) { + this.adaptiveGraphManager = + new AdaptiveGraphManager(userClassloader, streamGraph, serializationExecutor); + } + + @Override + public JobGraph getJobGraph() { + return adaptiveGraphManager.getJobGraph(); + } + + @Override + public void handleJobEvent(JobEvent jobEvent) { + try { + tryUpdateJobGraph(jobEvent); + } catch (Exception e) { + log.error("Failed to handle job event {}.", jobEvent, e); + throw new RuntimeException(e); + } + } + + private void tryUpdateJobGraph(JobEvent jobEvent) throws Exception { + if (jobEvent instanceof ExecutionJobVertexFinishedEvent) { + ExecutionJobVertexFinishedEvent event = (ExecutionJobVertexFinishedEvent) jobEvent; + + List newlyCreatedJobVertices = + adaptiveGraphManager.onJobVertexFinished(event.getVertexId()); + + if (!newlyCreatedJobVertices.isEmpty()) { + notifyJobGraphUpdated( + newlyCreatedJobVertices, adaptiveGraphManager.getPendingOperatorsCount()); + } + } + } + + private void notifyJobGraphUpdated(List jobVertices, int pendingOperatorsCount) + throws Exception { + for (JobGraphUpdateListener listener : jobGraphUpdateListeners) { + listener.onNewJobVerticesAdded(jobVertices, pendingOperatorsCount); + } + } + + @Override + public void registerJobGraphUpdateListener(JobGraphUpdateListener listener) { + jobGraphUpdateListeners.add(listener); + } + + @Override + public ForwardGroup getForwardGroupByJobVertexId(JobVertexID jobVertexId) { + return adaptiveGraphManager.getStreamNodeForwardGroupByVertexId(jobVertexId); + } + + @Override + public void updateForwardGroupParallelism( + JobVertexID jobVertexId, + int newParallelism, + BiConsumer jobVertexParallelismUpdater) { + StreamNodeForwardGroup forwardGroup = + adaptiveGraphManager.getStreamNodeForwardGroupByVertexId(jobVertexId); + + if (forwardGroup != null && !forwardGroup.isParallelismDecided()) { + forwardGroup.setParallelism(newParallelism); + + forwardGroup + .getChainedStreamNodeGroups() + .forEach( + chainedStreamNodeGroup -> + chainedStreamNodeGroup.forEach( + streamNode -> + adaptiveGraphManager + .updateStreamNodeParallelism( + streamNode.getId(), + newParallelism))); + forwardGroup + .getStartNodes() + .forEach( + streamNode -> + adaptiveGraphManager + .findVertexByStreamNodeId(streamNode.getId()) + .ifPresent( + id -> + jobVertexParallelismUpdater.accept( + id, newParallelism))); + } + } + + @Override + public StreamGraphSchedulingContext createStreamGraphSchedulingContext( + int defaultMaxParallelism) { + return new DefaultStreamGraphSchedulingContext(adaptiveGraphManager, defaultMaxParallelism); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContext.java new file mode 100644 index 00000000000000..668863127efc80 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContext.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.graph.AdaptiveGraphManager; +import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Default implementation of {@link StreamGraphSchedulingContext}. */ +public class DefaultStreamGraphSchedulingContext implements StreamGraphSchedulingContext { + + private final AdaptiveGraphManager adaptiveGraphManager; + private final int defaultMaxParallelism; + + public DefaultStreamGraphSchedulingContext( + AdaptiveGraphManager adaptiveGraphManager, int defaultMaxParallelism) { + this.adaptiveGraphManager = checkNotNull(adaptiveGraphManager); + this.defaultMaxParallelism = defaultMaxParallelism; + } + + @Override + public int getParallelism(int streamNodeId) { + return adaptiveGraphManager + .getStreamGraphContext() + .getStreamGraph() + .getStreamNode(streamNodeId) + .getParallelism(); + } + + @Override + public int getMaxParallelismOrDefault(int streamNodeId) { + ImmutableStreamNode streamNode = + adaptiveGraphManager + .getStreamGraphContext() + .getStreamGraph() + .getStreamNode(streamNodeId); + + if (streamNode.getMaxParallelism() == JobVertex.MAX_PARALLELISM_DEFAULT) { + return AdaptiveBatchScheduler.computeMaxParallelism( + streamNode.getParallelism(), defaultMaxParallelism); + } else { + return streamNode.getMaxParallelism(); + } + } + + @Override + public int getPendingOperatorCount() { + return adaptiveGraphManager.getPendingOperatorsCount(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyAdaptiveExecutionHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyAdaptiveExecutionHandler.java new file mode 100644 index 00000000000000..0e0feb4c12e001 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyAdaptiveExecutionHandler.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup; +import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil; +import org.apache.flink.runtime.jobgraph.forwardgroup.JobVertexForwardGroup; +import org.apache.flink.runtime.jobmaster.event.JobEvent; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.function.BiConsumer; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A dummy implementation of {@link AdaptiveExecutionHandler}. */ +public class DummyAdaptiveExecutionHandler implements AdaptiveExecutionHandler { + + private final Logger log = LoggerFactory.getLogger(getClass()); + private final JobGraph jobGraph; + private final Map forwardGroupsByJobVertexId; + + public DummyAdaptiveExecutionHandler(JobGraph jobGraph) { + this.jobGraph = checkNotNull(jobGraph); + this.forwardGroupsByJobVertexId = + ForwardGroupComputeUtil.computeForwardGroupsAndCheckParallelism( + getJobGraph().getVerticesSortedTopologicallyFromSources()); + } + + @Override + public JobGraph getJobGraph() { + return jobGraph; + } + + @Override + public void handleJobEvent(JobEvent jobEvent) { + // do nothing + } + + @Override + public void registerJobGraphUpdateListener(JobGraphUpdateListener listener) { + // do nothing + } + + @Override + public ForwardGroup getForwardGroupByJobVertexId(JobVertexID jobVertexId) { + return forwardGroupsByJobVertexId.get(jobVertexId); + } + + @Override + public void updateForwardGroupParallelism( + JobVertexID jobVertexId, + int newParallelism, + BiConsumer jobVertexParallelismUpdater) { + JobVertexForwardGroup forwardGroup = forwardGroupsByJobVertexId.get(jobVertexId); + + if (forwardGroup != null && !forwardGroup.isParallelismDecided()) { + forwardGroup.setParallelism(newParallelism); + + for (JobVertexID id : forwardGroup.getJobVertexIds()) { + jobVertexParallelismUpdater.accept(id, newParallelism); + } + } + } + + @Override + public StreamGraphSchedulingContext createStreamGraphSchedulingContext( + int defaultMaxParallelism) { + return DummyStreamGraphSchedulingContext.INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyStreamGraphSchedulingContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyStreamGraphSchedulingContext.java new file mode 100644 index 00000000000000..71a7ec9f8bcbb7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyStreamGraphSchedulingContext.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +/** A dummy implementation of {@link StreamGraphSchedulingContext}. */ +public final class DummyStreamGraphSchedulingContext implements StreamGraphSchedulingContext { + + public static final DummyStreamGraphSchedulingContext INSTANCE = + new DummyStreamGraphSchedulingContext(); + + private DummyStreamGraphSchedulingContext() {} + + @Override + public int getParallelism(int streamNodeId) { + throw new UnsupportedOperationException(); + } + + @Override + public int getMaxParallelismOrDefault(int streamNodeId) { + throw new UnsupportedOperationException(); + } + + @Override + public int getPendingOperatorCount() { + return 0; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/JobGraphUpdateListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/JobGraphUpdateListener.java new file mode 100644 index 00000000000000..8265b5455af3e8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/JobGraphUpdateListener.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.JobVertex; + +import java.util.List; + +/** + * This interface defines operations for components that are interested in being notified when new + * job vertices are added to the job graph. + */ +@FunctionalInterface +public interface JobGraphUpdateListener { + /** + * Invoked when new {@link JobVertex} instances are added to the JobGraph of a specific job. + * This allows interested components to react to the addition of new vertices to the job + * topology. + * + * @param newVertices A list of newly added JobVertex instances. + * @param pendingOperatorsCount The number of pending operators. 
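+     *
+     * <p>Being a {@link FunctionalInterface}, a listener can be registered as a lambda. An
+     * illustrative sketch (identifiers are placeholders):
+     * <pre>{@code
+     * handler.registerJobGraphUpdateListener(
+     *         (newVertices, pendingOperatorsCount) ->
+     *                 executionGraph.addNewJobVertices(
+     *                         newVertices, jobManagerJobMetricGroup, parallelismStore));
+     * }</pre>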
+ */ + void onNewJobVerticesAdded(List newVertices, int pendingOperatorsCount) + throws Exception; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphSchedulingContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphSchedulingContext.java new file mode 100644 index 00000000000000..742a7d21d328f3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphSchedulingContext.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +/** Interface for retrieving stream graph context details for adaptive batch jobs. */ +public interface StreamGraphSchedulingContext { + + /** + * Retrieves the parallelism of a given stream node. + * + * @param streamNodeId the ID of the stream node. + * @return the current parallelism of the stream node. + */ + int getParallelism(int streamNodeId); + + /** + * Retrieves the maximum parallelism for a specified stream node. If the maximum parallelism is + * not explicitly set, the default maximum parallelism is returned. + * + * @param streamNodeId the ID of the stream node. + * @return the maximum parallelism for the stream node, or the default if unspecified. + */ + int getMaxParallelismOrDefault(int streamNodeId); + + /** + * Retrieves the count of pending operators waiting to be transferred to job vertices. + * + * @return the number of pending operators. 
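+     *     Implementations that are backed by an already fully generated job graph (such as
+     *     {@code DummyStreamGraphSchedulingContext} in this change) always return 0 here.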
+ */ + int getPendingOperatorCount(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java index 34ec7f3874d816..2e4e161521cc6d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java @@ -227,7 +227,7 @@ public Integer getProducerStreamNodeId(IntermediateDataSetID intermediateDataSet return intermediateDataSetIdToProducerMap.get(intermediateDataSetID); } - private Optional findVertexByStreamNodeId(int streamNodeId) { + public Optional findVertexByStreamNodeId(int streamNodeId) { if (frozenNodeToStartNodeMap.containsKey(streamNodeId)) { Integer startNodeId = frozenNodeToStartNodeMap.get(streamNodeId); return Optional.of(jobVerticesCache.get(startNodeId).getID()); @@ -694,4 +694,8 @@ private boolean isReadyToCreateJobVertex(OperatorChainInfo chainInfo) { } return true; } + + public void updateStreamNodeParallelism(int streamNodeId, int newParallelism) { + streamGraph.getStreamNode(streamNodeId).setParallelism(newParallelism); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java index 516bd2b433f3f4..c58e0d90cb024c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java @@ -67,4 +67,12 @@ public int getId() { public @Nullable StreamOperatorFactory getOperatorFactory() { return streamNode.getOperatorFactory(); } + + public int getMaxParallelism() { + return streamNode.getMaxParallelism(); + } + + public int getParallelism() { + return streamNode.getParallelism(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java index 5dd02043b80b0f..4ae63c3d3b8e47 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java @@ -22,11 +22,14 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.VertexParallelismStore; @@ -34,6 +37,11 @@ import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.testtasks.NoOpInvokable; import 
org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; +import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; @@ -41,11 +49,13 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; +import static org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link IntermediateResultPartition}. */ @@ -158,6 +168,84 @@ void testReleasePartitionGroups() throws Exception { assertThat(partition1.canBeReleased()).isFalse(); } + @Test + void testReleasePartitionGroupsForDynamicGroup() throws Exception { + JobVertex source = new JobVertex("source"); + source.setParallelism(1); + source.setInvokableClass(NoOpInvokable.class); + + JobVertex sink1 = new JobVertex("sink1"); + sink1.setParallelism(1); + sink1.setInvokableClass(NoOpInvokable.class); + + JobVertex sink2 = new JobVertex("sink2"); + sink2.setParallelism(1); + sink2.setInvokableClass(NoOpInvokable.class); + + // At first, create job graph with topology: source -> sink1 + IntermediateDataSetID dataSetId = new IntermediateDataSetID(); + sink1.connectNewDataSetAsInput( + source, + DistributionPattern.ALL_TO_ALL, + ResultPartitionType.BLOCKING, + dataSetId, + false); + JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink1); + jobGraph.setDynamic(true); + + SchedulerBase scheduler = + new DefaultSchedulerBuilder( + jobGraph, + ComponentMainThreadExecutorServiceAdapter.forMainThread(), + new DirectScheduledExecutorService()) + .buildAdaptiveBatchJobScheduler(); + + IntermediateDataSet dataSet = source.getProducedDataSets().get(0); + ExecutionJobVertex sourceVertex = scheduler.getExecutionJobVertex(source.getID()); + ExecutionJobVertex sinkVertex1 = scheduler.getExecutionJobVertex(sink1.getID()); + scheduler.getExecutionGraph().initializeJobVertex(sourceVertex, 1L); + scheduler.getExecutionGraph().initializeJobVertex(sinkVertex1, 1L); + + // add two stream edges + StreamNode sourceVertexDummy = + new StreamNode( + 0, null, null, (StreamOperator) null, null, SourceStreamTask.class); + StreamNode targetVertexDummy = + new StreamNode( + 0, null, null, (StreamOperator) null, null, SourceStreamTask.class); + dataSet.addOutputStreamEdge( + new StreamEdge( + sourceVertexDummy, targetVertexDummy, 0, new ShufflePartitioner<>(), null)); + dataSet.addOutputStreamEdge( + new StreamEdge( + sourceVertexDummy, targetVertexDummy, 0, new ShufflePartitioner<>(), null)); + + // mark partition can be released + IntermediateResultPartition partition = + sourceVertex.getProducedDataSets()[0].getPartitions()[0]; + assertThat(partition.canBeReleased()).isFalse(); + List consumedPartitionGroup = + partition.getConsumedPartitionGroups(); + partition.markPartitionGroupReleasable(consumedPartitionGroup.get(0)); + + // because not all job vertices are created, partition can not be released + assertThat(partition.canBeReleased()).isFalse(); + + // add a 
new job vertex + dataSet.addConsumer(new JobEdge(dataSet, sink2, DistributionPattern.ALL_TO_ALL, false)); + scheduler + .getExecutionGraph() + .addNewJobVertices( + Collections.singletonList(sink2), + UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(), + computeVertexParallelismStoreForDynamicGraph( + Collections.singletonList(sink2), 1)); + ExecutionJobVertex sinkVertex2 = scheduler.getExecutionJobVertex(sink2.getID()); + scheduler.getExecutionGraph().initializeJobVertex(sinkVertex2, 1L); + + assertThat(partition.canBeReleased()).isTrue(); + } + @Test void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph() throws Exception { testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false, Arrays.asList(7, 7)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java index 0280410892d5c9..9b29edc2190bf1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java @@ -41,6 +41,8 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.VertexParallelismStore; +import org.apache.flink.runtime.scheduler.adaptivebatch.DummyStreamGraphSchedulingContext; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.shuffle.ShuffleTestUtils; @@ -86,6 +88,8 @@ public static TestingDefaultExecutionGraphBuilder newBuilder() { checkpointStatsTrackerFactory = metricGroup -> NoOpCheckpointStatsTracker.INSTANCE; private boolean nonFinishedHybridPartitionShouldBeUnknown = false; + private StreamGraphSchedulingContext streamGraphSchedulingContext = + DummyStreamGraphSchedulingContext.INSTANCE; private TestingDefaultExecutionGraphBuilder() {} @@ -180,6 +184,12 @@ public TestingDefaultExecutionGraphBuilder setCheckpointStatsTracker( return this; } + public TestingDefaultExecutionGraphBuilder setStreamGraphSchedulingContext( + StreamGraphSchedulingContext streamGraphSchedulingContext) { + this.streamGraphSchedulingContext = streamGraphSchedulingContext; + return this; + } + private DefaultExecutionGraph build( boolean isDynamicGraph, ScheduledExecutorService executorService) throws JobException, JobExecutionException { @@ -212,7 +222,8 @@ private DefaultExecutionGraph build( executionJobVertexFactory, markPartitionFinishedStrategy, nonFinishedHybridPartitionShouldBeUnknown, - metricGroup); + metricGroup, + streamGraphSchedulingContext); } public DefaultExecutionGraph build(ScheduledExecutorService executorService) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java index 0ce37609d97b1f..7c8db0452a40fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java @@ -39,12 +39,14 @@ import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; import 
org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker; import org.apache.flink.runtime.jobmaster.TestUtils; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.scheduler.adaptivebatch.DummyStreamGraphSchedulingContext; import org.apache.flink.runtime.shuffle.ShuffleTestUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.testutils.TestingUtils; @@ -67,7 +69,9 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; @@ -116,6 +120,7 @@ void testRestoringModifiedJobFromSavepointFails() throws Exception { jobGraphWithNewOperator), (execution, previousState, newState) -> {}, rp -> false, + DummyStreamGraphSchedulingContext.INSTANCE, log)) .withFailMessage( "Expected ExecutionGraph creation to fail because of non restored state.") @@ -145,6 +150,7 @@ void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds() th SchedulerBase.computeVertexParallelismStore(jobGraphWithNewOperator), (execution, previousState, newState) -> {}, rp -> false, + DummyStreamGraphSchedulingContext.INSTANCE, log); final CompletedCheckpoint savepoint = completedCheckpointStore.getLatestCheckpoint(); @@ -183,11 +189,23 @@ public void addSpan(SpanBuilder spanBuilder) { TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, 0L, new DefaultVertexAttemptNumberStore(), - vertexId -> - new DefaultVertexParallelismInfo( - 1, 1337, integer -> Optional.empty()), + new VertexParallelismStore() { + @Override + public VertexParallelismInformation getParallelismInfo( + JobVertexID vertexId) { + return new DefaultVertexParallelismInfo( + 1, 1337, integer -> Optional.empty()); + } + + @Override + public Map + getAllParallelismInfo() { + return Collections.emptyMap(); + } + }, (execution, previousState, newState) -> {}, rp -> false, + DummyStreamGraphSchedulingContext.INSTANCE, log); checkpointStatsTracker.reportRestoredCheckpoint( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java index d648d6e5570e9c..43b3949dc1cd79 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryHandler; import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo; import org.apache.flink.runtime.scheduler.adaptivebatch.DummyBatchJobRecoveryHandler; +import org.apache.flink.runtime.scheduler.adaptivebatch.DummyStreamGraphSchedulingContext; import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismAndInputInfosDecider; import org.apache.flink.runtime.scheduler.strategy.AllFinishedInputConsumableDecider; import 
org.apache.flink.runtime.scheduler.strategy.InputConsumableDecider; @@ -326,7 +327,8 @@ public DefaultScheduler build() throws Exception { shuffleMaster, rpcTimeout, computeVertexParallelismStore(jobGraph), - executionDeployerFactory); + executionDeployerFactory, + DummyStreamGraphSchedulingContext.INSTANCE); } public AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java index eb109b8300b36f..9134657fce692f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java @@ -49,6 +49,42 @@ void testSetInfo() { assertThat(storedInfo).isEqualTo(info); } + @Test + void testGetAllInfos() { + JobVertexID id = new JobVertexID(); + JobVertexID id2 = new JobVertexID(); + VertexParallelismInformation info = new MockVertexParallelismInfo(); + VertexParallelismInformation info2 = new MockVertexParallelismInfo(); + DefaultVertexParallelismStore store = new DefaultVertexParallelismStore(); + + store.setParallelismInfo(id, info); + store.setParallelismInfo(id2, info2); + + assertThat(store.getParallelismInfo(id)).isEqualTo(info); + assertThat(store.getParallelismInfo(id2)).isEqualTo(info2); + } + + @Test + void testMergeParallelismStore() { + JobVertexID id = new JobVertexID(); + JobVertexID id2 = new JobVertexID(); + VertexParallelismInformation info = new MockVertexParallelismInfo(); + VertexParallelismInformation info2 = new MockVertexParallelismInfo(); + DefaultVertexParallelismStore store = new DefaultVertexParallelismStore(); + DefaultVertexParallelismStore store2 = new DefaultVertexParallelismStore(); + store.setParallelismInfo(id, info); + store2.setParallelismInfo(id2, info2); + + assertThat(store.getParallelismInfo(id)).isEqualTo(info); + assertThatThrownBy(() -> store.getParallelismInfo(id2)) + .isInstanceOf(IllegalStateException.class); + + store.mergeParallelismStore(store2); + + assertThat(store.getParallelismInfo(id)).isEqualTo(info); + assertThat(store.getParallelismInfo(id2)).isEqualTo(info2); + } + private static final class MockVertexParallelismInfo implements VertexParallelismInformation { @Override public int getMinParallelism() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java index b054913b51a4b9..faabd723a7c0b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java @@ -26,16 +26,25 @@ import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState; import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; +import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.IterableUtils; @@ -47,6 +56,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -60,6 +70,7 @@ import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL; +import static org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -169,7 +180,7 @@ void testErrorIfCoLocatedTasksAreNotInSameRegion() { } @Test - void testUpdateTopology() throws Exception { + void testUpdateTopologyWithInitializedJobVertices() throws Exception { final JobVertex[] jobVertices = createJobVertices(BLOCKING); executionGraph = createDynamicGraph(jobVertices); adapter = DefaultExecutionTopology.fromExecutionGraph(executionGraph); @@ -178,16 +189,78 @@ void testUpdateTopology() throws Exception { final ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertices[1].getID()); executionGraph.initializeJobVertex(ejv1, 0L); - adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv1)); + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, Collections.singletonList(ejv1)); assertThat(adapter.getVertices()).hasSize(3); executionGraph.initializeJobVertex(ejv2, 0L); - adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv2)); + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, Collections.singletonList(ejv2)); assertThat(adapter.getVertices()).hasSize(6); assertGraphEquals(executionGraph, adapter); } + @Test + void testUpdateTopologyWithNewlyAddedJobVertices() throws Exception { + final int parallelism = 3; + JobVertex jobVertex1 = createNoOpVertex(parallelism); + IntermediateDataSetID dataSetId = new IntermediateDataSetID(); + + StreamNode sourceVertexDummy = + new StreamNode( + 0, null, null, (StreamOperator) null, null, SourceStreamTask.class); + StreamNode targetVertexDummy = + new StreamNode(0, null, null, (StreamOperator) null, null, StreamTask.class); + jobVertex1 + 
.getOrCreateResultDataSet(dataSetId, BLOCKING) + .addOutputStreamEdge( + new StreamEdge( + sourceVertexDummy, + targetVertexDummy, + 0, + new ShufflePartitioner(), + null)); + + executionGraph = + createDynamicGraph( + new TestingStreamGraphSchedulingContext(parallelism, parallelism), + jobVertex1); + adapter = DefaultExecutionTopology.fromExecutionGraph(executionGraph); + + // Operating execution graph: 1. Initialize job vertex1 + assertThat(executionGraph.getAllVertices()).hasSize(1); + final ExecutionJobVertex ejv1 = executionGraph.getJobVertex(jobVertex1.getID()); + executionGraph.initializeJobVertex(ejv1, 0L); + + // Operating execution graph: 2. Add job vertex2 + JobVertex jobVertex2 = createNoOpVertex(parallelism); + jobVertex2.connectNewDataSetAsInput(jobVertex1, ALL_TO_ALL, BLOCKING, dataSetId, false); + + // Operating execution graph: 3. Initialize job vertex2 + List newJobVertices = Collections.singletonList(jobVertex2); + executionGraph.addNewJobVertices( + newJobVertices, + UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(), + computeVertexParallelismStoreForDynamicGraph(newJobVertices, parallelism)); + final ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertex2.getID()); + executionGraph.initializeJobVertex(ejv2, 0L); + + // Operating execution topology graph: 1. notify job vertex1 initialized + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, Collections.singletonList(ejv1)); + + // Operating execution topology graph: 2. notify job vertex2 added + adapter.notifyExecutionGraphUpdatedWithNewlyJobVertices( + Arrays.asList(jobVertex1, jobVertex2)); + + // Operating execution topology graph: 3. notify job vertex2 initialized + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, Collections.singletonList(ejv2)); + + assertGraphEquals(executionGraph, adapter); + } + @Test void testErrorIfUpdateTopologyWithNewVertexPipelinedConnectedToOldOnes() throws Exception { final JobVertex[] jobVertices = createJobVertices(PIPELINED); @@ -198,12 +271,13 @@ void testErrorIfUpdateTopologyWithNewVertexPipelinedConnectedToOldOnes() throws final ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertices[1].getID()); executionGraph.initializeJobVertex(ejv1, 0L); - adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv1)); + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, Collections.singletonList(ejv1)); executionGraph.initializeJobVertex(ejv2, 0L); assertThatThrownBy( () -> - adapter.notifyExecutionGraphUpdated( + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( executionGraph, Collections.singletonList(ejv2))) .isInstanceOf(IllegalStateException.class); } @@ -218,12 +292,14 @@ void testExistingRegionsAreNotAffectedDuringTopologyUpdate() throws Exception { final ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertices[1].getID()); executionGraph.initializeJobVertex(ejv1, 0L); - adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv1)); + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, Collections.singletonList(ejv1)); SchedulingPipelinedRegion regionOld = adapter.getPipelinedRegionOfVertex(new ExecutionVertexID(ejv1.getJobVertexId(), 0)); executionGraph.initializeJobVertex(ejv2, 0L); - adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv2)); + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, 
Collections.singletonList(ejv2)); SchedulingPipelinedRegion regionNew = adapter.getPipelinedRegionOfVertex(new ExecutionVertexID(ejv1.getJobVertexId(), 0)); @@ -246,6 +322,15 @@ private DefaultExecutionGraph createDynamicGraph(JobVertex... jobVertices) throw .buildDynamicGraph(EXECUTOR_RESOURCE.getExecutor()); } + private DefaultExecutionGraph createDynamicGraph( + StreamGraphSchedulingContext streamGraphSchedulingContext, JobVertex... jobVertices) + throws Exception { + return TestingDefaultExecutionGraphBuilder.newBuilder() + .setJobGraph(new JobGraph(new JobID(), "TestJob", jobVertices)) + .setStreamGraphSchedulingContext(streamGraphSchedulingContext) + .buildDynamicGraph(EXECUTOR_RESOURCE.getExecutor()); + } + private void assertRegionContainsAllVertices( final DefaultSchedulingPipelinedRegion pipelinedRegionOfVertex) { final Set allVertices = @@ -349,4 +434,31 @@ private static void assertPartitionEquals( assertThat(adaptedPartition.getProducer().getId()) .isEqualTo(originalPartition.getProducer().getID()); } + + private static class TestingStreamGraphSchedulingContext + implements StreamGraphSchedulingContext { + + private final int parallelism; + private final int maxParallelism; + + private TestingStreamGraphSchedulingContext(int parallelism, int maxParallelism) { + this.parallelism = parallelism; + this.maxParallelism = maxParallelism; + } + + @Override + public int getParallelism(int streamNodeId) { + return parallelism; + } + + @Override + public int getMaxParallelismOrDefault(int streamNodeId) { + return maxParallelism; + } + + @Override + public int getPendingOperatorCount() { + return 0; + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java index 544595ee2cc56a..ce49ff5bded0e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java @@ -57,6 +57,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.scheduler.InternalFailuresListener; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.exceptionhistory.TestingAccessExecution; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology; @@ -404,6 +405,15 @@ public void notifyNewlyInitializedJobVertices(List vertices) throw new UnsupportedOperationException(); } + @Override + public void addNewJobVertices( + List sortedJobVertices, + JobManagerJobMetricGroup jobManagerJobMetricGroup, + VertexParallelismStore newVerticesParallelismStore) + throws JobException { + throw new UnsupportedOperationException(); + } + public void registerExecution(TestingAccessExecution execution) { executions.put(execution.getAttemptId(), execution); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandlerTest.java new file mode 100644 index 00000000000000..ce384600ce2086 --- /dev/null +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandlerTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link DefaultAdaptiveExecutionHandler}. 
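+ * The tests drive the handler with {@link ExecutionJobVertexFinishedEvent}s and verify that job
+ * vertices are created stage by stage from a three-node stream graph (Source -> Map -> Sink).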
*/ +class DefaultAdaptiveExecutionHandlerTest { + + @RegisterExtension + static final TestExecutorExtension EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorExtension(); + + @Test + void testGetJobGraph() { + JobGraph jobGraph = createAdaptiveExecutionHandler().getJobGraph(); + + assertThat(jobGraph).isNotNull(); + assertThat(jobGraph.getNumberOfVertices()).isOne(); + assertThat(jobGraph.getVertices().iterator().next().getName()).contains("Source"); + } + + @Test + void testHandleJobEvent() { + List newAddedJobVertices = new ArrayList<>(); + AtomicInteger pendingOperators = new AtomicInteger(); + + DefaultAdaptiveExecutionHandler handler = + createAdaptiveExecutionHandler( + (newVertices, pendingOperatorsCount) -> { + newAddedJobVertices.addAll(newVertices); + pendingOperators.set(pendingOperatorsCount); + }, + createStreamGraph()); + + JobGraph jobGraph = handler.getJobGraph(); + JobVertex source = jobGraph.getVertices().iterator().next(); + + // notify Source node is finished + ExecutionJobVertexFinishedEvent event1 = + new ExecutionJobVertexFinishedEvent(source.getID(), Collections.emptyMap()); + handler.handleJobEvent(event1); + assertThat(newAddedJobVertices).hasSize(1); + assertThat(newAddedJobVertices.get(0).getName()).contains("Map"); + assertThat(pendingOperators.get()).isOne(); + + // notify Map node is finished + ExecutionJobVertexFinishedEvent event2 = + new ExecutionJobVertexFinishedEvent( + newAddedJobVertices.get(0).getID(), Collections.emptyMap()); + handler.handleJobEvent(event2); + assertThat(newAddedJobVertices).hasSize(2); + assertThat(newAddedJobVertices.get(1).getName()).contains("Sink"); + assertThat(pendingOperators.get()).isZero(); + } + + @Test + void testUpdateForwardGroupParallelism() { + StreamGraph streamGraph = createStreamGraph(); + DefaultAdaptiveExecutionHandler handler = + createAdaptiveExecutionHandler( + (newVertices, pendingOperatorsCount) -> {}, streamGraph); + JobGraph jobGraph = handler.getJobGraph(); + JobVertexID sourceId = jobGraph.getVertices().iterator().next().getID(); + + Map result = new HashMap<>(); + handler.updateForwardGroupParallelism(sourceId, 2, result::put); + + assertThat(handler.getForwardGroupByJobVertexId(sourceId).getParallelism()).isEqualTo(2); + assertThat( + streamGraph.getStreamNodes().stream() + .filter(node -> node.getOperatorName().equals("Map")) + .findFirst() + .get() + .getParallelism()) + .isEqualTo(2); + assertThat(result.getOrDefault(sourceId, -1)).isEqualTo(2); + } + + private DefaultAdaptiveExecutionHandler createAdaptiveExecutionHandler() { + return createAdaptiveExecutionHandler( + (newVertices, pendingOperatorsCount) -> {}, createStreamGraph()); + } + + /** + * Create a stream graph with the following topology. + * + *
+     *     Source -- forward --> Map -- rescale --> Sink
+     * </pre>
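+     *
+     * <p>All operators run with parallelism 1 and operator chaining is disabled, so each operator
+     * is translated into its own job vertex.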
+     */
+    private StreamGraph createStreamGraph() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.disableOperatorChaining();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+        env.fromSequence(0, 1)
+                .name("Source")
+                .forward()
+                .map(i -> i)
+                .name("Map")
+                .rescale()
+                .print()
+                .name("Sink")
+                .disableChaining();
+        env.setParallelism(1);
+
+        return env.getStreamGraph();
+    }
+
+    /**
+     * Create a {@link DefaultAdaptiveExecutionHandler} with a given {@link JobGraphUpdateListener}
+     * and a given {@link StreamGraph}.
+     */
+    private DefaultAdaptiveExecutionHandler createAdaptiveExecutionHandler(
+            JobGraphUpdateListener listener, StreamGraph streamGraph) {
+        DefaultAdaptiveExecutionHandler handler =
+                new DefaultAdaptiveExecutionHandler(
+                        getClass().getClassLoader(), streamGraph, EXECUTOR_RESOURCE.getExecutor());
+        handler.registerJobGraphUpdateListener(listener);
+
+        return handler;
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContextTest.java
new file mode 100644
index 00000000000000..30801a9b0f9866
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContextTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.apache.flink.runtime.scheduler.SchedulerBase.getDefaultMaxParallelism;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link DefaultStreamGraphSchedulingContext}.
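+ *
+ * <p>The scheduling context exposes the parallelism, the max parallelism, and the number of
+ * pending (not yet translated) operators of a {@link StreamGraph} to the scheduler.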
+ */
+class DefaultStreamGraphSchedulingContextTest {
+
+    @RegisterExtension
+    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    @Test
+    void testGetParallelismAndMaxParallelism() {
+        int sourceParallelism = 3;
+        int sinkParallelism = 4;
+        int sourceMaxParallelism = 5;
+        int sinkMaxParallelism = 5;
+
+        StreamGraph streamGraph =
+                getStreamGraph(
+                        sourceParallelism,
+                        sourceMaxParallelism,
+                        sinkParallelism,
+                        sinkMaxParallelism);
+
+        StreamGraphSchedulingContext streamGraphSchedulingContext =
+                getStreamGraphSchedulingContext(streamGraph, 100);
+
+        Iterator<StreamNode> iterator = streamGraph.getStreamNodes().iterator();
+        StreamNode source = iterator.next();
+        assertThat(streamGraphSchedulingContext.getParallelism(source.getId()))
+                .isEqualTo(sourceParallelism);
+        assertThat(streamGraphSchedulingContext.getMaxParallelismOrDefault(source.getId()))
+                .isEqualTo(sourceMaxParallelism);
+
+        StreamNode sink = iterator.next();
+        assertThat(streamGraphSchedulingContext.getParallelism(sink.getId()))
+                .isEqualTo(sinkParallelism);
+        assertThat(streamGraphSchedulingContext.getMaxParallelismOrDefault(sink.getId()))
+                .isEqualTo(sinkMaxParallelism);
+    }
+
+    @Test
+    void testGetDefaultMaxParallelism() {
+        int sourceParallelism = 3;
+        int sinkParallelism = -1;
+        int sourceMaxParallelism = -1;
+        int sinkMaxParallelism = -1;
+        int defaultMaxParallelism = 100;
+
+        StreamGraph streamGraph =
+                getStreamGraph(
+                        sourceParallelism,
+                        sourceMaxParallelism,
+                        sinkParallelism,
+                        sinkMaxParallelism);
+
+        StreamGraphSchedulingContext streamGraphSchedulingContext =
+                getStreamGraphSchedulingContext(streamGraph, defaultMaxParallelism);
+
+        Iterator<StreamNode> iterator = streamGraph.getStreamNodes().iterator();
+        StreamNode source = iterator.next();
+        // the source max parallelism does not fall back to the default max parallelism because
+        // its parallelism is greater than zero; it is derived from that parallelism instead
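+        // (the derived value is computed by SchedulerBase#getDefaultMaxParallelism from the
+        // vertex parallelism)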
+        assertThat(streamGraphSchedulingContext.getMaxParallelismOrDefault(source.getId()))
+                .isEqualTo(getDefaultMaxParallelism(sourceParallelism));
+
+        StreamNode sink = iterator.next();
+        // the sink max parallelism falls back to the default max parallelism because neither its
+        // parallelism nor its max parallelism is set
+        assertThat(streamGraphSchedulingContext.getMaxParallelismOrDefault(sink.getId()))
+                .isEqualTo(defaultMaxParallelism);
+    }
+
+    @Test
+    void testGetPendingOperatorCount() {
+        StreamGraph streamGraph = getStreamGraph();
+        DefaultAdaptiveExecutionHandler executionHandler =
+                getDefaultAdaptiveExecutionHandler(streamGraph);
+        StreamGraphSchedulingContext schedulingContext =
+                getStreamGraphSchedulingContext(executionHandler, 1);
+
+        // only the sink is still pending; the source has already been added to the job graph
+        assertThat(schedulingContext.getPendingOperatorCount()).isEqualTo(1);
+
+        JobGraph jobGraph = executionHandler.getJobGraph();
+        JobVertex source = jobGraph.getVertices().iterator().next();
+        executionHandler.handleJobEvent(
+                new ExecutionJobVertexFinishedEvent(source.getID(), Collections.emptyMap()));
+
+        assertThat(schedulingContext.getPendingOperatorCount()).isZero();
+    }
+
+    private static StreamGraph getStreamGraph() {
+        return getStreamGraph(1, 1, 2, 2);
+    }
+
+    private static StreamGraph getStreamGraph(
+            int sourceParallelism,
+            int sourceMaxParallelism,
+            int sinkParallelism,
+            int sinkMaxParallelism) {
+        StreamExecutionEnvironment env = new StreamExecutionEnvironment();
+        env.fromSequence(0L, 1L).disableChaining().print();
+        StreamGraph streamGraph = env.getStreamGraph();
+
+        Iterator<StreamNode> iterator = streamGraph.getStreamNodes().iterator();
+
+        StreamNode source = iterator.next();
+        StreamNode sink = iterator.next();
+
+        source.setParallelism(sourceParallelism);
+        sink.setParallelism(sinkParallelism);
+
+        if (sourceMaxParallelism > 0) {
+            source.setMaxParallelism(sourceMaxParallelism);
+        }
+
+        if (sinkMaxParallelism > 0) {
+            sink.setMaxParallelism(sinkMaxParallelism);
+        }
+        return streamGraph;
+    }
+
+    private static DefaultAdaptiveExecutionHandler getDefaultAdaptiveExecutionHandler(
+            StreamGraph streamGraph) {
+        return new DefaultAdaptiveExecutionHandler(
+                Thread.currentThread().getContextClassLoader(),
+                streamGraph,
+                EXECUTOR_RESOURCE.getExecutor());
+    }
+
+    private static StreamGraphSchedulingContext getStreamGraphSchedulingContext(
+            StreamGraph streamGraph, int defaultMaxParallelism) {
+        return getDefaultAdaptiveExecutionHandler(streamGraph)
+                .createStreamGraphSchedulingContext(defaultMaxParallelism);
+    }
+
+    private static StreamGraphSchedulingContext getStreamGraphSchedulingContext(
+            AdaptiveExecutionHandler adaptiveExecutionHandler, int defaultMaxParallelism) {
+        return adaptiveExecutionHandler.createStreamGraphSchedulingContext(defaultMaxParallelism);
+    }
+}