diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index bf7df41b339b3d..ae43f71f990cd2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -67,11 +67,13 @@ import org.apache.flink.runtime.operators.coordination.CoordinatorStore; import org.apache.flink.runtime.operators.coordination.CoordinatorStoreImpl; import org.apache.flink.runtime.query.KvStateLocationRegistry; +import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore; import org.apache.flink.runtime.scheduler.InternalFailuresListener; import org.apache.flink.runtime.scheduler.SsgNetworkMemoryCalculationUtils; import org.apache.flink.runtime.scheduler.VertexParallelismInformation; import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; @@ -107,11 +109,13 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -251,7 +255,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG private final VertexAttemptNumberStore initialAttemptCounts; - private final VertexParallelismStore parallelismStore; + private VertexParallelismStore parallelismStore; // ------ Fields that are relevant to the execution and need to be cleared before archiving // ------- @@ -306,6 +310,8 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG private final List jobStatusChangedListeners; + private final StreamGraphSchedulingContext streamGraphSchedulingContext; + // -------------------------------------------------------------------------------------------- // Constructors // -------------------------------------------------------------------------------------------- @@ -332,7 +338,8 @@ public DefaultExecutionGraph( List jobStatusHooks, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory, - List jobStatusChangedListeners) { + List jobStatusChangedListeners, + StreamGraphSchedulingContext streamGraphSchedulingContext) { this.jobType = jobType; this.executionGraphId = new ExecutionGraphID(); @@ -405,6 +412,8 @@ public DefaultExecutionGraph( this.jobStatusChangedListeners = checkNotNull(jobStatusChangedListeners); + this.streamGraphSchedulingContext = checkNotNull(streamGraphSchedulingContext); + LOG.info( "Created execution graph {} for job {}.", executionGraphId, @@ -835,7 +844,7 @@ public void setInternalTaskFailuresListener( @Override public void notifyNewlyInitializedJobVertices(List vertices) { - executionTopology.notifyExecutionGraphUpdated(this, vertices); + 
executionTopology.notifyExecutionGraphUpdatedWithInitializedJobVertices(this, vertices); } @Override @@ -864,6 +873,39 @@ public void attachJobGraph( partitionGroupReleaseStrategyFactory.createInstance(getSchedulingTopology()); } + @Override + public void addNewJobVertices( + List topologicallySortedNewlyJobVertices, + JobManagerJobMetricGroup jobManagerJobMetricGroup, + VertexParallelismStore newVerticesParallelismStore) + throws JobException { + // sanity check + Set existingKeys = new HashSet<>(this.tasks.keySet()); + Set newKeys = + topologicallySortedNewlyJobVertices.stream() + .map(JobVertex::getID) + .collect(Collectors.toSet()); + newKeys.retainAll(existingKeys); + if (!newKeys.isEmpty()) { + throw new IllegalArgumentException( + "Unexpected JobVertices that have already been added: " + newKeys); + } + + DefaultVertexParallelismStore vertexParallelismStore = new DefaultVertexParallelismStore(); + vertexParallelismStore.mergeParallelismStore(this.parallelismStore); + vertexParallelismStore.mergeParallelismStore(newVerticesParallelismStore); + this.parallelismStore = vertexParallelismStore; + + attachJobVertices(topologicallySortedNewlyJobVertices, jobManagerJobMetricGroup); + + List topologicallySortedJobVertices = + IterableUtils.toStream(getVerticesTopologically()) + .map(ExecutionJobVertex::getJobVertex) + .collect(Collectors.toList()); + executionTopology.notifyExecutionGraphUpdatedWithNewlyJobVertices( + topologicallySortedJobVertices); + } + /** Attach job vertices without initializing them. */ private void attachJobVertices( List topologicallySorted, JobManagerJobMetricGroup jobManagerJobMetricGroup) @@ -926,7 +968,9 @@ public void initializeJobVertex( executionHistorySizeLimit, rpcTimeout, createTimestamp, - this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId())); + this.initialAttemptCounts.getAttemptCounts(ejv.getJobVertexId()), + streamGraphSchedulingContext::getParallelism, + streamGraphSchedulingContext::getMaxParallelismOrDefault); ejv.connectToPredecessors(this.intermediateResults); @@ -1196,7 +1240,8 @@ public void initFailureCause(Throwable t, long timestamp) { public void jobVertexFinished() { assertRunningInJobMasterMainThread(); final int numFinished = ++numFinishedJobVertices; - if (numFinished == numJobVerticesTotal) { + if (numFinished == numJobVerticesTotal + && streamGraphSchedulingContext.getPendingOperatorCount() == 0) { FutureUtils.assertNoException( waitForAllExecutionsTermination().thenAccept(ignored -> jobFinished())); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java index ca5e6351cca28a..71fbb3fe845387 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java @@ -46,6 +46,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.scheduler.VertexParallelismStore; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageLoader; @@ -98,7 +99,8 @@ public static DefaultExecutionGraph buildGraph( 
ExecutionJobVertex.Factory executionJobVertexFactory, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, boolean nonFinishedHybridPartitionShouldBeUnknown, - JobManagerJobMetricGroup jobManagerJobMetricGroup) + JobManagerJobMetricGroup jobManagerJobMetricGroup, + StreamGraphSchedulingContext streamGraphSchedulingContext) throws JobExecutionException, JobException { checkNotNull(jobGraph, "job graph cannot be null"); @@ -170,7 +172,8 @@ public static DefaultExecutionGraph buildGraph( jobGraph.getJobStatusHooks(), markPartitionFinishedStrategy, taskDeploymentDescriptorFactory, - jobStatusChangedListeners); + jobStatusChangedListeners, + streamGraphSchedulingContext); // set the basic properties @@ -182,42 +185,8 @@ public static DefaultExecutionGraph buildGraph( executionGraph.setJsonPlan("{}"); } - // initialize the vertices that have a master initialization hook - // file output formats create directories here, input formats create splits - - final long initMasterStart = System.nanoTime(); - log.info("Running initialization on master for job {} ({}).", jobName, jobId); - - for (JobVertex vertex : jobGraph.getVertices()) { - String executableClass = vertex.getInvokableClassName(); - if (executableClass == null || executableClass.isEmpty()) { - throw new JobSubmissionException( - jobId, - "The vertex " - + vertex.getID() - + " (" - + vertex.getName() - + ") has no invokable class."); - } - - try { - vertex.initializeOnMaster( - new SimpleInitializeOnMasterContext( - classLoader, - vertexParallelismStore - .getParallelismInfo(vertex.getID()) - .getParallelism())); - } catch (Throwable t) { - throw new JobExecutionException( - jobId, - "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), - t); - } - } - - log.info( - "Successfully ran initialization on master in {} ms.", - (System.nanoTime() - initMasterStart) / 1_000_000); + initJobVerticesOnMaster( + jobGraph.getVertices(), classLoader, log, vertexParallelismStore, jobName, jobId); // topologically sort the job vertices and attach the graph to the existing one List sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources(); @@ -356,6 +325,52 @@ public static DefaultExecutionGraph buildGraph( return executionGraph; } + public static void initJobVerticesOnMaster( + Iterable jobVertices, + ClassLoader classLoader, + Logger log, + VertexParallelismStore vertexParallelismStore, + String jobName, + JobID jobId) + throws JobExecutionException { + // initialize the vertices that have a master initialization hook + // file output formats create directories here, input formats create splits + + final long initMasterStart = System.nanoTime(); + log.info("Running initialization on master for job {} ({}).", jobName, jobId); + + for (JobVertex vertex : jobVertices) { + String executableClass = vertex.getInvokableClassName(); + if (executableClass == null || executableClass.isEmpty()) { + throw new JobSubmissionException( + jobId, + "The vertex " + + vertex.getID() + + " (" + + vertex.getName() + + ") has no invokable class."); + } + + try { + vertex.initializeOnMaster( + new SimpleInitializeOnMasterContext( + classLoader, + vertexParallelismStore + .getParallelismInfo(vertex.getID()) + .getParallelism())); + } catch (Throwable t) { + throw new JobExecutionException( + jobId, + "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), + t); + } + } + + log.info( + "Successfully ran initialization on master in {} ms.", + (System.nanoTime() - initMasterStart) / 1_000_000); + } + public static boolean 
isCheckpointingEnabled(JobGraph jobGraph) { return jobGraph.getCheckpointingSettings() != null; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 2f39f939fffbaa..9f48b908d374ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.scheduler.InternalFailuresListener; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackend; @@ -248,6 +249,23 @@ void initializeJobVertex( */ void notifyNewlyInitializedJobVertices(List vertices); + /** + * Adds new job vertices to the execution graph based on the provided list of topologically + * sorted job vertices. + * + * @param topologicallySortedNewlyJobVertices a list of job vertices that are to be added, + * defined in topological order. + * @param jobManagerJobMetricGroup the metric group associated with the job manager for + * monitoring and metrics collection. + * @param newVerticesParallelismStore a store that maintains parallelism information for the + * newly added job vertices. + */ + void addNewJobVertices( + List topologicallySortedNewlyJobVertices, + JobManagerJobMetricGroup jobManagerJobMetricGroup, + VertexParallelismStore newVerticesParallelismStore) + throws JobException; + Optional findVertexWithAttempt(final ExecutionAttemptID attemptId); Optional findExecution(final ExecutionAttemptID attemptId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index 2521914c872dce..e035ebb91efcf2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -185,12 +185,34 @@ public ExecutionJobVertex( } } + @VisibleForTesting protected void initialize( int executionHistorySizeLimit, Duration timeout, long createTimestamp, SubtaskAttemptNumberStore initialAttemptCounts) throws JobException { + initialize( + executionHistorySizeLimit, + timeout, + createTimestamp, + initialAttemptCounts, + i -> { + throw new UnsupportedOperationException(); + }, + i -> { + throw new UnsupportedOperationException(); + }); + } + + protected void initialize( + int executionHistorySizeLimit, + Duration timeout, + long createTimestamp, + SubtaskAttemptNumberStore initialAttemptCounts, + Function streamNodeParallelismRetriever, + Function streamNodeMaxParallelismRetriever) + throws JobException { checkState(parallelismInfo.getParallelism() > 0); checkState(!isInitialized()); @@ -211,7 +233,9 @@ protected void initialize( result, this, this.parallelismInfo.getParallelism(), - result.getResultType()); + result.getResultType(), + streamNodeParallelismRetriever, + streamNodeMaxParallelismRetriever); } // create all task vertices diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java index dc7dac2e375c42..042ea658002bde 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java @@ -32,12 +32,15 @@ import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; +import org.apache.flink.streaming.api.graph.StreamEdge; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -71,14 +74,16 @@ public class IntermediateResult { private final Map shuffleDescriptorCache; - /** All consumer job vertex ids of this dataset. */ - private final List consumerVertices = new ArrayList<>(); + private final Function consumerStreamNodeParallelismRetriever; + private final Function consumerStreamNodeMaxParallelismRetriever; public IntermediateResult( IntermediateDataSet intermediateDataSet, ExecutionJobVertex producer, int numParallelProducers, - ResultPartitionType resultType) { + ResultPartitionType resultType, + Function consumerStreamNodeParallelismRetriever, + Function consumerStreamNodeMaxParallelismRetriever) { this.intermediateDataSet = checkNotNull(intermediateDataSet); this.id = checkNotNull(intermediateDataSet.getId()); @@ -102,9 +107,15 @@ public IntermediateResult( this.shuffleDescriptorCache = new HashMap<>(); - intermediateDataSet - .getConsumers() - .forEach(jobEdge -> consumerVertices.add(jobEdge.getTarget().getID())); + this.consumerStreamNodeParallelismRetriever = + checkNotNull(consumerStreamNodeParallelismRetriever); + + this.consumerStreamNodeMaxParallelismRetriever = + checkNotNull(consumerStreamNodeMaxParallelismRetriever); + } + + public boolean isAllConsumerVerticesCreated() { + return intermediateDataSet.isAllConsumerVerticesCreated(); } public void setPartition(int partitionNumber, IntermediateResultPartition partition) { @@ -135,7 +146,9 @@ public IntermediateResultPartition[] getPartitions() { } public List getConsumerVertices() { - return consumerVertices; + return intermediateDataSet.getConsumers().stream() + .map(jobEdge -> jobEdge.getTarget().getID()) + .collect(Collectors.toList()); } /** @@ -182,13 +195,23 @@ int getNumParallelProducers() { */ int getConsumersParallelism() { List consumers = intermediateDataSet.getConsumers(); - checkState(!consumers.isEmpty()); - InternalExecutionGraphAccessor graph = getProducer().getGraph(); - int consumersParallelism = - graph.getJobVertex(consumers.get(0).getTarget().getID()).getParallelism(); - if (consumers.size() == 1) { - return consumersParallelism; + Set consumerParallelisms; + if (consumers.isEmpty()) { + List consumerStreamEdges = intermediateDataSet.getOutputStreamEdges(); + checkState(!consumerStreamEdges.isEmpty()); + consumerParallelisms = + consumerStreamEdges.stream() + .map(StreamEdge::getTargetId) + .map(consumerStreamNodeParallelismRetriever) + .collect(Collectors.toSet()); + } else { + InternalExecutionGraphAccessor graph = getProducer().getGraph(); + consumerParallelisms = + getConsumerVertices().stream() + .map(graph::getJobVertex) + 
.map(ExecutionJobVertex::getParallelism) + .collect(Collectors.toSet()); } // sanity check, all consumer vertices must have the same parallelism: @@ -196,32 +219,35 @@ int getConsumersParallelism() { // graph), the parallelisms will all be -1 (parallelism not decided yet) // 2. for vertices that are initially assigned a parallelism, the parallelisms must be the // same, which is guaranteed at compilation phase - for (JobVertexID jobVertexID : consumerVertices) { - checkState( - consumersParallelism == graph.getJobVertex(jobVertexID).getParallelism(), - "Consumers must have the same parallelism."); - } - return consumersParallelism; + checkState(consumerParallelisms.size() == 1); + return consumerParallelisms.iterator().next(); } int getConsumersMaxParallelism() { List consumers = intermediateDataSet.getConsumers(); - checkState(!consumers.isEmpty()); - InternalExecutionGraphAccessor graph = getProducer().getGraph(); - int consumersMaxParallelism = - graph.getJobVertex(consumers.get(0).getTarget().getID()).getMaxParallelism(); - if (consumers.size() == 1) { - return consumersMaxParallelism; + Set consumerMaxParallelisms; + if (consumers.isEmpty()) { + List consumerStreamEdges = intermediateDataSet.getOutputStreamEdges(); + checkState(!consumerStreamEdges.isEmpty()); + consumerMaxParallelisms = + consumerStreamEdges.stream() + .map(StreamEdge::getTargetId) + .map(consumerStreamNodeMaxParallelismRetriever) + .collect(Collectors.toSet()); + } else { + InternalExecutionGraphAccessor graph = getProducer().getGraph(); + consumerMaxParallelisms = + getConsumerVertices().stream() + .map(graph::getJobVertex) + .map(ExecutionJobVertex::getMaxParallelism) + .collect(Collectors.toSet()); } - // sanity check, all consumer vertices must have the same max parallelism - for (JobVertexID jobVertexID : consumerVertices) { - checkState( - consumersMaxParallelism == graph.getJobVertex(jobVertexID).getMaxParallelism(), - "Consumers must have the same max parallelism."); - } - return consumersMaxParallelism; + checkState( + consumerMaxParallelisms.size() == 1, + "Consumers must have the same max parallelism."); + return consumerMaxParallelisms.iterator().next(); } public DistributionPattern getConsumingDistributionPattern() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java index 5f0c07165fea80..14cbe3cf7e16ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java @@ -88,13 +88,18 @@ public boolean canBeReleased() { != edgeManager.getNumberOfConsumedPartitionGroupsById(partitionId)) { return false; } + + // for dynamic graph, if any consumer vertex is still not initialized or not transfer to + // job vertex, this result partition can not be released + if (!totalResult.isAllConsumerVerticesCreated()) { + return false; + } for (JobVertexID jobVertexId : totalResult.getConsumerVertices()) { - // for dynamic graph, if any consumer vertex is still not initialized, this result - // partition can not be released if (!producer.getExecutionGraphAccessor().getJobVertex(jobVertexId).isInitialized()) { return false; } } + return true; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java index fd756a242f593b..3c908f72dfce72 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java @@ -76,6 +76,10 @@ public List getConsumers() { return this.consumers; } + public boolean isAllConsumerVerticesCreated() { + return outputStreamEdges.isEmpty() || outputStreamEdges.size() == consumers.size(); + } + public boolean isBroadcast() { return isBroadcast; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java index 7a97a05c12ed8f..a83af3821fe897 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTrackerDeploymentListenerAdapter; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.slf4j.Logger; @@ -137,6 +138,7 @@ public ExecutionGraph createAndRestoreExecutionGraph( VertexParallelismStore vertexParallelismStore, ExecutionStateUpdateListener executionStateUpdateListener, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, + StreamGraphSchedulingContext streamGraphSchedulingContext, Logger log) throws Exception { ExecutionDeploymentListener executionDeploymentListener = @@ -175,7 +177,8 @@ public ExecutionGraph createAndRestoreExecutionGraph( executionJobVertexFactory, markPartitionFinishedStrategy, nonFinishedHybridPartitionShouldBeUnknown, - jobManagerJobMetricGroup); + jobManagerJobMetricGroup, + streamGraphSchedulingContext); final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index b1b6c2dddd9e74..3ebf387e3d5c25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; @@ -136,7 +137,8 @@ protected DefaultScheduler( final ShuffleMaster shuffleMaster, final Duration rpcTimeout, final VertexParallelismStore vertexParallelismStore, - final ExecutionDeployer.Factory executionDeployerFactory) + final ExecutionDeployer.Factory executionDeployerFactory, + StreamGraphSchedulingContext streamGraphSchedulingContext) throws Exception { 
super( @@ -152,7 +154,8 @@ protected DefaultScheduler( mainThreadExecutor, jobStatusListener, executionGraphFactory, - vertexParallelismStore); + vertexParallelismStore, + streamGraphSchedulingContext); this.log = log; @@ -255,6 +258,10 @@ protected void onTaskFinished(final Execution execution, final IOMetrics ioMetri schedulingStrategy.onExecutionStateChange(executionVertexId, ExecutionState.FINISHED); } + protected ClassLoader getUserCodeLoader() { + return userCodeLoader; + } + @Override protected void onTaskFailed(final Execution execution) { checkState(execution.getState() == ExecutionState.FAILED); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java index e1eccf509bb9df..20878b082681ca 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolService; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.scheduler.adaptivebatch.DummyStreamGraphSchedulingContext; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.streaming.api.graph.ExecutionPlan; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -170,7 +171,8 @@ public SchedulerNG createInstance( shuffleMaster, rpcTimeout, computeVertexParallelismStore(jobGraph), - new DefaultExecutionDeployer.Factory()); + new DefaultExecutionDeployer.Factory(), + DummyStreamGraphSchedulingContext.INSTANCE); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java index 19cc64c536cb5b..905d105c8e4f88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStore.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -87,4 +88,14 @@ public VertexParallelismInformation getParallelismInfo(JobVertexID vertexId) { "No parallelism information set for vertex %s", vertexId))); } + + @Override + public Map getAllParallelismInfo() { + return Collections.unmodifiableMap(vertexToParallelismInfo); + } + + @Override + public void mergeParallelismStore(VertexParallelismStore parallelismStore) { + parallelismStore.getAllParallelismInfo().forEach(this::setParallelismInfo); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java index 0d3bf5c0926cb7..cb5d6c719a0cdc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphFactory.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.executiongraph.MarkPartitionFinishedStrategy; import 
org.apache.flink.runtime.executiongraph.VertexAttemptNumberStore; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.slf4j.Logger; @@ -52,6 +53,8 @@ public interface ExecutionGraphFactory { * @param executionStateUpdateListener listener for state transitions of the individual * executions * @param log log to use for logging + * @param streamGraphSchedulingContext stream graph scheduling context that retrieves execution + * context details for adaptive batch jobs * @return restored {@link ExecutionGraph} * @throws Exception if the {@link ExecutionGraph} could not be created and restored */ @@ -67,6 +70,7 @@ ExecutionGraph createAndRestoreExecutionGraph( VertexParallelismStore vertexParallelismStore, ExecutionStateUpdateListener executionStateUpdateListener, MarkPartitionFinishedStrategy markPartitionFinishedStrategy, + StreamGraphSchedulingContext streamGraphSchedulingContext, Logger log) throws Exception; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java index fc222474bf1216..ca68b2ec8e0216 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/MutableVertexParallelismStore.java @@ -29,4 +29,11 @@ public interface MutableVertexParallelismStore extends VertexParallelismStore { * @param info parallelism information for the given vertex */ void setParallelismInfo(JobVertexID vertexId, VertexParallelismInformation info); + + /** + * Merges the given parallelism store into the current store. + * + * @param parallelismStore The parallelism store to be merged. 
+ */ + void mergeParallelismStore(VertexParallelismStore parallelismStore); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 635e869a88b07b..dcc3f4eb8a701a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -89,6 +89,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.metrics.DeploymentStateTimeMetrics; @@ -194,7 +195,8 @@ public SchedulerBase( final ComponentMainThreadExecutor mainThreadExecutor, final JobStatusListener jobStatusListener, final ExecutionGraphFactory executionGraphFactory, - final VertexParallelismStore vertexParallelismStore) + final VertexParallelismStore vertexParallelismStore, + StreamGraphSchedulingContext streamGraphSchedulingContext) throws Exception { this.log = checkNotNull(log); @@ -240,7 +242,8 @@ public SchedulerBase( initializationTimestamp, mainThreadExecutor, jobStatusListener, - vertexParallelismStore); + vertexParallelismStore, + streamGraphSchedulingContext); this.schedulingTopology = executionGraph.getSchedulingTopology(); @@ -391,7 +394,8 @@ private ExecutionGraph createAndRestoreExecutionGraph( long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, JobStatusListener jobStatusListener, - VertexParallelismStore vertexParallelismStore) + VertexParallelismStore vertexParallelismStore, + StreamGraphSchedulingContext streamGraphSchedulingContext) throws Exception { final ExecutionGraph newExecutionGraph = @@ -408,6 +412,7 @@ private ExecutionGraph createAndRestoreExecutionGraph( vertexParallelismStore, deploymentStateTimeMetrics, getMarkPartitionFinishedStrategy(), + streamGraphSchedulingContext, log); newExecutionGraph.setInternalTaskFailuresListener( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java index 8bd5adbcf314ec..d65589b8d93f82 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java @@ -20,6 +20,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; +import java.util.Map; + /** * Contains the max parallelism per vertex, along with metadata about how these maxes were * calculated. @@ -33,4 +35,11 @@ public interface VertexParallelismStore { * @throws IllegalStateException if there is no parallelism information for the given vertex */ VertexParallelismInformation getParallelismInfo(JobVertexID vertexId); + + /** + * Gets a map of all vertex parallelism information. + * + * @return A map containing JobVertexID and corresponding VertexParallelismInformation. 
+ */ + Map getAllParallelismInfo(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java index b99c49fbd52a03..c83bf96f24a4e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java @@ -85,8 +85,7 @@ public class DefaultExecutionTopology implements SchedulingTopology { private final Supplier> sortedExecutionVertexIds; - private final Map - logicalPipelinedRegionsByJobVertexId; + private Map logicalPipelinedRegionsByJobVertexId; /** Listeners that will be notified whenever the scheduling topology is updated. */ private final List schedulingTopologyListeners = new ArrayList<>(); @@ -163,12 +162,9 @@ public EdgeManager getEdgeManager() { return edgeManager; } - private static Map - computeLogicalPipelinedRegionsByJobVertexId(final ExecutionGraph executionGraph) { - List topologicallySortedJobVertices = - IterableUtils.toStream(executionGraph.getVerticesTopologically()) - .map(ExecutionJobVertex::getJobVertex) - .collect(Collectors.toList()); + public static Map + computeLogicalPipelinedRegionsByJobVertexId( + final List topologicallySortedJobVertices) { Iterable logicalPipelinedRegions = DefaultLogicalTopology.fromTopologicallySortedJobVertices( @@ -186,7 +182,14 @@ public EdgeManager getEdgeManager() { return logicalPipelinedRegionsByJobVertexId; } - public void notifyExecutionGraphUpdated( + public void notifyExecutionGraphUpdatedWithNewlyJobVertices( + List topologicallySortedJobVertices) { + this.logicalPipelinedRegionsByJobVertexId = + DefaultExecutionTopology.computeLogicalPipelinedRegionsByJobVertexId( + topologicallySortedJobVertices); + } + + public void notifyExecutionGraphUpdatedWithInitializedJobVertices( final DefaultExecutionGraph executionGraph, final List newlyInitializedJobVertices) { @@ -245,9 +248,12 @@ public static DefaultExecutionTopology fromExecutionGraph( .map(ExecutionVertex::getID) .collect(Collectors.toList()), edgeManager, - computeLogicalPipelinedRegionsByJobVertexId(executionGraph)); + computeLogicalPipelinedRegionsByJobVertexId( + IterableUtils.toStream(executionGraph.getVerticesTopologically()) + .map(ExecutionJobVertex::getJobVertex) + .collect(Collectors.toList()))); - schedulingTopology.notifyExecutionGraphUpdated( + schedulingTopology.notifyExecutionGraphUpdatedWithInitializedJobVertices( executionGraph, IterableUtils.toStream(executionGraph.getVerticesTopologically()) .filter(ExecutionJobVertex::isInitialized) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 6a3f0a4781f38f..a17602d6616e4f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -113,6 +113,7 @@ import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots; import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; +import org.apache.flink.runtime.scheduler.adaptivebatch.DummyStreamGraphSchedulingContext; import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.metrics.DeploymentStateTimeMetrics; @@ -1441,6 +1442,7 @@ private ExecutionGraph createExecutionGraphAndRestoreState( // supports must be pipelined result partition, mark partition finish is // no need. rp -> false, + DummyStreamGraphSchedulingContext.INSTANCE, LOG); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java index a0311cb7ba6337..f9d6054b488adc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; +import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -55,14 +56,14 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobEdge; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.forwardgroup.JobVertexForwardGroup; +import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalResult; import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology; import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalVertex; +import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.scheduler.DefaultExecutionDeployer; import org.apache.flink.runtime.scheduler.DefaultScheduler; @@ -111,14 +112,12 @@ * This scheduler decides the parallelism of JobVertex according to the data volume it consumes. A * dynamically built up ExecutionGraph is used for this purpose. 
*/ -public class AdaptiveBatchScheduler extends DefaultScheduler { +public class AdaptiveBatchScheduler extends DefaultScheduler implements JobGraphUpdateListener { - private final DefaultLogicalTopology logicalTopology; + private DefaultLogicalTopology logicalTopology; private final VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider; - private final Map forwardGroupsByJobVertexId; - private final Map blockingResultInfos; private final HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint; @@ -138,9 +137,13 @@ public class AdaptiveBatchScheduler extends DefaultScheduler { private final BatchJobRecoveryHandler jobRecoveryHandler; + private final AdaptiveExecutionHandler adaptiveExecutionHandler; + + private final int defaultMaxParallelism; + public AdaptiveBatchScheduler( final Logger log, - final JobGraph jobGraph, + final AdaptiveExecutionHandler adaptiveExecutionHandler, final Executor ioExecutor, final Configuration jobMasterConfiguration, final Consumer startUpAction, @@ -166,13 +169,13 @@ public AdaptiveBatchScheduler( final int defaultMaxParallelism, final BlocklistOperations blocklistOperations, final HybridPartitionDataConsumeConstraint hybridPartitionDataConsumeConstraint, - final Map forwardGroupsByJobVertexId, - final BatchJobRecoveryHandler jobRecoveryHandler) + final BatchJobRecoveryHandler jobRecoveryHandler, + final StreamGraphSchedulingContext streamGraphSchedulingContext) throws Exception { super( log, - jobGraph, + adaptiveExecutionHandler.getJobGraph(), ioExecutor, jobMasterConfiguration, startUpAction, @@ -195,16 +198,21 @@ public AdaptiveBatchScheduler( shuffleMaster, rpcTimeout, computeVertexParallelismStoreForDynamicGraph( - jobGraph.getVertices(), defaultMaxParallelism), - new DefaultExecutionDeployer.Factory()); + adaptiveExecutionHandler.getJobGraph().getVertices(), + defaultMaxParallelism), + new DefaultExecutionDeployer.Factory(), + streamGraphSchedulingContext); + + this.adaptiveExecutionHandler = checkNotNull(adaptiveExecutionHandler); + adaptiveExecutionHandler.registerJobGraphUpdateListener(this); + + this.defaultMaxParallelism = defaultMaxParallelism; - this.logicalTopology = DefaultLogicalTopology.fromJobGraph(jobGraph); + this.logicalTopology = DefaultLogicalTopology.fromJobGraph(getJobGraph()); this.vertexParallelismAndInputInfosDecider = checkNotNull(vertexParallelismAndInputInfosDecider); - this.forwardGroupsByJobVertexId = checkNotNull(forwardGroupsByJobVertexId); - this.blockingResultInfos = new HashMap<>(); this.hybridPartitionDataConsumeConstraint = hybridPartitionDataConsumeConstraint; @@ -241,6 +249,33 @@ private SpeculativeExecutionHandler createSpeculativeExecutionHandler( } } + @Override + public void onNewJobVerticesAdded(List newVertices, int pendingOperatorsCount) + throws Exception { + log.info("Received newly created job vertices: [{}]", newVertices); + + VertexParallelismStore vertexParallelismStore = + computeVertexParallelismStoreForDynamicGraph(newVertices, defaultMaxParallelism); + // 1. init vertex on master + DefaultExecutionGraphBuilder.initJobVerticesOnMaster( + newVertices, + getUserCodeLoader(), + log, + vertexParallelismStore, + getJobGraph().getName(), + getJobGraph().getJobID()); + + // 2. attach newly added job vertices + getExecutionGraph() + .addNewJobVertices(newVertices, jobManagerJobMetricGroup, vertexParallelismStore); + + // 3. update logical topology + logicalTopology = DefaultLogicalTopology.fromJobGraph(getJobGraph()); + + // 4. 
update json plan + getExecutionGraph().setJsonPlan(JsonPlanGenerator.generatePlan(getJobGraph())); + } + @Override protected void startSchedulingInternal() { speculativeExecutionHandler.init( @@ -356,6 +391,8 @@ protected void onTaskFinished(final Execution execution, final IOMetrics ioMetri checkNotNull(ioMetrics); updateResultPartitionBytesMetrics(ioMetrics.getResultPartitionBytes()); + notifyJobVertexFinishedIfPossible(execution.getVertex().getJobVertex()); + ExecutionVertexVersion currentVersion = executionVertexVersioner.getExecutionVertexVersion(execution.getVertex().getID()); tryComputeSourceParallelismThenRunAsync( @@ -568,6 +605,17 @@ static CompletableFuture mergeDynamicParallelismFutures( (a, b) -> a.thenCombine(b, Math::max)); } + private void notifyJobVertexFinishedIfPossible(ExecutionJobVertex jobVertex) { + Optional> producedResultsInfo = + getProducedResultsInfo(jobVertex); + + producedResultsInfo.ifPresent( + resultInfo -> + adaptiveExecutionHandler.handleJobEvent( + new ExecutionJobVertexFinishedEvent( + jobVertex.getJobVertexId(), resultInfo))); + } + @VisibleForTesting public void initializeVerticesIfPossible() { final List newlyInitializedJobVertices = new ArrayList<>(); @@ -626,8 +674,8 @@ public void initializeVerticesIfPossible() { private ParallelismAndInputInfos tryDecideParallelismAndInputInfos( final ExecutionJobVertex jobVertex, List inputs) { int vertexInitialParallelism = jobVertex.getParallelism(); - JobVertexForwardGroup forwardGroup = - forwardGroupsByJobVertexId.get(jobVertex.getJobVertexId()); + ForwardGroup forwardGroup = + adaptiveExecutionHandler.getForwardGroupByJobVertexId(jobVertex.getJobVertexId()); if (!jobVertex.isParallelismDecided() && forwardGroup != null) { checkState(!forwardGroup.isParallelismDecided()); } @@ -666,39 +714,38 @@ private ParallelismAndInputInfos tryDecideParallelismAndInputInfos( checkState(parallelismAndInputInfos.getParallelism() == vertexInitialParallelism); } - if (forwardGroup != null && !forwardGroup.isParallelismDecided()) { - forwardGroup.setParallelism(parallelismAndInputInfos.getParallelism()); - - // When the parallelism for a forward group is determined, we ensure that the - // parallelism for all job vertices within that group is also set. - // This approach ensures that each forward edge produces single subpartition. - // - // This setting is crucial because the Sink V2 committer relies on the interplay - // between the CommittableSummary and the CommittableWithLineage, which are sent by - // the upstream Sink V2 Writer. The committer expects to receive CommittableSummary - // before CommittableWithLineage. - // - // If the number of subpartitions produced by a forward edge is greater than one, - // the ordering of these elements received by the committer cannot be assured, which - // would break the assumption that CommittableSummary is received before - // CommittableWithLineage. 
- for (JobVertexID jobVertexId : forwardGroup.getJobVertexIds()) { - ExecutionJobVertex executionJobVertex = getExecutionJobVertex(jobVertexId); - if (!executionJobVertex.isParallelismDecided()) { - log.info( - "Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.", - executionJobVertex.getName(), - executionJobVertex.getJobVertexId(), - parallelismAndInputInfos.getParallelism()); - changeJobVertexParallelism( - executionJobVertex, parallelismAndInputInfos.getParallelism()); - } else { - checkState( - parallelismAndInputInfos.getParallelism() - == executionJobVertex.getParallelism()); - } - } - } + // When the parallelism for a forward group is determined, we ensure that the + // parallelism for all job vertices or stream nodes within that group is also set. + // This approach ensures that each forward edge produces single subpartition. + // + // This setting is crucial because the Sink V2 committer relies on the interplay + // between the CommittableSummary and the CommittableWithLineage, which are sent by + // the upstream Sink V2 Writer. The committer expects to receive CommittableSummary + // before CommittableWithLineage. + // + // If the number of subpartitions produced by a forward edge is greater than one, + // the ordering of these elements received by the committer cannot be assured, which + // would break the assumption that CommittableSummary is received before + // CommittableWithLineage. + adaptiveExecutionHandler.updateForwardGroupParallelism( + jobVertex.getJobVertexId(), + parallelismAndInputInfos.getParallelism(), + (jobVertexId, newParallelism) -> { + ExecutionJobVertex executionJobVertex = getExecutionJobVertex(jobVertexId); + if (!executionJobVertex.isParallelismDecided()) { + log.info( + "Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.", + executionJobVertex.getName(), + executionJobVertex.getJobVertexId(), + parallelismAndInputInfos.getParallelism()); + changeJobVertexParallelism( + executionJobVertex, parallelismAndInputInfos.getParallelism()); + } else { + checkState( + parallelismAndInputInfos.getParallelism() + == executionJobVertex.getParallelism()); + } + }); return parallelismAndInputInfos; } @@ -804,6 +851,26 @@ private Optional> tryGetConsumedResultsInfo( return Optional.of(consumableResultInfo); } + private Optional> getProducedResultsInfo( + final ExecutionJobVertex jobVertex) { + if (!jobVertex.isFinished()) { + return Optional.empty(); + } + + Map producedResultInfo = new HashMap<>(); + + DefaultLogicalVertex logicalVertex = logicalTopology.getVertex(jobVertex.getJobVertexId()); + Iterable producedResults = logicalVertex.getProducedResults(); + + for (DefaultLogicalResult producedResult : producedResults) { + BlockingResultInfo resultInfo = + checkNotNull(blockingResultInfos.get(producedResult.getId())); + producedResultInfo.put(producedResult.getId(), resultInfo); + } + + return Optional.of(producedResultInfo); + } + private boolean canInitialize(final ExecutionJobVertex jobVertex) { if (jobVertex.isInitialized() || !jobVertex.isParallelismDecided()) { return false; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java index 91a578ea995697..f1ef35f9d01f63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java @@ -44,9 +44,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil; -import org.apache.flink.runtime.jobgraph.forwardgroup.JobVertexForwardGroup; import org.apache.flink.runtime.jobmaster.ExecutionDeploymentTracker; import org.apache.flink.runtime.jobmaster.event.FileSystemJobEventStore; import org.apache.flink.runtime.jobmaster.event.JobEventManager; @@ -75,7 +72,10 @@ import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.util.SlotSelectionStrategyUtils; import org.apache.flink.streaming.api.graph.ExecutionPlan; +import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.ScheduledExecutor; @@ -86,7 +86,6 @@ import java.time.Duration; import java.util.Collection; -import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -123,12 +122,13 @@ public SchedulerNG createInstance( Collection failureEnrichers, BlocklistOperations blocklistOperations) throws Exception { - JobGraph jobGraph; + ExecutionConfig executionConfig; if (executionPlan instanceof JobGraph) { - jobGraph = (JobGraph) executionPlan; + executionConfig = + executionPlan.getSerializedExecutionConfig().deserializeValue(userCodeLoader); } else if (executionPlan instanceof StreamGraph) { - jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader); + executionConfig = ((StreamGraph) executionPlan).getExecutionConfig(); } else { throw new FlinkException( "Unsupported execution plan " + executionPlan.getClass().getCanonicalName()); @@ -145,20 +145,17 @@ public SchedulerNG createInstance( final ExecutionSlotAllocatorFactory allocatorFactory = createExecutionSlotAllocatorFactory(jobMasterConfiguration, slotPool); - ExecutionConfig executionConfig = - jobGraph.getSerializedExecutionConfig().deserializeValue(userCodeLoader); - final RestartBackoffTimeStrategy restartBackoffTimeStrategy = RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory( - jobGraph.getJobConfiguration(), + executionPlan.getJobConfiguration(), jobMasterConfiguration, - jobGraph.isCheckpointingEnabled()) + executionPlan.isCheckpointingEnabled()) .create(); log.info( "Using restart back off time strategy {} for {} ({}).", restartBackoffTimeStrategy, - jobGraph.getName(), - jobGraph.getJobID()); + executionPlan.getName(), + executionPlan.getJobID()); final boolean isJobRecoveryEnabled = jobMasterConfiguration.get(BatchExecutionOptions.JOB_RECOVERY_ENABLED) @@ -167,7 +164,7 @@ public SchedulerNG createInstance( BatchJobRecoveryHandler jobRecoveryHandler; if (isJobRecoveryEnabled) { FileSystemJobEventStore jobEventStore = - new FileSystemJobEventStore(jobGraph.getJobID(), jobMasterConfiguration); + new FileSystemJobEventStore(executionPlan.getJobID(), jobMasterConfiguration); JobEventManager jobEventManager = new JobEventManager(jobEventStore); jobRecoveryHandler = new 
DefaultBatchJobRecoveryHandler(jobEventManager, jobMasterConfiguration); @@ -177,7 +174,7 @@ public SchedulerNG createInstance( return createScheduler( log, - jobGraph, + executionPlan, executionConfig, ioExecutor, jobMasterConfiguration, @@ -208,7 +205,7 @@ public SchedulerNG createInstance( @VisibleForTesting public static AdaptiveBatchScheduler createScheduler( Logger log, - JobGraph jobGraph, + ExecutionPlan executionPlan, ExecutionConfig executionConfig, Executor ioExecutor, Configuration jobMasterConfiguration, @@ -235,9 +232,9 @@ public static AdaptiveBatchScheduler createScheduler( throws Exception { checkState( - jobGraph.getJobType() == JobType.BATCH, + executionPlan.getJobType() == JobType.BATCH, "Adaptive batch scheduler only supports batch jobs"); - checkAllExchangesAreSupported(jobGraph); + checkAllExchangesAreSupported(executionPlan); final boolean enableSpeculativeExecution = jobMasterConfiguration.get(BatchExecutionOptions.SPECULATIVE_ENABLED); @@ -269,13 +266,13 @@ public static AdaptiveBatchScheduler createScheduler( int defaultMaxParallelism = getDefaultMaxParallelism(jobMasterConfiguration, executionConfig); - final Map forwardGroupsByJobVertexId = - ForwardGroupComputeUtil.computeForwardGroupsAndCheckParallelism( - jobGraph.getVerticesSortedTopologicallyFromSources()); + AdaptiveExecutionHandler adaptiveExecutionHandler = + AdaptiveExecutionHandlerFactory.create( + executionPlan, userCodeLoader, futureExecutor); return new AdaptiveBatchScheduler( log, - jobGraph, + adaptiveExecutionHandler, ioExecutor, jobMasterConfiguration, componentMainThreadExecutor -> {}, @@ -301,8 +298,8 @@ public static AdaptiveBatchScheduler createScheduler( defaultMaxParallelism, blocklistOperations, hybridPartitionDataConsumeConstraint, - forwardGroupsByJobVertexId, - jobRecoveryHandler); + jobRecoveryHandler, + adaptiveExecutionHandler.createStreamGraphSchedulingContext(defaultMaxParallelism)); } public static InputConsumableDecider.Factory loadInputConsumableDeciderFactory( @@ -368,20 +365,31 @@ private static ExecutionJobVertex.Factory createExecutionJobVertexFactory( } } - private static void checkAllExchangesAreSupported(final JobGraph jobGraph) { - for (JobVertex jobVertex : jobGraph.getVertices()) { - for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) { - checkState( - dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition() - || dataSet.getResultType().isHybridResultPartition(), - String.format( - "At the moment, adaptive batch scheduler requires batch workloads " - + "to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. " - + "To do that, you need to configure '%s' to '%s' or '%s/%s'. ", - ExecutionOptions.BATCH_SHUFFLE_MODE.key(), - BatchShuffleMode.ALL_EXCHANGES_BLOCKING, - BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, - BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE)); + private static void checkAllExchangesAreSupported(final ExecutionPlan executionPlan) { + String errMsg = + String.format( + "At the moment, adaptive batch scheduler requires batch workloads " + + "to be executed with types of all edges being BLOCKING or HYBRID_FULL/HYBRID_SELECTIVE. " + + "To do that, you need to configure '%s' to '%s' or '%s/%s'. 
", + ExecutionOptions.BATCH_SHUFFLE_MODE.key(), + BatchShuffleMode.ALL_EXCHANGES_BLOCKING, + BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, + BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE); + if (executionPlan instanceof JobGraph) { + for (JobVertex jobVertex : ((JobGraph) executionPlan).getVertices()) { + for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) { + checkState( + dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition() + || dataSet.getResultType().isHybridResultPartition(), + errMsg); + } + } + } else { + for (StreamNode streamNode : ((StreamGraph) executionPlan).getStreamNodes()) { + for (StreamEdge edge : streamNode.getOutEdges()) { + checkState( + !edge.getExchangeMode().equals(StreamExchangeMode.PIPELINED), errMsg); + } } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyAdaptiveExecutionHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyAdaptiveExecutionHandler.java index d687d9427ca2cd..0e0feb4c12e001 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyAdaptiveExecutionHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyAdaptiveExecutionHandler.java @@ -25,6 +25,9 @@ import org.apache.flink.runtime.jobgraph.forwardgroup.JobVertexForwardGroup; import org.apache.flink.runtime.jobmaster.event.JobEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.Map; import java.util.function.BiConsumer; @@ -33,6 +36,7 @@ /** A dummy implementation of {@link AdaptiveExecutionHandler}. */ public class DummyAdaptiveExecutionHandler implements AdaptiveExecutionHandler { + private final Logger log = LoggerFactory.getLogger(getClass()); private final JobGraph jobGraph; private final Map forwardGroupsByJobVertexId; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java index 5dd02043b80b0f..4ae63c3d3b8e47 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartitionTest.java @@ -22,11 +22,14 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobEdge; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.VertexParallelismStore; @@ -34,6 +37,11 @@ import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import 
org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; +import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; @@ -41,11 +49,13 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; +import static org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link IntermediateResultPartition}. */ @@ -158,6 +168,84 @@ void testReleasePartitionGroups() throws Exception { assertThat(partition1.canBeReleased()).isFalse(); } + @Test + void testReleasePartitionGroupsForDynamicGroup() throws Exception { + JobVertex source = new JobVertex("source"); + source.setParallelism(1); + source.setInvokableClass(NoOpInvokable.class); + + JobVertex sink1 = new JobVertex("sink1"); + sink1.setParallelism(1); + sink1.setInvokableClass(NoOpInvokable.class); + + JobVertex sink2 = new JobVertex("sink2"); + sink2.setParallelism(1); + sink2.setInvokableClass(NoOpInvokable.class); + + // At first, create job graph with topology: source -> sink1 + IntermediateDataSetID dataSetId = new IntermediateDataSetID(); + sink1.connectNewDataSetAsInput( + source, + DistributionPattern.ALL_TO_ALL, + ResultPartitionType.BLOCKING, + dataSetId, + false); + JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink1); + jobGraph.setDynamic(true); + + SchedulerBase scheduler = + new DefaultSchedulerBuilder( + jobGraph, + ComponentMainThreadExecutorServiceAdapter.forMainThread(), + new DirectScheduledExecutorService()) + .buildAdaptiveBatchJobScheduler(); + + IntermediateDataSet dataSet = source.getProducedDataSets().get(0); + ExecutionJobVertex sourceVertex = scheduler.getExecutionJobVertex(source.getID()); + ExecutionJobVertex sinkVertex1 = scheduler.getExecutionJobVertex(sink1.getID()); + scheduler.getExecutionGraph().initializeJobVertex(sourceVertex, 1L); + scheduler.getExecutionGraph().initializeJobVertex(sinkVertex1, 1L); + + // add two stream edges + StreamNode sourceVertexDummy = + new StreamNode( + 0, null, null, (StreamOperator) null, null, SourceStreamTask.class); + StreamNode targetVertexDummy = + new StreamNode( + 0, null, null, (StreamOperator) null, null, SourceStreamTask.class); + dataSet.addOutputStreamEdge( + new StreamEdge( + sourceVertexDummy, targetVertexDummy, 0, new ShufflePartitioner<>(), null)); + dataSet.addOutputStreamEdge( + new StreamEdge( + sourceVertexDummy, targetVertexDummy, 0, new ShufflePartitioner<>(), null)); + + // mark partition can be released + IntermediateResultPartition partition = + sourceVertex.getProducedDataSets()[0].getPartitions()[0]; + assertThat(partition.canBeReleased()).isFalse(); + List consumedPartitionGroup = + partition.getConsumedPartitionGroups(); + partition.markPartitionGroupReleasable(consumedPartitionGroup.get(0)); + + // because not all job vertices are created, partition can not be released + assertThat(partition.canBeReleased()).isFalse(); + + // add a new job vertex + dataSet.addConsumer(new JobEdge(dataSet, sink2, 
DistributionPattern.ALL_TO_ALL, false)); + scheduler + .getExecutionGraph() + .addNewJobVertices( + Collections.singletonList(sink2), + UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(), + computeVertexParallelismStoreForDynamicGraph( + Collections.singletonList(sink2), 1)); + ExecutionJobVertex sinkVertex2 = scheduler.getExecutionJobVertex(sink2.getID()); + scheduler.getExecutionGraph().initializeJobVertex(sinkVertex2, 1L); + + assertThat(partition.canBeReleased()).isTrue(); + } + @Test void testGetNumberOfSubpartitionsForNonDynamicAllToAllGraph() throws Exception { testGetNumberOfSubpartitions(7, DistributionPattern.ALL_TO_ALL, false, Arrays.asList(7, 7)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java index 0280410892d5c9..9b29edc2190bf1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java @@ -41,6 +41,8 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.VertexParallelismStore; +import org.apache.flink.runtime.scheduler.adaptivebatch.DummyStreamGraphSchedulingContext; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.shuffle.ShuffleTestUtils; @@ -86,6 +88,8 @@ public static TestingDefaultExecutionGraphBuilder newBuilder() { checkpointStatsTrackerFactory = metricGroup -> NoOpCheckpointStatsTracker.INSTANCE; private boolean nonFinishedHybridPartitionShouldBeUnknown = false; + private StreamGraphSchedulingContext streamGraphSchedulingContext = + DummyStreamGraphSchedulingContext.INSTANCE; private TestingDefaultExecutionGraphBuilder() {} @@ -180,6 +184,12 @@ public TestingDefaultExecutionGraphBuilder setCheckpointStatsTracker( return this; } + public TestingDefaultExecutionGraphBuilder setStreamGraphSchedulingContext( + StreamGraphSchedulingContext streamGraphSchedulingContext) { + this.streamGraphSchedulingContext = streamGraphSchedulingContext; + return this; + } + private DefaultExecutionGraph build( boolean isDynamicGraph, ScheduledExecutorService executorService) throws JobException, JobExecutionException { @@ -212,7 +222,8 @@ private DefaultExecutionGraph build( executionJobVertexFactory, markPartitionFinishedStrategy, nonFinishedHybridPartitionShouldBeUnknown, - metricGroup); + metricGroup, + streamGraphSchedulingContext); } public DefaultExecutionGraph build(ScheduledExecutorService executorService) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java index 0ce37609d97b1f..7c8db0452a40fa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java @@ -39,12 +39,14 @@ import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; import 
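// Editor's sketch (not part of the patch): DummyStreamGraphSchedulingContext.INSTANCE is wired
// into the test builders above, but its body is not included in this diff. Based on the
// StreamGraphSchedulingContext methods exercised later in DefaultExecutionTopologyTest
// (getParallelism, getMaxParallelismOrDefault, getPendingOperatorCount), a no-op implementation
// could plausibly look like the following; the class name and the chosen defaults are
// assumptions, not the actual code.
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext;

public enum NoOpStreamGraphSchedulingContext implements StreamGraphSchedulingContext {
    INSTANCE;

    @Override
    public int getParallelism(int streamNodeId) {
        // "not decided yet" - leave the decision to the vertex parallelism store
        return ExecutionConfig.PARALLELISM_DEFAULT;
    }

    @Override
    public int getMaxParallelismOrDefault(int streamNodeId) {
        return ExecutionConfig.PARALLELISM_DEFAULT;
    }

    @Override
    public int getPendingOperatorCount() {
        // no lazily-added operators: the job topology is complete from the start
        return 0;
    }
}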
org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker; import org.apache.flink.runtime.jobmaster.TestUtils; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.scheduler.adaptivebatch.DummyStreamGraphSchedulingContext; import org.apache.flink.runtime.shuffle.ShuffleTestUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.testutils.TestingUtils; @@ -67,7 +69,9 @@ import java.io.IOException; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; @@ -116,6 +120,7 @@ void testRestoringModifiedJobFromSavepointFails() throws Exception { jobGraphWithNewOperator), (execution, previousState, newState) -> {}, rp -> false, + DummyStreamGraphSchedulingContext.INSTANCE, log)) .withFailMessage( "Expected ExecutionGraph creation to fail because of non restored state.") @@ -145,6 +150,7 @@ void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSucceeds() th SchedulerBase.computeVertexParallelismStore(jobGraphWithNewOperator), (execution, previousState, newState) -> {}, rp -> false, + DummyStreamGraphSchedulingContext.INSTANCE, log); final CompletedCheckpoint savepoint = completedCheckpointStore.getLatestCheckpoint(); @@ -183,11 +189,23 @@ public void addSpan(SpanBuilder spanBuilder) { TaskDeploymentDescriptorFactory.PartitionLocationConstraint.CAN_BE_UNKNOWN, 0L, new DefaultVertexAttemptNumberStore(), - vertexId -> - new DefaultVertexParallelismInfo( - 1, 1337, integer -> Optional.empty()), + new VertexParallelismStore() { + @Override + public VertexParallelismInformation getParallelismInfo( + JobVertexID vertexId) { + return new DefaultVertexParallelismInfo( + 1, 1337, integer -> Optional.empty()); + } + + @Override + public Map + getAllParallelismInfo() { + return Collections.emptyMap(); + } + }, (execution, previousState, newState) -> {}, rp -> false, + DummyStreamGraphSchedulingContext.INSTANCE, log); checkpointStatsTracker.reportRestoredCheckpoint( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java index d648d6e5570e9c..43b3949dc1cd79 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultSchedulerBuilder.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.scheduler.adaptivebatch.BatchJobRecoveryHandler; import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo; import org.apache.flink.runtime.scheduler.adaptivebatch.DummyBatchJobRecoveryHandler; +import org.apache.flink.runtime.scheduler.adaptivebatch.DummyStreamGraphSchedulingContext; import org.apache.flink.runtime.scheduler.adaptivebatch.VertexParallelismAndInputInfosDecider; import org.apache.flink.runtime.scheduler.strategy.AllFinishedInputConsumableDecider; import org.apache.flink.runtime.scheduler.strategy.InputConsumableDecider; @@ -326,7 +327,8 @@ public 
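// Editor's sketch (not part of the patch): the lambda-based VertexParallelismStore in the test
// above had to become an anonymous class because the interface now also exposes
// getAllParallelismInfo(). A map-backed implementation of the two methods could look like this;
// the class name, generic types, and exception choice are assumptions inferred from how the
// store is used elsewhere in this diff.
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class MapBackedVertexParallelismStore implements VertexParallelismStore {

    private final Map<JobVertexID, VertexParallelismInformation> infos = new HashMap<>();

    public void setParallelismInfo(JobVertexID vertexId, VertexParallelismInformation info) {
        infos.put(vertexId, info);
    }

    @Override
    public VertexParallelismInformation getParallelismInfo(JobVertexID vertexId) {
        VertexParallelismInformation info = infos.get(vertexId);
        if (info == null) {
            throw new IllegalStateException(
                    "No parallelism information set for vertex " + vertexId);
        }
        return info;
    }

    @Override
    public Map<JobVertexID, VertexParallelismInformation> getAllParallelismInfo() {
        return Collections.unmodifiableMap(infos);
    }
}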
DefaultScheduler build() throws Exception { shuffleMaster, rpcTimeout, computeVertexParallelismStore(jobGraph), - executionDeployerFactory); + executionDeployerFactory, + DummyStreamGraphSchedulingContext.INSTANCE); } public AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler() throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java index eb109b8300b36f..9134657fce692f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismStoreTest.java @@ -49,6 +49,42 @@ void testSetInfo() { assertThat(storedInfo).isEqualTo(info); } + @Test + void testGetAllInfos() { + JobVertexID id = new JobVertexID(); + JobVertexID id2 = new JobVertexID(); + VertexParallelismInformation info = new MockVertexParallelismInfo(); + VertexParallelismInformation info2 = new MockVertexParallelismInfo(); + DefaultVertexParallelismStore store = new DefaultVertexParallelismStore(); + + store.setParallelismInfo(id, info); + store.setParallelismInfo(id2, info2); + + assertThat(store.getParallelismInfo(id)).isEqualTo(info); + assertThat(store.getParallelismInfo(id2)).isEqualTo(info2); + } + + @Test + void testMergeParallelismStore() { + JobVertexID id = new JobVertexID(); + JobVertexID id2 = new JobVertexID(); + VertexParallelismInformation info = new MockVertexParallelismInfo(); + VertexParallelismInformation info2 = new MockVertexParallelismInfo(); + DefaultVertexParallelismStore store = new DefaultVertexParallelismStore(); + DefaultVertexParallelismStore store2 = new DefaultVertexParallelismStore(); + store.setParallelismInfo(id, info); + store2.setParallelismInfo(id2, info2); + + assertThat(store.getParallelismInfo(id)).isEqualTo(info); + assertThatThrownBy(() -> store.getParallelismInfo(id2)) + .isInstanceOf(IllegalStateException.class); + + store.mergeParallelismStore(store2); + + assertThat(store.getParallelismInfo(id)).isEqualTo(info); + assertThat(store.getParallelismInfo(id2)).isEqualTo(info2); + } + private static final class MockVertexParallelismInfo implements VertexParallelismInformation { @Override public int getMinParallelism() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java index b054913b51a4b9..faabd723a7c0b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java @@ -26,16 +26,25 @@ import org.apache.flink.runtime.executiongraph.IntermediateResultPartition; import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import 
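// Editor's sketch (not part of the patch): testMergeParallelismStore above pins down the
// observable contract of mergeParallelismStore - after merging, the other store's entries become
// retrievable from this one while existing entries stay intact. Expressed in terms of the
// interface methods used in this diff, a method body inside DefaultVertexParallelismStore could
// plausibly reduce to the one-liner below; the real implementation may differ. (Note that
// testGetAllInfos, despite its name, only asserts on getParallelismInfo; getAllParallelismInfo
// is covered indirectly through the merge test.)
public void mergeParallelismStore(VertexParallelismStore otherStore) {
    otherStore.getAllParallelismInfo().forEach(this::setParallelismInfo);
}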
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphSchedulingContext; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.runtime.scheduler.strategy.ResultPartitionState; import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion; +import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; +import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.IterableUtils; @@ -47,6 +56,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Iterator; @@ -60,6 +70,7 @@ import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.BLOCKING; import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED; import static org.apache.flink.runtime.jobgraph.DistributionPattern.ALL_TO_ALL; +import static org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.computeVertexParallelismStoreForDynamicGraph; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -169,7 +180,7 @@ void testErrorIfCoLocatedTasksAreNotInSameRegion() { } @Test - void testUpdateTopology() throws Exception { + void testUpdateTopologyWithInitializedJobVertices() throws Exception { final JobVertex[] jobVertices = createJobVertices(BLOCKING); executionGraph = createDynamicGraph(jobVertices); adapter = DefaultExecutionTopology.fromExecutionGraph(executionGraph); @@ -178,16 +189,78 @@ void testUpdateTopology() throws Exception { final ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertices[1].getID()); executionGraph.initializeJobVertex(ejv1, 0L); - adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv1)); + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, Collections.singletonList(ejv1)); assertThat(adapter.getVertices()).hasSize(3); executionGraph.initializeJobVertex(ejv2, 0L); - adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv2)); + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, Collections.singletonList(ejv2)); assertThat(adapter.getVertices()).hasSize(6); assertGraphEquals(executionGraph, adapter); } + @Test + void testUpdateTopologyWithNewlyAddedJobVertices() throws Exception { + final int parallelism = 3; + JobVertex jobVertex1 = createNoOpVertex(parallelism); + IntermediateDataSetID dataSetId = new IntermediateDataSetID(); + + StreamNode sourceVertexDummy = + new StreamNode( + 0, null, null, (StreamOperator) null, null, SourceStreamTask.class); + StreamNode targetVertexDummy = + new StreamNode(0, null, null, (StreamOperator) null, null, StreamTask.class); + jobVertex1 + .getOrCreateResultDataSet(dataSetId, BLOCKING) + .addOutputStreamEdge( + new 
StreamEdge( + sourceVertexDummy, + targetVertexDummy, + 0, + new ShufflePartitioner(), + null)); + + executionGraph = + createDynamicGraph( + new TestingStreamGraphSchedulingContext(parallelism, parallelism), + jobVertex1); + adapter = DefaultExecutionTopology.fromExecutionGraph(executionGraph); + + // Operating execution graph: 1. Initialize job vertex1 + assertThat(executionGraph.getAllVertices()).hasSize(1); + final ExecutionJobVertex ejv1 = executionGraph.getJobVertex(jobVertex1.getID()); + executionGraph.initializeJobVertex(ejv1, 0L); + + // Operating execution graph: 2. Add job vertex2 + JobVertex jobVertex2 = createNoOpVertex(parallelism); + jobVertex2.connectNewDataSetAsInput(jobVertex1, ALL_TO_ALL, BLOCKING, dataSetId, false); + + // Operating execution graph: 3. Initialize job vertex2 + List newJobVertices = Collections.singletonList(jobVertex2); + executionGraph.addNewJobVertices( + newJobVertices, + UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup(), + computeVertexParallelismStoreForDynamicGraph(newJobVertices, parallelism)); + final ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertex2.getID()); + executionGraph.initializeJobVertex(ejv2, 0L); + + // Operating execution topology graph: 1. notify job vertex1 initialized + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, Collections.singletonList(ejv1)); + + // Operating execution topology graph: 2. notify job vertex2 added + adapter.notifyExecutionGraphUpdatedWithNewlyJobVertices( + Arrays.asList(jobVertex1, jobVertex2)); + + // Operating execution topology graph: 3. notify job vertex2 initialized + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, Collections.singletonList(ejv2)); + + assertGraphEquals(executionGraph, adapter); + } + @Test void testErrorIfUpdateTopologyWithNewVertexPipelinedConnectedToOldOnes() throws Exception { final JobVertex[] jobVertices = createJobVertices(PIPELINED); @@ -198,12 +271,13 @@ void testErrorIfUpdateTopologyWithNewVertexPipelinedConnectedToOldOnes() throws final ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertices[1].getID()); executionGraph.initializeJobVertex(ejv1, 0L); - adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv1)); + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, Collections.singletonList(ejv1)); executionGraph.initializeJobVertex(ejv2, 0L); assertThatThrownBy( () -> - adapter.notifyExecutionGraphUpdated( + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( executionGraph, Collections.singletonList(ejv2))) .isInstanceOf(IllegalStateException.class); } @@ -218,12 +292,14 @@ void testExistingRegionsAreNotAffectedDuringTopologyUpdate() throws Exception { final ExecutionJobVertex ejv2 = executionGraph.getJobVertex(jobVertices[1].getID()); executionGraph.initializeJobVertex(ejv1, 0L); - adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv1)); + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, Collections.singletonList(ejv1)); SchedulingPipelinedRegion regionOld = adapter.getPipelinedRegionOfVertex(new ExecutionVertexID(ejv1.getJobVertexId(), 0)); executionGraph.initializeJobVertex(ejv2, 0L); - adapter.notifyExecutionGraphUpdated(executionGraph, Collections.singletonList(ejv2)); + adapter.notifyExecutionGraphUpdatedWithInitializedJobVertices( + executionGraph, Collections.singletonList(ejv2)); SchedulingPipelinedRegion regionNew = 
adapter.getPipelinedRegionOfVertex(new ExecutionVertexID(ejv1.getJobVertexId(), 0)); @@ -246,6 +322,15 @@ private DefaultExecutionGraph createDynamicGraph(JobVertex... jobVertices) throw .buildDynamicGraph(EXECUTOR_RESOURCE.getExecutor()); } + private DefaultExecutionGraph createDynamicGraph( + StreamGraphSchedulingContext streamGraphSchedulingContext, JobVertex... jobVertices) + throws Exception { + return TestingDefaultExecutionGraphBuilder.newBuilder() + .setJobGraph(new JobGraph(new JobID(), "TestJob", jobVertices)) + .setStreamGraphSchedulingContext(streamGraphSchedulingContext) + .buildDynamicGraph(EXECUTOR_RESOURCE.getExecutor()); + } + private void assertRegionContainsAllVertices( final DefaultSchedulingPipelinedRegion pipelinedRegionOfVertex) { final Set allVertices = @@ -349,4 +434,31 @@ private static void assertPartitionEquals( assertThat(adaptedPartition.getProducer().getId()) .isEqualTo(originalPartition.getProducer().getID()); } + + private static class TestingStreamGraphSchedulingContext + implements StreamGraphSchedulingContext { + + private final int parallelism; + private final int maxParallelism; + + private TestingStreamGraphSchedulingContext(int parallelism, int maxParallelism) { + this.parallelism = parallelism; + this.maxParallelism = maxParallelism; + } + + @Override + public int getParallelism(int streamNodeId) { + return parallelism; + } + + @Override + public int getMaxParallelismOrDefault(int streamNodeId) { + return maxParallelism; + } + + @Override + public int getPendingOperatorCount() { + return 0; + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java index 544595ee2cc56a..ce49ff5bded0e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java @@ -57,6 +57,7 @@ import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.scheduler.InternalFailuresListener; +import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.exceptionhistory.TestingAccessExecution; import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology; @@ -404,6 +405,15 @@ public void notifyNewlyInitializedJobVertices(List vertices) throw new UnsupportedOperationException(); } + @Override + public void addNewJobVertices( + List sortedJobVertices, + JobManagerJobMetricGroup jobManagerJobMetricGroup, + VertexParallelismStore newVerticesParallelismStore) + throws JobException { + throw new UnsupportedOperationException(); + } + public void registerExecution(TestingAccessExecution execution) { executions.put(execution.getAttemptId(), execution); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContextTest.java index c67877783be304..30801a9b0f9866 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContextTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContextTest.java
@@ -133,7 +133,7 @@ private static StreamGraph getStreamGraph(
             int sourceMaxParallelism,
             int sinkParallelism,
             int sinkMaxParallelism) {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamExecutionEnvironment env = new StreamExecutionEnvironment();
         env.fromSequence(0L, 1L).disableChaining().print();
         StreamGraph streamGraph = env.getStreamGraph();
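// Editor's note (not part of the patch): replacing getExecutionEnvironment() with the plain
// constructor presumably isolates this helper from any context environment factory or external
// configuration, so the produced StreamGraph is fully determined by the test itself - that
// rationale is an inference, not stated in the patch. The rest of the helper is not shown in
// this hunk; for illustration, per-node settings can be read back from the built StreamGraph
// with standard StreamGraph/StreamNode accessors (getStreamNodes is also used elsewhere in
// this diff), e.g.:
for (StreamNode node : streamGraph.getStreamNodes()) {
    System.out.printf(
            "stream node %d -> parallelism %d, max parallelism %d%n",
            node.getId(), node.getParallelism(), node.getMaxParallelism());
}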