From 00c199267e6c99a5c2fe4810fdd43084834fa53f Mon Sep 17 00:00:00 2001 From: JunRuiLee Date: Sat, 16 Nov 2024 17:20:21 +0800 Subject: [PATCH] [FLINK-36068][runtime] Introduce AdaptiveExecutionHandler for handling the adaptive execution of batch jobs. --- .../ExecutionJobVertexFinishedEvent.java | 60 ++++++ .../runtime/jobmaster/event/JobEvents.java | 1 + .../runtime/scheduler/SchedulerBase.java | 6 +- .../adaptivebatch/AdaptiveBatchScheduler.java | 16 +- .../AdaptiveExecutionHandler.java | 88 +++++++++ .../AdaptiveExecutionHandlerFactory.java | 58 ++++++ .../DefaultAdaptiveExecutionHandler.java | 139 ++++++++++++++ .../DefaultStreamGraphSchedulingContext.java | 68 +++++++ .../DummyAdaptiveExecutionHandler.java | 87 +++++++++ .../DummyStreamGraphSchedulingContext.java | 43 +++++ .../adaptivebatch/JobGraphUpdateListener.java | 41 ++++ .../StreamGraphSchedulingContext.java | 47 +++++ .../api/graph/AdaptiveGraphManager.java | 6 +- .../api/graph/util/ImmutableStreamNode.java | 8 + .../DefaultAdaptiveExecutionHandlerTest.java | 161 ++++++++++++++++ ...faultStreamGraphSchedulingContextTest.java | 176 ++++++++++++++++++ 16 files changed, 996 insertions(+), 9 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/ExecutionJobVertexFinishedEvent.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContext.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyAdaptiveExecutionHandler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyStreamGraphSchedulingContext.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/JobGraphUpdateListener.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphSchedulingContext.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandlerTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContextTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/ExecutionJobVertexFinishedEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/ExecutionJobVertexFinishedEvent.java new file mode 100644 index 00000000000000..c7f7529d576e28 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/ExecutionJobVertexFinishedEvent.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.event; + +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo; + +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** This class is used to record the completion info of {@link ExecutionJobVertex}. */ +public class ExecutionJobVertexFinishedEvent implements JobEvent { + + private final JobVertexID vertexId; + + private final Map resultInfo; + + public ExecutionJobVertexFinishedEvent( + JobVertexID vertexId, Map resultInfo) { + this.vertexId = checkNotNull(vertexId); + this.resultInfo = checkNotNull(resultInfo); + } + + public JobVertexID getVertexId() { + return vertexId; + } + + public Map getResultInfo() { + return resultInfo; + } + + @Override + public String toString() { + return "ExecutionJobVertexFinishedEvent{" + + "vertexId=" + + vertexId + + ", resultInfos=" + + resultInfo + + '}'; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/JobEvents.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/JobEvents.java index 593b759f4d1cac..45def8abdcde29 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/JobEvents.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/event/JobEvents.java @@ -33,6 +33,7 @@ public class JobEvents { jobEventTypeIdMapping.put(ExecutionVertexFinishedEvent.class, 1); jobEventTypeIdMapping.put(ExecutionVertexResetEvent.class, 2); jobEventTypeIdMapping.put(ExecutionJobVertexInitializedEvent.class, 3); + jobEventTypeIdMapping.put(ExecutionJobVertexFinishedEvent.class, 4); } public static int getTypeID(Class clazz) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 2b001cc072e2ba..635e869a88b07b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -299,8 +299,12 @@ private static int normalizeParallelism(int parallelism) { * @return the computed max parallelism */ public static int getDefaultMaxParallelism(JobVertex vertex) { + return getDefaultMaxParallelism(vertex.getParallelism()); + } + + public static int getDefaultMaxParallelism(int parallelism) { return KeyGroupRangeAssignment.computeDefaultMaxParallelism( - normalizeParallelism(vertex.getParallelism())); + normalizeParallelism(parallelism)); } public static VertexParallelismStore computeVertexParallelismStore( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java index 204ed3428bfebf..a0311cb7ba6337 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java @@ -858,16 +858,18 @@ public static VertexParallelismStore computeVertexParallelismStoreForDynamicGrap // global default max parallelism. return computeVertexParallelismStore( vertices, - v -> { - if (v.getParallelism() > 0) { - return getDefaultMaxParallelism(v); - } else { - return defaultMaxParallelism; - } - }, + v -> computeMaxParallelism(v.getParallelism(), defaultMaxParallelism), Function.identity()); } + public static int computeMaxParallelism(int parallelism, int defaultMaxParallelism) { + if (parallelism > 0) { + return getDefaultMaxParallelism(parallelism); + } else { + return defaultMaxParallelism; + } + } + private static void resetDynamicParallelism(Iterable vertices) { for (JobVertex vertex : vertices) { if (vertex.isDynamicParallelism()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandler.java new file mode 100644 index 00000000000000..05f0761081d0b9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandler.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup; +import org.apache.flink.runtime.jobmaster.event.JobEvent; + +import java.util.function.BiConsumer; + +/** + * The {@code AdaptiveExecutionHandler} interface defines the operations for handling the adaptive + * execution of batch jobs. This includes acquiring the current job graph and dynamically adjusting + * the job topology in response to job events. + */ +public interface AdaptiveExecutionHandler { + + /** + * Returns the {@code JobGraph} representing the batch job. + * + * @return the JobGraph. + */ + JobGraph getJobGraph(); + + /** + * Handles the provided {@code JobEvent}, attempting dynamic modifications to the {@code + * StreamGraph} based on the specifics of the job event. + * + * @param jobEvent The job event to handle. This event contains the necessary information that + * might trigger adjustments to the StreamGraph. + */ + void handleJobEvent(JobEvent jobEvent); + + /** + * Registers a listener to receive updates when the {@code JobGraph} is modified. + * + * @param listener the listener to register for JobGraph updates. + */ + void registerJobGraphUpdateListener(JobGraphUpdateListener listener); + + /** + * Retrieves the {@code ForwardGroup} for a given JobVertex ID. + * + * @param jobVertexId the ID of the JobVertex. + * @return the corresponding ForwardGroup. + */ + ForwardGroup getForwardGroupByJobVertexId(JobVertexID jobVertexId); + + /** + * Updates the parallelism of the forward group associated with a specific JobVertex id. This + * update also applies to the parallelism of all StreamNodes or JobVertices that belong to this + * forward group. + * + * @param jobVertexId the ID of the JobVertex. + * @param newParallelism the new parallelism level to set. + * @param jobVertexParallelismUpdater a BiConsumer that updates the execution JobVertex + * parallelism. + */ + void updateForwardGroupParallelism( + JobVertexID jobVertexId, + int newParallelism, + BiConsumer jobVertexParallelismUpdater); + + /** + * Creates an instance of {@code StreamGraphSchedulingContext}. + * + * @param defaultMaxParallelism the default value for maximum parallelism. + * @return an instance of {@code StreamGraphSchedulingContext}. + */ + StreamGraphSchedulingContext createStreamGraphSchedulingContext(int defaultMaxParallelism); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java new file mode 100644 index 00000000000000..49fdd3e89c9cfb --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandlerFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.streaming.api.graph.ExecutionPlan; +import org.apache.flink.streaming.api.graph.StreamGraph; + +import java.util.concurrent.Executor; + +import static org.apache.flink.util.Preconditions.checkState; + +/** + * A factory class for creating instances of {@link AdaptiveExecutionHandler}. This factory provides + * a method to create an appropriate handler based on the type of the execution plan. + */ +public class AdaptiveExecutionHandlerFactory { + + /** + * Creates an instance of {@link AdaptiveExecutionHandler} based on the provided execution plan. + * + * @param executionPlan The execution plan, which can be either a {@link JobGraph} or a {@link + * StreamGraph}. + * @param userClassLoader The class loader for the user code. + * @param serializationExecutor The executor used for serialization tasks. + * @return An instance of {@link AdaptiveExecutionHandler}. + * @throws IllegalArgumentException if the execution plan is neither a {@link JobGraph} nor a + * {@link StreamGraph}. + */ + public static AdaptiveExecutionHandler create( + ExecutionPlan executionPlan, + ClassLoader userClassLoader, + Executor serializationExecutor) { + if (executionPlan instanceof JobGraph) { + return new DummyAdaptiveExecutionHandler((JobGraph) executionPlan); + } else { + checkState(executionPlan instanceof StreamGraph, "Unsupported execution plan."); + return new DefaultAdaptiveExecutionHandler( + userClassLoader, (StreamGraph) executionPlan, serializationExecutor); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandler.java new file mode 100644 index 00000000000000..004ae2e0082fe8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandler.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup; +import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup; +import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent; +import org.apache.flink.runtime.jobmaster.event.JobEvent; +import org.apache.flink.streaming.api.graph.AdaptiveGraphManager; +import org.apache.flink.streaming.api.graph.StreamGraph; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.function.BiConsumer; + +/** Default implementation of {@link AdaptiveExecutionHandler}. */ +public class DefaultAdaptiveExecutionHandler implements AdaptiveExecutionHandler { + + private final Logger log = LoggerFactory.getLogger(DefaultAdaptiveExecutionHandler.class); + + private final List jobGraphUpdateListeners = new ArrayList<>(); + + private final AdaptiveGraphManager adaptiveGraphManager; + + public DefaultAdaptiveExecutionHandler( + ClassLoader userClassloader, StreamGraph streamGraph, Executor serializationExecutor) { + this.adaptiveGraphManager = + new AdaptiveGraphManager(userClassloader, streamGraph, serializationExecutor); + } + + @Override + public JobGraph getJobGraph() { + return adaptiveGraphManager.getJobGraph(); + } + + @Override + public void handleJobEvent(JobEvent jobEvent) { + try { + tryUpdateJobGraph(jobEvent); + } catch (Exception e) { + log.error("Failed to handle job event {}.", jobEvent, e); + throw new RuntimeException(e); + } + } + + private void tryUpdateJobGraph(JobEvent jobEvent) throws Exception { + if (jobEvent instanceof ExecutionJobVertexFinishedEvent) { + ExecutionJobVertexFinishedEvent event = (ExecutionJobVertexFinishedEvent) jobEvent; + + List newlyCreatedJobVertices = + adaptiveGraphManager.onJobVertexFinished(event.getVertexId()); + + if (!newlyCreatedJobVertices.isEmpty()) { + notifyJobGraphUpdated( + newlyCreatedJobVertices, adaptiveGraphManager.getPendingOperatorsCount()); + } + } + } + + private void notifyJobGraphUpdated(List jobVertices, int pendingOperatorsCount) + throws Exception { + for (JobGraphUpdateListener listener : jobGraphUpdateListeners) { + listener.onNewJobVerticesAdded(jobVertices, pendingOperatorsCount); + } + } + + @Override + public void registerJobGraphUpdateListener(JobGraphUpdateListener listener) { + jobGraphUpdateListeners.add(listener); + } + + @Override + public ForwardGroup getForwardGroupByJobVertexId(JobVertexID jobVertexId) { + return adaptiveGraphManager.getStreamNodeForwardGroupByVertexId(jobVertexId); + } + + @Override + public void updateForwardGroupParallelism( + JobVertexID jobVertexId, + int newParallelism, + BiConsumer jobVertexParallelismUpdater) { + StreamNodeForwardGroup forwardGroup = + adaptiveGraphManager.getStreamNodeForwardGroupByVertexId(jobVertexId); + + if (forwardGroup != null && !forwardGroup.isParallelismDecided()) { + forwardGroup.setParallelism(newParallelism); + + forwardGroup + .getChainedStreamNodeGroups() + .forEach( + chainedStreamNodeGroup -> + chainedStreamNodeGroup.forEach( + streamNode -> + adaptiveGraphManager + .updateStreamNodeParallelism( + streamNode.getId(), + newParallelism))); + forwardGroup + .getStartNodes() + .forEach( + streamNode -> + adaptiveGraphManager + .findVertexByStreamNodeId(streamNode.getId()) + .ifPresent( + id -> + jobVertexParallelismUpdater.accept( + id, newParallelism))); + } + } + + @Override + public StreamGraphSchedulingContext createStreamGraphSchedulingContext( + int defaultMaxParallelism) { + return new DefaultStreamGraphSchedulingContext(adaptiveGraphManager, defaultMaxParallelism); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContext.java new file mode 100644 index 00000000000000..668863127efc80 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContext.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.graph.AdaptiveGraphManager; +import org.apache.flink.streaming.api.graph.util.ImmutableStreamNode; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Default implementation of {@link StreamGraphSchedulingContext}. */ +public class DefaultStreamGraphSchedulingContext implements StreamGraphSchedulingContext { + + private final AdaptiveGraphManager adaptiveGraphManager; + private final int defaultMaxParallelism; + + public DefaultStreamGraphSchedulingContext( + AdaptiveGraphManager adaptiveGraphManager, int defaultMaxParallelism) { + this.adaptiveGraphManager = checkNotNull(adaptiveGraphManager); + this.defaultMaxParallelism = defaultMaxParallelism; + } + + @Override + public int getParallelism(int streamNodeId) { + return adaptiveGraphManager + .getStreamGraphContext() + .getStreamGraph() + .getStreamNode(streamNodeId) + .getParallelism(); + } + + @Override + public int getMaxParallelismOrDefault(int streamNodeId) { + ImmutableStreamNode streamNode = + adaptiveGraphManager + .getStreamGraphContext() + .getStreamGraph() + .getStreamNode(streamNodeId); + + if (streamNode.getMaxParallelism() == JobVertex.MAX_PARALLELISM_DEFAULT) { + return AdaptiveBatchScheduler.computeMaxParallelism( + streamNode.getParallelism(), defaultMaxParallelism); + } else { + return streamNode.getMaxParallelism(); + } + } + + @Override + public int getPendingOperatorCount() { + return adaptiveGraphManager.getPendingOperatorsCount(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyAdaptiveExecutionHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyAdaptiveExecutionHandler.java new file mode 100644 index 00000000000000..d687d9427ca2cd --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyAdaptiveExecutionHandler.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup; +import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil; +import org.apache.flink.runtime.jobgraph.forwardgroup.JobVertexForwardGroup; +import org.apache.flink.runtime.jobmaster.event.JobEvent; + +import java.util.Map; +import java.util.function.BiConsumer; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A dummy implementation of {@link AdaptiveExecutionHandler}. */ +public class DummyAdaptiveExecutionHandler implements AdaptiveExecutionHandler { + + private final JobGraph jobGraph; + private final Map forwardGroupsByJobVertexId; + + public DummyAdaptiveExecutionHandler(JobGraph jobGraph) { + this.jobGraph = checkNotNull(jobGraph); + this.forwardGroupsByJobVertexId = + ForwardGroupComputeUtil.computeForwardGroupsAndCheckParallelism( + getJobGraph().getVerticesSortedTopologicallyFromSources()); + } + + @Override + public JobGraph getJobGraph() { + return jobGraph; + } + + @Override + public void handleJobEvent(JobEvent jobEvent) { + // do nothing + } + + @Override + public void registerJobGraphUpdateListener(JobGraphUpdateListener listener) { + // do nothing + } + + @Override + public ForwardGroup getForwardGroupByJobVertexId(JobVertexID jobVertexId) { + return forwardGroupsByJobVertexId.get(jobVertexId); + } + + @Override + public void updateForwardGroupParallelism( + JobVertexID jobVertexId, + int newParallelism, + BiConsumer jobVertexParallelismUpdater) { + JobVertexForwardGroup forwardGroup = forwardGroupsByJobVertexId.get(jobVertexId); + + if (forwardGroup != null && !forwardGroup.isParallelismDecided()) { + forwardGroup.setParallelism(newParallelism); + + for (JobVertexID id : forwardGroup.getJobVertexIds()) { + jobVertexParallelismUpdater.accept(id, newParallelism); + } + } + } + + @Override + public StreamGraphSchedulingContext createStreamGraphSchedulingContext( + int defaultMaxParallelism) { + return DummyStreamGraphSchedulingContext.INSTANCE; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyStreamGraphSchedulingContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyStreamGraphSchedulingContext.java new file mode 100644 index 00000000000000..71a7ec9f8bcbb7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DummyStreamGraphSchedulingContext.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +/** A dummy implementation of {@link StreamGraphSchedulingContext}. */ +public final class DummyStreamGraphSchedulingContext implements StreamGraphSchedulingContext { + + public static final DummyStreamGraphSchedulingContext INSTANCE = + new DummyStreamGraphSchedulingContext(); + + private DummyStreamGraphSchedulingContext() {} + + @Override + public int getParallelism(int streamNodeId) { + throw new UnsupportedOperationException(); + } + + @Override + public int getMaxParallelismOrDefault(int streamNodeId) { + throw new UnsupportedOperationException(); + } + + @Override + public int getPendingOperatorCount() { + return 0; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/JobGraphUpdateListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/JobGraphUpdateListener.java new file mode 100644 index 00000000000000..8265b5455af3e8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/JobGraphUpdateListener.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.JobVertex; + +import java.util.List; + +/** + * This interface defines operations for components that are interested in being notified when new + * job vertices are added to the job graph. + */ +@FunctionalInterface +public interface JobGraphUpdateListener { + /** + * Invoked when new {@link JobVertex} instances are added to the JobGraph of a specific job. + * This allows interested components to react to the addition of new vertices to the job + * topology. + * + * @param newVertices A list of newly added JobVertex instances. + * @param pendingOperatorsCount The number of pending operators. + */ + void onNewJobVerticesAdded(List newVertices, int pendingOperatorsCount) + throws Exception; +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphSchedulingContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphSchedulingContext.java new file mode 100644 index 00000000000000..742a7d21d328f3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/StreamGraphSchedulingContext.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +/** Interface for retrieving stream graph context details for adaptive batch jobs. */ +public interface StreamGraphSchedulingContext { + + /** + * Retrieves the parallelism of a given stream node. + * + * @param streamNodeId the ID of the stream node. + * @return the current parallelism of the stream node. + */ + int getParallelism(int streamNodeId); + + /** + * Retrieves the maximum parallelism for a specified stream node. If the maximum parallelism is + * not explicitly set, the default maximum parallelism is returned. + * + * @param streamNodeId the ID of the stream node. + * @return the maximum parallelism for the stream node, or the default if unspecified. + */ + int getMaxParallelismOrDefault(int streamNodeId); + + /** + * Retrieves the count of pending operators waiting to be transferred to job vertices. + * + * @return the number of pending operators. + */ + int getPendingOperatorCount(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java index 34ec7f3874d816..2e4e161521cc6d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java @@ -227,7 +227,7 @@ public Integer getProducerStreamNodeId(IntermediateDataSetID intermediateDataSet return intermediateDataSetIdToProducerMap.get(intermediateDataSetID); } - private Optional findVertexByStreamNodeId(int streamNodeId) { + public Optional findVertexByStreamNodeId(int streamNodeId) { if (frozenNodeToStartNodeMap.containsKey(streamNodeId)) { Integer startNodeId = frozenNodeToStartNodeMap.get(streamNodeId); return Optional.of(jobVerticesCache.get(startNodeId).getID()); @@ -694,4 +694,8 @@ private boolean isReadyToCreateJobVertex(OperatorChainInfo chainInfo) { } return true; } + + public void updateStreamNodeParallelism(int streamNodeId, int newParallelism) { + streamGraph.getStreamNode(streamNodeId).setParallelism(newParallelism); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java index 516bd2b433f3f4..c58e0d90cb024c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ImmutableStreamNode.java @@ -67,4 +67,12 @@ public int getId() { public @Nullable StreamOperatorFactory getOperatorFactory() { return streamNode.getOperatorFactory(); } + + public int getMaxParallelism() { + return streamNode.getMaxParallelism(); + } + + public int getParallelism() { + return streamNode.getParallelism(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandlerTest.java new file mode 100644 index 00000000000000..ce384600ce2086 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultAdaptiveExecutionHandlerTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link DefaultAdaptiveExecutionHandler}. */ +class DefaultAdaptiveExecutionHandlerTest { + + @RegisterExtension + static final TestExecutorExtension EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorExtension(); + + @Test + void testGetJobGraph() { + JobGraph jobGraph = createAdaptiveExecutionHandler().getJobGraph(); + + assertThat(jobGraph).isNotNull(); + assertThat(jobGraph.getNumberOfVertices()).isOne(); + assertThat(jobGraph.getVertices().iterator().next().getName()).contains("Source"); + } + + @Test + void testHandleJobEvent() { + List newAddedJobVertices = new ArrayList<>(); + AtomicInteger pendingOperators = new AtomicInteger(); + + DefaultAdaptiveExecutionHandler handler = + createAdaptiveExecutionHandler( + (newVertices, pendingOperatorsCount) -> { + newAddedJobVertices.addAll(newVertices); + pendingOperators.set(pendingOperatorsCount); + }, + createStreamGraph()); + + JobGraph jobGraph = handler.getJobGraph(); + JobVertex source = jobGraph.getVertices().iterator().next(); + + // notify Source node is finished + ExecutionJobVertexFinishedEvent event1 = + new ExecutionJobVertexFinishedEvent(source.getID(), Collections.emptyMap()); + handler.handleJobEvent(event1); + assertThat(newAddedJobVertices).hasSize(1); + assertThat(newAddedJobVertices.get(0).getName()).contains("Map"); + assertThat(pendingOperators.get()).isOne(); + + // notify Map node is finished + ExecutionJobVertexFinishedEvent event2 = + new ExecutionJobVertexFinishedEvent( + newAddedJobVertices.get(0).getID(), Collections.emptyMap()); + handler.handleJobEvent(event2); + assertThat(newAddedJobVertices).hasSize(2); + assertThat(newAddedJobVertices.get(1).getName()).contains("Sink"); + assertThat(pendingOperators.get()).isZero(); + } + + @Test + void testUpdateForwardGroupParallelism() { + StreamGraph streamGraph = createStreamGraph(); + DefaultAdaptiveExecutionHandler handler = + createAdaptiveExecutionHandler( + (newVertices, pendingOperatorsCount) -> {}, streamGraph); + JobGraph jobGraph = handler.getJobGraph(); + JobVertexID sourceId = jobGraph.getVertices().iterator().next().getID(); + + Map result = new HashMap<>(); + handler.updateForwardGroupParallelism(sourceId, 2, result::put); + + assertThat(handler.getForwardGroupByJobVertexId(sourceId).getParallelism()).isEqualTo(2); + assertThat( + streamGraph.getStreamNodes().stream() + .filter(node -> node.getOperatorName().equals("Map")) + .findFirst() + .get() + .getParallelism()) + .isEqualTo(2); + assertThat(result.getOrDefault(sourceId, -1)).isEqualTo(2); + } + + private DefaultAdaptiveExecutionHandler createAdaptiveExecutionHandler() { + return createAdaptiveExecutionHandler( + (newVertices, pendingOperatorsCount) -> {}, createStreamGraph()); + } + + /** + * Create a stream graph with the following topology. + * + *
+     *     Source -- forward --> Map -- rescale --> Sink
+     * 
+ */ + private StreamGraph createStreamGraph() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.disableOperatorChaining(); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + + env.fromSequence(0, 1) + .name("Source") + .forward() + .map(i -> i) + .name("Map") + .rescale() + .print() + .name("Sink") + .disableChaining(); + env.setParallelism(1); + + return env.getStreamGraph(); + } + + /** + * Create an {@link DefaultAdaptiveExecutionHandler} with a given {@link JobGraphUpdateListener} + * and a given {@link StreamGraph}. + */ + private DefaultAdaptiveExecutionHandler createAdaptiveExecutionHandler( + JobGraphUpdateListener listener, StreamGraph streamGraph) { + DefaultAdaptiveExecutionHandler handler = + new DefaultAdaptiveExecutionHandler( + getClass().getClassLoader(), streamGraph, EXECUTOR_RESOURCE.getExecutor()); + handler.registerJobGraphUpdateListener(listener); + + return handler; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContextTest.java new file mode 100644 index 00000000000000..c67877783be304 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultStreamGraphSchedulingContextTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptivebatch; + +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.event.ExecutionJobVertexFinishedEvent; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.streaming.api.graph.StreamNode; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.Collections; +import java.util.Iterator; +import java.util.concurrent.ScheduledExecutorService; + +import static org.apache.flink.runtime.scheduler.SchedulerBase.getDefaultMaxParallelism; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link DefaultStreamGraphSchedulingContext}. */ +class DefaultStreamGraphSchedulingContextTest { + + @RegisterExtension + private static final TestExecutorExtension EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorExtension(); + + @Test + void testGetParallelismAndMaxParallelism() { + int sourceParallelism = 3; + int sinkParallelism = 4; + int sourceMaxParallelism = 5; + int sinkMaxParallelism = 5; + + StreamGraph streamGraph = + getStreamGraph( + sourceParallelism, + sourceMaxParallelism, + sinkParallelism, + sinkMaxParallelism); + + StreamGraphSchedulingContext streamGraphSchedulingContext = + getStreamGraphSchedulingContext(streamGraph, 100); + + Iterator iterator = streamGraph.getStreamNodes().iterator(); + StreamNode source = iterator.next(); + assertThat(streamGraphSchedulingContext.getParallelism(source.getId())) + .isEqualTo(sourceParallelism); + assertThat(streamGraphSchedulingContext.getMaxParallelismOrDefault(source.getId())) + .isEqualTo(sourceMaxParallelism); + + StreamNode sink = iterator.next(); + assertThat(streamGraphSchedulingContext.getParallelism(sink.getId())) + .isEqualTo(sinkParallelism); + assertThat(streamGraphSchedulingContext.getMaxParallelismOrDefault(sink.getId())) + .isEqualTo(sinkMaxParallelism); + } + + @Test + void testGetDefaultMaxParallelism() { + int sourceParallelism = 3; + int sinkParallelism = -1; + int sourceMaxParallelism = -1; + int sinkMaxParallelism = -1; + int defaultMaxParallelism = 100; + + StreamGraph streamGraph = + getStreamGraph( + sourceParallelism, + sourceMaxParallelism, + sinkParallelism, + sinkMaxParallelism); + + StreamGraphSchedulingContext streamGraphSchedulingContext = + getStreamGraphSchedulingContext(streamGraph, defaultMaxParallelism); + + Iterator iterator = streamGraph.getStreamNodes().iterator(); + StreamNode source = iterator.next(); + // the source max parallelism will not fall back to the default max parallelism because its + // parallelism is greater than zero. + assertThat(streamGraphSchedulingContext.getMaxParallelismOrDefault(source.getId())) + .isEqualTo(getDefaultMaxParallelism(sourceParallelism)); + + StreamNode sink = iterator.next(); + // the sink max parallelism will fall back to default max parallelism + assertThat(streamGraphSchedulingContext.getMaxParallelismOrDefault(sink.getId())) + .isEqualTo(defaultMaxParallelism); + } + + @Test + public void testGetPendingOperatorCount() { + StreamGraph streamGraph = getStreamGraph(); + DefaultAdaptiveExecutionHandler executionHandler = + getDefaultAdaptiveExecutionHandler(streamGraph); + StreamGraphSchedulingContext schedulingContext = + getStreamGraphSchedulingContext(executionHandler, 1); + + assertThat(schedulingContext.getPendingOperatorCount()).isEqualTo(1); + + JobGraph jobGraph = executionHandler.getJobGraph(); + JobVertex source = jobGraph.getVertices().iterator().next(); + executionHandler.handleJobEvent( + new ExecutionJobVertexFinishedEvent(source.getID(), Collections.emptyMap())); + + assertThat(schedulingContext.getPendingOperatorCount()).isEqualTo(0); + } + + private static StreamGraph getStreamGraph() { + return getStreamGraph(1, 1, 2, 2); + } + + private static StreamGraph getStreamGraph( + int sourceParallelism, + int sourceMaxParallelism, + int sinkParallelism, + int sinkMaxParallelism) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.fromSequence(0L, 1L).disableChaining().print(); + StreamGraph streamGraph = env.getStreamGraph(); + + Iterator iterator = streamGraph.getStreamNodes().iterator(); + + StreamNode source = iterator.next(); + StreamNode sink = iterator.next(); + + source.setParallelism(sourceParallelism); + sink.setParallelism(sinkParallelism); + + if (sourceMaxParallelism > 0) { + source.setMaxParallelism(sourceMaxParallelism); + } + + if (sinkMaxParallelism > 0) { + sink.setMaxParallelism(sinkMaxParallelism); + } + return streamGraph; + } + + private static DefaultAdaptiveExecutionHandler getDefaultAdaptiveExecutionHandler( + StreamGraph streamGraph) { + return new DefaultAdaptiveExecutionHandler( + Thread.currentThread().getContextClassLoader(), + streamGraph, + EXECUTOR_RESOURCE.getExecutor()); + } + + private static StreamGraphSchedulingContext getStreamGraphSchedulingContext( + StreamGraph streamGraph, int defaultMaxParallelism) { + return getDefaultAdaptiveExecutionHandler(streamGraph) + .createStreamGraphSchedulingContext(defaultMaxParallelism); + } + + private static StreamGraphSchedulingContext getStreamGraphSchedulingContext( + AdaptiveExecutionHandler adaptiveExecutionHandler, int defaultMaxParallelism) { + return adaptiveExecutionHandler.createStreamGraphSchedulingContext(defaultMaxParallelism); + } +}