[FLINK-36068][runtime] Introduce AdaptiveExecutionHandler for handling the adaptive execution of batch jobs.

JunRuiLee committed Nov 16, 2024
1 parent d017410 commit 00c1992
Showing 16 changed files with 996 additions and 9 deletions.
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.jobmaster.event;

import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;

import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** This class is used to record the completion info of {@link ExecutionJobVertex}. */
public class ExecutionJobVertexFinishedEvent implements JobEvent {

private final JobVertexID vertexId;

private final Map<IntermediateDataSetID, BlockingResultInfo> resultInfo;

public ExecutionJobVertexFinishedEvent(
JobVertexID vertexId, Map<IntermediateDataSetID, BlockingResultInfo> resultInfo) {
this.vertexId = checkNotNull(vertexId);
this.resultInfo = checkNotNull(resultInfo);
}

public JobVertexID getVertexId() {
return vertexId;
}

public Map<IntermediateDataSetID, BlockingResultInfo> getResultInfo() {
return resultInfo;
}

@Override
public String toString() {
return "ExecutionJobVertexFinishedEvent{"
+ "vertexId="
+ vertexId
+ ", resultInfos="
+ resultInfo
+ '}';
}
}
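
For orientation, a minimal construction sketch of the event above (not part of the commit): the freshly generated JobVertexID and the empty result map are placeholders; in practice the scheduler supplies one BlockingResultInfo per blocking result the finished vertex produced.

import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;

import java.util.Collections;
import java.util.Map;

final class FinishedEventSketch {
    static ExecutionJobVertexFinishedEvent finishedWithoutResults() {
        // Placeholder inputs; a real event records the finished vertex and its blocking results.
        Map<IntermediateDataSetID, BlockingResultInfo> resultInfo = Collections.emptyMap();
        return new ExecutionJobVertexFinishedEvent(new JobVertexID(), resultInfo);
    }
}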
@@ -33,6 +33,7 @@ public class JobEvents {
         jobEventTypeIdMapping.put(ExecutionVertexFinishedEvent.class, 1);
         jobEventTypeIdMapping.put(ExecutionVertexResetEvent.class, 2);
         jobEventTypeIdMapping.put(ExecutionJobVertexInitializedEvent.class, 3);
+        jobEventTypeIdMapping.put(ExecutionJobVertexFinishedEvent.class, 4);
     }
 
     public static int getTypeID(Class<? extends JobEvent> clazz) {
@@ -299,8 +299,12 @@ private static int normalizeParallelism(int parallelism) {
      * @return the computed max parallelism
      */
     public static int getDefaultMaxParallelism(JobVertex vertex) {
+        return getDefaultMaxParallelism(vertex.getParallelism());
+    }
+
+    public static int getDefaultMaxParallelism(int parallelism) {
         return KeyGroupRangeAssignment.computeDefaultMaxParallelism(
-                normalizeParallelism(vertex.getParallelism()));
+                normalizeParallelism(parallelism));
     }
 
     public static VertexParallelismStore computeVertexParallelismStore(
@@ -858,16 +858,18 @@ public static VertexParallelismStore computeVertexParallelismStoreForDynamicGraph(
             // global default max parallelism.
             return computeVertexParallelismStore(
                     vertices,
-                    v -> {
-                        if (v.getParallelism() > 0) {
-                            return getDefaultMaxParallelism(v);
-                        } else {
-                            return defaultMaxParallelism;
-                        }
-                    },
+                    v -> computeMaxParallelism(v.getParallelism(), defaultMaxParallelism),
                     Function.identity());
     }
 
+    public static int computeMaxParallelism(int parallelism, int defaultMaxParallelism) {
+        if (parallelism > 0) {
+            return getDefaultMaxParallelism(parallelism);
+        } else {
+            return defaultMaxParallelism;
+        }
+    }
+
     private static void resetDynamicParallelism(Iterable<JobVertex> vertices) {
         for (JobVertex vertex : vertices) {
             if (vertex.isDynamicParallelism()) {
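
To make the refactor above concrete, here is a small self-contained sketch of the same fallback logic; the enclosing scheduler class is not shown in this diff, so the method is restated locally (normalizeParallelism is omitted for brevity), and the printed value assumes Flink's usual 128 lower bound for the derived default max parallelism.

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

final class MaxParallelismSketch {
    // Restates computeMaxParallelism(...) from the hunk above, minus normalizeParallelism.
    static int computeMaxParallelism(int parallelism, int defaultMaxParallelism) {
        return parallelism > 0
                ? KeyGroupRangeAssignment.computeDefaultMaxParallelism(parallelism)
                : defaultMaxParallelism;
    }

    public static void main(String[] args) {
        // Decided parallelism: max parallelism is derived from it (128 here, given the lower bound).
        System.out.println(computeMaxParallelism(10, 2000));
        // Parallelism not yet decided (dynamic vertex): fall back to the global default.
        System.out.println(computeMaxParallelism(-1, 2000));
    }
}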
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler.adaptivebatch;

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup;
import org.apache.flink.runtime.jobmaster.event.JobEvent;

import java.util.function.BiConsumer;

/**
* The {@code AdaptiveExecutionHandler} interface defines the operations for handling the adaptive
* execution of batch jobs. This includes acquiring the current job graph and dynamically adjusting
* the job topology in response to job events.
*/
public interface AdaptiveExecutionHandler {

/**
* Returns the {@code JobGraph} representing the batch job.
*
* @return the JobGraph.
*/
JobGraph getJobGraph();

/**
* Handles the provided {@code JobEvent}, attempting dynamic modifications to the {@code
* StreamGraph} based on the specifics of the job event.
*
* @param jobEvent The job event to handle. This event contains the necessary information that
* might trigger adjustments to the StreamGraph.
*/
void handleJobEvent(JobEvent jobEvent);

/**
* Registers a listener to receive updates when the {@code JobGraph} is modified.
*
* @param listener the listener to register for JobGraph updates.
*/
void registerJobGraphUpdateListener(JobGraphUpdateListener listener);

/**
* Retrieves the {@code ForwardGroup} for a given JobVertex ID.
*
* @param jobVertexId the ID of the JobVertex.
* @return the corresponding ForwardGroup.
*/
ForwardGroup getForwardGroupByJobVertexId(JobVertexID jobVertexId);

/**
* Updates the parallelism of the forward group associated with a specific JobVertex id. This
* update also applies to the parallelism of all StreamNodes or JobVertices that belong to this
* forward group.
*
* @param jobVertexId the ID of the JobVertex.
* @param newParallelism the new parallelism level to set.
* @param jobVertexParallelismUpdater a BiConsumer that updates the execution JobVertex
* parallelism.
*/
void updateForwardGroupParallelism(
JobVertexID jobVertexId,
int newParallelism,
BiConsumer<JobVertexID, Integer> jobVertexParallelismUpdater);

/**
* Creates an instance of {@code StreamGraphSchedulingContext}.
*
* @param defaultMaxParallelism the default value for maximum parallelism.
* @return an instance of {@code StreamGraphSchedulingContext}.
*/
StreamGraphSchedulingContext createStreamGraphSchedulingContext(int defaultMaxParallelism);
}
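
As a rough usage sketch (not part of the commit), the snippet below shows how scheduler-side code might drive this interface: register a listener, take the JobGraph as currently known, then forward a job event whose handling may append new vertices. The wiring and method name drive(...) are illustrative; JobGraphUpdateListener and the concrete handler come from elsewhere in this commit.

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.event.JobEvent;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveExecutionHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.JobGraphUpdateListener;

final class HandlerUsageSketch {
    static JobGraph drive(
            AdaptiveExecutionHandler handler, JobGraphUpdateListener listener, JobEvent event) {
        handler.registerJobGraphUpdateListener(listener); // be notified when vertices are added
        JobGraph graph = handler.getJobGraph();           // the graph as known so far
        handler.handleJobEvent(event);                    // may extend the graph and notify the listener
        return graph;
    }
}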
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler.adaptivebatch;

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.concurrent.Executor;

import static org.apache.flink.util.Preconditions.checkState;

/**
* A factory class for creating instances of {@link AdaptiveExecutionHandler}. This factory provides
* a method to create an appropriate handler based on the type of the execution plan.
*/
public class AdaptiveExecutionHandlerFactory {

/**
* Creates an instance of {@link AdaptiveExecutionHandler} based on the provided execution plan.
*
* @param executionPlan The execution plan, which can be either a {@link JobGraph} or a {@link
* StreamGraph}.
* @param userClassLoader The class loader for the user code.
* @param serializationExecutor The executor used for serialization tasks.
* @return An instance of {@link AdaptiveExecutionHandler}.
* @throws IllegalArgumentException if the execution plan is neither a {@link JobGraph} nor a
* {@link StreamGraph}.
*/
public static AdaptiveExecutionHandler create(
ExecutionPlan executionPlan,
ClassLoader userClassLoader,
Executor serializationExecutor) {
if (executionPlan instanceof JobGraph) {
return new DummyAdaptiveExecutionHandler((JobGraph) executionPlan);
} else {
checkState(executionPlan instanceof StreamGraph, "Unsupported execution plan.");
return new DefaultAdaptiveExecutionHandler(
userClassLoader, (StreamGraph) executionPlan, serializationExecutor);
}
}
}
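
A brief usage sketch of the factory; the surrounding method and its parameters are placeholders, and only the create(...) call and getJobGraph() come from this commit.

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveExecutionHandler;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveExecutionHandlerFactory;
import org.apache.flink.streaming.api.graph.ExecutionPlan;

import java.util.concurrent.Executor;

final class FactoryUsageSketch {
    static JobGraph initialJobGraph(
            ExecutionPlan plan, ClassLoader userClassLoader, Executor serializationExecutor) {
        // A JobGraph plan yields the pass-through handler; a StreamGraph plan yields the
        // DefaultAdaptiveExecutionHandler, which grows the JobGraph incrementally.
        AdaptiveExecutionHandler handler =
                AdaptiveExecutionHandlerFactory.create(plan, userClassLoader, serializationExecutor);
        return handler.getJobGraph();
    }
}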
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler.adaptivebatch;

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup;
import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup;
import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent;
import org.apache.flink.runtime.jobmaster.event.JobEvent;
import org.apache.flink.streaming.api.graph.AdaptiveGraphManager;
import org.apache.flink.streaming.api.graph.StreamGraph;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

/** Default implementation of {@link AdaptiveExecutionHandler}. */
public class DefaultAdaptiveExecutionHandler implements AdaptiveExecutionHandler {

private final Logger log = LoggerFactory.getLogger(DefaultAdaptiveExecutionHandler.class);

private final List<JobGraphUpdateListener> jobGraphUpdateListeners = new ArrayList<>();

private final AdaptiveGraphManager adaptiveGraphManager;

public DefaultAdaptiveExecutionHandler(
ClassLoader userClassloader, StreamGraph streamGraph, Executor serializationExecutor) {
this.adaptiveGraphManager =
new AdaptiveGraphManager(userClassloader, streamGraph, serializationExecutor);
}

@Override
public JobGraph getJobGraph() {
return adaptiveGraphManager.getJobGraph();
}

@Override
public void handleJobEvent(JobEvent jobEvent) {
try {
tryUpdateJobGraph(jobEvent);
} catch (Exception e) {
log.error("Failed to handle job event {}.", jobEvent, e);
throw new RuntimeException(e);
}
}

private void tryUpdateJobGraph(JobEvent jobEvent) throws Exception {
if (jobEvent instanceof ExecutionJobVertexFinishedEvent) {
ExecutionJobVertexFinishedEvent event = (ExecutionJobVertexFinishedEvent) jobEvent;

List<JobVertex> newlyCreatedJobVertices =
adaptiveGraphManager.onJobVertexFinished(event.getVertexId());

if (!newlyCreatedJobVertices.isEmpty()) {
notifyJobGraphUpdated(
newlyCreatedJobVertices, adaptiveGraphManager.getPendingOperatorsCount());
}
}
}

private void notifyJobGraphUpdated(List<JobVertex> jobVertices, int pendingOperatorsCount)
throws Exception {
for (JobGraphUpdateListener listener : jobGraphUpdateListeners) {
listener.onNewJobVerticesAdded(jobVertices, pendingOperatorsCount);
}
}

@Override
public void registerJobGraphUpdateListener(JobGraphUpdateListener listener) {
jobGraphUpdateListeners.add(listener);
}

@Override
public ForwardGroup getForwardGroupByJobVertexId(JobVertexID jobVertexId) {
return adaptiveGraphManager.getStreamNodeForwardGroupByVertexId(jobVertexId);
}

@Override
public void updateForwardGroupParallelism(
JobVertexID jobVertexId,
int newParallelism,
BiConsumer<JobVertexID, Integer> jobVertexParallelismUpdater) {
StreamNodeForwardGroup forwardGroup =
adaptiveGraphManager.getStreamNodeForwardGroupByVertexId(jobVertexId);

if (forwardGroup != null && !forwardGroup.isParallelismDecided()) {
forwardGroup.setParallelism(newParallelism);

forwardGroup
.getChainedStreamNodeGroups()
.forEach(
chainedStreamNodeGroup ->
chainedStreamNodeGroup.forEach(
streamNode ->
adaptiveGraphManager
.updateStreamNodeParallelism(
streamNode.getId(),
newParallelism)));
forwardGroup
.getStartNodes()
.forEach(
streamNode ->
adaptiveGraphManager
.findVertexByStreamNodeId(streamNode.getId())
.ifPresent(
id ->
jobVertexParallelismUpdater.accept(
id, newParallelism)));
}
}

@Override
public StreamGraphSchedulingContext createStreamGraphSchedulingContext(
int defaultMaxParallelism) {
return new DefaultStreamGraphSchedulingContext(adaptiveGraphManager, defaultMaxParallelism);
}
}
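
Finally, a hedged sketch of a JobGraphUpdateListener that the handler above would notify; the interface is part of this commit but not shown in this excerpt, so its method shape is inferred from notifyJobGraphUpdated(...) above, and the class here is purely illustrative.

import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.adaptivebatch.JobGraphUpdateListener;

import java.util.List;

final class LoggingJobGraphUpdateListener implements JobGraphUpdateListener {
    @Override
    public void onNewJobVerticesAdded(List<JobVertex> newJobVertices, int pendingOperatorsCount) {
        // A real scheduler would initialize and schedule the newly added vertices here.
        System.out.printf(
                "JobGraph gained %d vertices; %d operators still pending.%n",
                newJobVertices.size(), pendingOperatorsCount);
    }
}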