diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java
index 691a797752eee..e974bf9761223 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java
@@ -20,9 +20,11 @@
 
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.transformations.GlobalCommitterTransform;
+import org.apache.flink.util.function.SerializableFunction;
 import org.apache.flink.util.function.SerializableSupplier;
 
 /** This utility class provides building blocks for custom topologies. */
@@ -39,7 +41,7 @@ private StandardSinkTopologies() {}
      */
     public static <CommT> void addGlobalCommitter(
             DataStream<CommittableMessage<CommT>> committables,
-            SerializableSupplier<Committer<CommT>> committerFactory,
+            SerializableFunction<CommitterInitContext, Committer<CommT>> committerFactory,
             SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
         committables
                 .getExecutionEnvironment()
@@ -47,4 +49,21 @@ public static <CommT> void addGlobalCommitter(
                         new GlobalCommitterTransform<>(
                                 committables, committerFactory, committableSerializer));
     }
+
+    /**
+     * Adds a global committer to the pipeline that runs as final operator with a parallelism of
+     * one.
+     */
+    public static <CommT> void addGlobalCommitter(
+            DataStream<CommittableMessage<CommT>> committables,
+            SerializableSupplier<Committer<CommT>> committerFactory,
+            SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
+        committables
+                .getExecutionEnvironment()
+                .addOperator(
+                        new GlobalCommitterTransform<>(
+                                committables,
+                                ctx -> committerFactory.get(),
+                                committableSerializer));
+    }
 }
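Note for connector authors: with this change the factory passed to `addGlobalCommitter` receives a `CommitterInitContext`, so a global committer can be constructed against runtime information such as the committer metric group rather than from a bare supplier. The sketch below illustrates the intended usage only; `MyGlobalCommittable`, `MyGlobalCommitter`, and `MyCommittableSerializer` are hypothetical placeholder types that are not part of this patch.

```java
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;

/** Sketch of a sink-side post-commit topology wired through the new context-aware factory. */
class MyPostCommitTopology implements SupportsPostCommitTopology<MyGlobalCommittable> {

    @Override
    public void addPostCommitTopology(
            DataStream<CommittableMessage<MyGlobalCommittable>> committables) {
        StandardSinkTopologies.addGlobalCommitter(
                committables,
                // The factory now sees the CommitterInitContext, e.g. to obtain the
                // sink committer metric group at construction time.
                ctx -> new MyGlobalCommitter(ctx.metricGroup()),
                MyCommittableSerializer::new);
    }
}
```

Existing factories that ignore the context keep working through the supplier-based overload added above, which simply adapts them via `ctx -> committerFactory.get()`.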
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/GlobalCommitterTransform.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/GlobalCommitterTransform.java
index 303c632259fab..7b59fa8f0a7a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/GlobalCommitterTransform.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/GlobalCommitterTransform.java
@@ -21,12 +21,14 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.CommitterInitContext;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
 import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.util.function.SerializableFunction;
 import org.apache.flink.util.function.SerializableSupplier;
 
 import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
@@ -45,12 +47,12 @@ public class GlobalCommitterTransform<CommT> extends TransformationWithLineage<Void> {
 
     private final DataStream<CommittableMessage<CommT>> inputStream;
-    private final SerializableSupplier<Committer<CommT>> committerFactory;
+    private final SerializableFunction<CommitterInitContext, Committer<CommT>> committerFactory;
     private final SerializableSupplier<SimpleVersionedSerializer<CommT>>
             committableSerializer;
 
     public GlobalCommitterTransform(
             DataStream<CommittableMessage<CommT>> inputStream,
-            SerializableSupplier<Committer<CommT>> committerFactory,
+            SerializableFunction<CommitterInitContext, Committer<CommT>> committerFactory,
             SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
         super(StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME, Types.VOID, 1, true);
         this.inputStream = inputStream;
@@ -78,7 +80,7 @@ public DataStream<CommittableMessage<CommT>> getInputStream() {
         return inputStream;
     }
 
-    public SerializableSupplier<Committer<CommT>> getCommitterFactory() {
+    public SerializableFunction<CommitterInitContext, Committer<CommT>> getCommitterFactory() {
         return committerFactory;
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterInitContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterInitContextImpl.java
new file mode 100644
index 0000000000000..f69fbd2fbd89e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterInitContextImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.connector.sink2.CommitterInitContext; +import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import java.util.OptionalLong; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +class CommitterInitContextImpl extends InitContextBase implements CommitterInitContext { + + private final SinkCommitterMetricGroup metricGroup; + + public CommitterInitContextImpl( + StreamingRuntimeContext runtimeContext, + SinkCommitterMetricGroup metricGroup, + OptionalLong restoredCheckpointId) { + super(runtimeContext, restoredCheckpointId); + this.metricGroup = checkNotNull(metricGroup); + } + + @Override + public SinkCommitterMetricGroup metricGroup() { + return metricGroup; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 3a2c221a5eaee..9cd85c4001abc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -37,7 +37,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager; import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector; @@ -123,7 +122,8 @@ protected void setup( public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); OptionalLong checkpointId = context.getRestoredCheckpointId(); - CommitterInitContext initContext = createInitContext(checkpointId); + CommitterInitContext initContext = + new CommitterInitContextImpl(getRuntimeContext(), metricGroup, checkpointId); committer = committerSupplier.apply(initContext); committableCollectorState = new SimpleVersionedListState<>( @@ -213,27 +213,4 @@ public void processElement(StreamRecord> element) thro public void close() throws Exception { closeAll(committer, super::close); } - - private CommitterInitContext createInitContext(OptionalLong restoredCheckpointId) { - return new CommitterInitContextImp(getRuntimeContext(), metricGroup, restoredCheckpointId); - } - - private static class CommitterInitContextImp extends InitContextBase - implements CommitterInitContext { - - private final SinkCommitterMetricGroup metricGroup; - - public CommitterInitContextImp( - StreamingRuntimeContext runtimeContext, - SinkCommitterMetricGroup metricGroup, - OptionalLong restoredCheckpointId) { - super(runtimeContext, restoredCheckpointId); - this.metricGroup = checkNotNull(metricGroup); - } - - @Override - public SinkCommitterMetricGroup metricGroup() { - return metricGroup; - } - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommittableWrapper.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommittableWrapper.java similarity index 96% rename from 
flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommittableWrapper.java rename to flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommittableWrapper.java index c9f6d08151088..a74f7c379cc05 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommittableWrapper.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommittableWrapper.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java similarity index 92% rename from flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java rename to flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java index 7dc4492e40a58..9c229b0bb2c2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java @@ -16,19 +16,21 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.configuration.SinkOptions; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -39,6 +41,7 @@ import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.function.SerializableFunction; import org.apache.flink.util.function.SerializableSupplier; import javax.annotation.Nullable; @@ -47,6 +50,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.OptionalLong; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -100,7 +104,7 @@ public class GlobalCommitterOperator extends AbstractStreamO new ListStateDescriptor<>( "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE); - private final 
SerializableSupplier> committerFactory; + private final SerializableFunction> committerFactory; private final SerializableSupplier> committableSerializerFactory; /** @@ -121,7 +125,7 @@ public class GlobalCommitterOperator extends AbstractStreamO private List sinkV1State = new ArrayList<>(); public GlobalCommitterOperator( - SerializableSupplier> committerFactory, + SerializableFunction> committerFactory, SerializableSupplier> committableSerializerFactory, boolean commitOnInput) { this.committerFactory = checkNotNull(committerFactory); @@ -135,7 +139,6 @@ protected void setup( StreamConfig config, Output> output) { super.setup(containingTask, config, output); - committer = committerFactory.get(); metricGroup = InternalSinkCommitterMetricGroup.wrap(metrics); committableCollector = CommittableCollector.of(metricGroup); committableSerializer = committableSerializerFactory.get(); @@ -155,6 +158,11 @@ public void snapshotState(StateSnapshotContext context) throws Exception { @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); + OptionalLong restoredCheckpointId = context.getRestoredCheckpointId(); + committer = + committerFactory.apply( + new CommitterInitContextImpl( + getRuntimeContext(), metricGroup, restoredCheckpointId)); globalCommitterState = new SimpleVersionedListState<>( context.getOperatorStateStore() @@ -169,8 +177,8 @@ public void initializeState(StateInitializationContext context) throws Exception committableCollector.merge(cc.getCommittableCollector()); }); // try to re-commit recovered transactions as quickly as possible - if (context.getRestoredCheckpointId().isPresent()) { - commit(context.getRestoredCheckpointId().getAsLong()); + if (restoredCheckpointId.isPresent()) { + commit(restoredCheckpointId.getAsLong()); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterSerializer.java similarity index 99% rename from flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java rename to flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterSerializer.java index a8c5ae5497883..4fe1cd35733fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterSerializer.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.core.io.SimpleVersionedSerialization; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java index d46e6a295107a..278a29477890c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; -import org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.graph.TransformationTranslator; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -31,14 +30,18 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PhysicalTransformation; import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory; +import org.apache.flink.streaming.runtime.operators.sink.GlobalCommitterOperator; import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import java.util.ArrayDeque; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Queue; import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; import static org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME; @@ -74,11 +77,10 @@ private Collection translateInternal( boolean commitOnInput = batch || !checkpointingEnabled || hasUpstreamCommitter(inputStream); // Create a global shuffle and add the global committer with parallelism 1. 
+ DataStream> global = inputStream.global(); final PhysicalTransformation transformation = (PhysicalTransformation) - inputStream - .global() - .transform( + global.transform( GLOBAL_COMMITTER_TRANSFORMATION_NAME, Types.VOID, new GlobalCommitterOperator<>( @@ -87,10 +89,20 @@ private Collection translateInternal( commitOnInput)) .getTransformation(); transformation.setChainingStrategy(ChainingStrategy.ALWAYS); - transformation.setName(GLOBAL_COMMITTER_TRANSFORMATION_NAME); transformation.setParallelism(1); transformation.setMaxParallelism(1); - return Collections.emptyList(); + copySafely(transformation::setName, globalCommitterTransform::getName); + copySafely(transformation::setUid, globalCommitterTransform::getUid); + copySafely(transformation::setUidHash, globalCommitterTransform::getUserProvidedNodeHash); + + return Arrays.asList(global.getId(), transformation.getId()); + } + + private static void copySafely(Consumer consumer, Supplier provider) { + T value = provider.get(); + if (value != null) { + consumer.accept(value); + } } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java index 52449be1f1825..dd41218a4970f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java @@ -59,6 +59,8 @@ import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkState; @@ -177,6 +179,12 @@ private void expand() { getSinkTransformations(sizeBefore).forEach(context::transform); + repeatUntilConverged( + () -> + getSinkTransformations(sizeBefore).stream() + .flatMap(t -> context.transform(t).stream()) + .collect(Collectors.toList())); + disallowUnalignedCheckpoint(getSinkTransformations(sizeBefore)); // Remove all added sink subtransformations to avoid duplications and allow additional @@ -188,6 +196,14 @@ private void expand() { } } + private void repeatUntilConverged(Supplier producer) { + R lastResult = producer.get(); + R nextResult; + while (!lastResult.equals(nextResult = producer.get())) { + lastResult = nextResult; + } + } + private List> getSinkTransformations(int sizeBefore) { return executionEnvironment .getTransformations() diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java index 0f84c1c12008f..379200339c5e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.api.connector.sink2; +import org.apache.flink.streaming.runtime.operators.sink.IntegerSerializer; + import org.junit.jupiter.api.Test; import java.io.IOException; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfoTest.java 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfoTest.java index 43d64ad091895..c318ab5a4bd94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfoTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.connector.sink2; import org.apache.flink.api.common.typeutils.TypeInformationTestBase; +import org.apache.flink.streaming.runtime.operators.sink.IntegerSerializer; /** Test for {@link CommittableMessageTypeInfo}. */ class CommittableMessageTypeInfoTest diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java deleted file mode 100644 index e95523a1492ee..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.graph; - -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExecutionOptions; -import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.StreamOperatorFactory; -import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory; -import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; - -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; - -import java.util.Arrays; -import java.util.Collection; -import java.util.function.Predicate; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** - * Tests for {@link org.apache.flink.streaming.api.transformations.SinkTransformation}. - * - *

ATTENTION: This test is extremely brittle. Do NOT remove, add or re-order test cases. - */ -@ExtendWith(ParameterizedTestExtension.class) -abstract class SinkTransformationTranslatorITCaseBase { - - @Parameters(name = "Execution Mode: {0}") - private static Collection data() { - return Arrays.asList(RuntimeExecutionMode.STREAMING, RuntimeExecutionMode.BATCH); - } - - @Parameter protected RuntimeExecutionMode runtimeExecutionMode; - - static final String NAME = "FileSink"; - static final String SLOT_SHARE_GROUP = "FileGroup"; - static final String UID = "FileUid"; - static final int PARALLELISM = 2; - - abstract SinkT simpleSink(); - - abstract SinkT sinkWithCommitter(); - - abstract DataStreamSink sinkTo(DataStream stream, SinkT sink); - - @TestTemplate - void generateWriterTopology() { - final StreamGraph streamGraph = buildGraph(simpleSink(), runtimeExecutionMode); - - final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source")); - final StreamNode writerNode = findWriter(streamGraph); - - assertThat(streamGraph.getStreamNodes()).hasSize(2); - - validateTopology( - sourceNode, - IntSerializer.class, - writerNode, - SinkWriterOperatorFactory.class, - PARALLELISM, - -1); - } - - @TestTemplate - void generateWriterCommitterTopology() { - - final StreamGraph streamGraph = buildGraph(sinkWithCommitter(), runtimeExecutionMode); - - final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source")); - final StreamNode writerNode = findWriter(streamGraph); - - validateTopology( - sourceNode, - IntSerializer.class, - writerNode, - SinkWriterOperatorFactory.class, - PARALLELISM, - -1); - - final StreamNode committerNode = - findNodeName(streamGraph, name -> name.contains("Committer")); - - assertThat(streamGraph.getStreamNodes()).hasSize(3); - assertNoUnalignedOutput(writerNode); - - validateTopology( - writerNode, - SimpleVersionedSerializerTypeSerializerProxy.class, - committerNode, - CommitterOperatorFactory.class, - PARALLELISM, - -1); - } - - @TestTemplate - void testParallelismConfigured() { - testParallelismConfiguredInternal(true); - - testParallelismConfiguredInternal(false); - } - - private void testParallelismConfiguredInternal(boolean setSinkParallelism) { - final StreamGraph streamGraph = - buildGraph(sinkWithCommitter(), runtimeExecutionMode, setSinkParallelism); - - final StreamNode writerNode = findWriter(streamGraph); - final StreamNode committerNode = findCommitter(streamGraph); - - assertThat(writerNode.isParallelismConfigured()).isEqualTo(setSinkParallelism); - assertThat(committerNode.isParallelismConfigured()).isEqualTo(setSinkParallelism); - } - - StreamNode findWriter(StreamGraph streamGraph) { - return findNodeName( - streamGraph, name -> name.contains("Writer") && !name.contains("Committer")); - } - - StreamNode findCommitter(StreamGraph streamGraph) { - return findNodeName( - streamGraph, - name -> name.contains("Committer") && !name.contains("Global Committer")); - } - - StreamNode findGlobalCommitter(StreamGraph streamGraph) { - return findNodeName(streamGraph, name -> name.contains("Global Committer")); - } - - @TestTemplate - void throwExceptionWithoutSettingUid() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - final Configuration config = new Configuration(); - config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode); - env.configure(config, getClass().getClassLoader()); - // disable auto generating uid - env.getConfig().disableAutoGeneratedUIDs(); - 
sinkTo(env.fromElements(1, 2), simpleSink()); - assertThatThrownBy(env::getStreamGraph).isInstanceOf(IllegalStateException.class); - } - - @TestTemplate - void disableOperatorChain() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - final DataStreamSource src = env.fromElements(1, 2); - final DataStreamSink dataStreamSink = sinkTo(src, sinkWithCommitter()).name(NAME); - dataStreamSink.disableChaining(); - - final StreamGraph streamGraph = env.getStreamGraph(); - final StreamNode writer = findWriter(streamGraph); - final StreamNode committer = findCommitter(streamGraph); - - assertThat(writer.getOperatorFactory().getChainingStrategy()) - .isEqualTo(ChainingStrategy.NEVER); - assertThat(committer.getOperatorFactory().getChainingStrategy()) - .isEqualTo(ChainingStrategy.NEVER); - } - - void validateTopology( - StreamNode src, - Class srcOutTypeInfo, - StreamNode dest, - Class operatorFactoryClass, - int expectedParallelism, - int expectedMaxParallelism) { - - // verify src node - final StreamEdge srcOutEdge = src.getOutEdges().get(0); - assertThat(srcOutEdge.getTargetId()).isEqualTo(dest.getId()); - assertThat(src.getTypeSerializerOut()).isInstanceOf(srcOutTypeInfo); - - // verify dest node input - final StreamEdge destInputEdge = dest.getInEdges().get(0); - assertThat(destInputEdge.getTargetId()).isEqualTo(dest.getId()); - assertThat(dest.getTypeSerializersIn()[0]).isInstanceOf(srcOutTypeInfo); - - // make sure 2 sink operators have different names/uid - assertThat(dest.getOperatorName()).isNotEqualTo(src.getOperatorName()); - assertThat(dest.getTransformationUID()).isNotEqualTo(src.getTransformationUID()); - - assertThat(dest.getOperatorFactory()).isInstanceOf(operatorFactoryClass); - assertThat(dest.getParallelism()).isEqualTo(expectedParallelism); - assertThat(dest.getMaxParallelism()).isEqualTo(expectedMaxParallelism); - assertThat(dest.getOperatorFactory().getChainingStrategy()) - .isEqualTo(ChainingStrategy.ALWAYS); - assertThat(dest.getSlotSharingGroup()).isEqualTo(SLOT_SHARE_GROUP); - } - - protected static void assertNoUnalignedOutput(StreamNode src) { - assertThat(src.getOutEdges()).allMatch(e -> !e.supportsUnalignedCheckpoints()); - } - - StreamGraph buildGraph(SinkT sink, RuntimeExecutionMode runtimeExecutionMode) { - return buildGraph(sink, runtimeExecutionMode, true); - } - - StreamGraph buildGraph( - SinkT sink, RuntimeExecutionMode runtimeExecutionMode, boolean setSinkParallelism) { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - final Configuration config = new Configuration(); - config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode); - env.configure(config, getClass().getClassLoader()); - final DataStreamSource src = env.fromElements(1, 2); - final DataStreamSink dataStreamSink = sinkTo(src.rebalance(), sink); - setSinkProperty(dataStreamSink, setSinkParallelism); - // Trigger the plan generation but do not clear the transformations - env.getExecutionPlan(); - return env.getStreamGraph(); - } - - private void setSinkProperty( - DataStreamSink dataStreamSink, boolean setSinkParallelism) { - dataStreamSink.name(NAME); - dataStreamSink.uid(UID); - if (setSinkParallelism) { - dataStreamSink.setParallelism(SinkTransformationTranslatorITCaseBase.PARALLELISM); - } - dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP); - } - - StreamNode findNodeName(StreamGraph streamGraph, Predicate predicate) { - return streamGraph.getStreamNodes().stream() - .filter(node -> 
predicate.test(node.getOperatorName())) - .findFirst() - .orElseThrow(() -> new IllegalStateException("Can not find the node")); - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java similarity index 95% rename from flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java rename to flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java index 5dc646240fcd4..a73dcc24d012c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java @@ -16,10 +16,13 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -173,7 +176,7 @@ private OneInputStreamOperatorTestHarness, Void> cre Committer committer, boolean commitOnInput) throws Exception { return new OneInputStreamOperatorTestHarness<>( new GlobalCommitterOperator<>( - () -> committer, IntegerSerializer::new, commitOnInput)); + ctx -> committer, IntegerSerializer::new, commitOnInput)); } private static class MockCommitter implements Committer { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterSerializerTest.java similarity index 96% rename from flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java rename to flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterSerializerTest.java index 8c5cc11b9051b..eb2416386a778 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterSerializerTest.java @@ -16,13 +16,15 @@ * limitations under the License. 
*/ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager; import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector; import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/IntegerSerializer.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/IntegerSerializer.java similarity index 96% rename from flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/IntegerSerializer.java rename to flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/IntegerSerializer.java index 4fa4e3a650bd1..dc59acd349bac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/IntegerSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/IntegerSerializer.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java index 938427ba2d51c..a69c7c64355ec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; -import org.apache.flink.streaming.api.connector.sink2.IntegerSerializer; +import org.apache.flink.streaming.runtime.operators.sink.IntegerSerializer; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java index 6b09b83961dec..9e3753031c0bf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java @@ -18,77 +18,300 @@ package org.apache.flink.streaming.api.graph; +import 
org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy; import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory; +import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2; -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.function.Predicate; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Tests for {@link org.apache.flink.streaming.api.transformations.SinkTransformation}. * *

ATTENTION: This test is extremely brittle. Do NOT remove, add or re-order test cases. */ -@ExtendWith(ParameterizedTestExtension.class) -class SinkV2TransformationTranslatorITCase - extends SinkTransformationTranslatorITCaseBase> { +class SinkV2TransformationTranslatorITCase { + + static final String NAME = "FileSink"; + static final String SLOT_SHARE_GROUP = "FileGroup"; + static final String UID = "FileUid"; + static final int PARALLELISM = 2; + + protected static void assertNoUnalignedOutput(StreamNode src) { + assertThat(src.getOutEdges()).allMatch(e -> !e.supportsUnalignedCheckpoints()); + } - @Override Sink simpleSink() { return TestSinkV2.newBuilder().build(); } - @Override Sink sinkWithCommitter() { return TestSinkV2.newBuilder().setDefaultCommitter().build(); } - @Override + Sink sinkWithCommitterAndGlobalCommitter() { + return TestSinkV2.newBuilder() + .setDefaultCommitter() + .setWithPostCommitTopology(true) + .build(); + } + DataStreamSink sinkTo(DataStream stream, Sink sink) { return stream.sinkTo(sink); } - @TestTemplate + @Test void testSettingOperatorUidHash() { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - final DataStreamSource src = env.fromElements(1, 2); + final DataStreamSource src = env.fromData(1, 2); final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead"; final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10"; + final String globalCommitterHash = "77e6aa6eeb1643b3765e1e4a7a672f37"; final CustomSinkOperatorUidHashes operatorsUidHashes = CustomSinkOperatorUidHashes.builder() .setWriterUidHash(writerHash) .setCommitterUidHash(committerHash) + .setGlobalCommitterUidHash(globalCommitterHash) .build(); - src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME); + src.sinkTo(sinkWithCommitterAndGlobalCommitter(), operatorsUidHashes).name(NAME); final StreamGraph streamGraph = env.getStreamGraph(); assertThat(findWriter(streamGraph).getUserHash()).isEqualTo(writerHash); assertThat(findCommitter(streamGraph).getUserHash()).isEqualTo(committerHash); + assertThat(findGlobalCommitter(streamGraph).getUserHash()).isEqualTo(globalCommitterHash); } /** * When ever you need to change something in this test case please think about possible state * upgrade problems introduced by your changes. 
*/ - @TestTemplate + @Test void testSettingOperatorUids() { final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead"; final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - final DataStreamSource src = env.fromElements(1, 2); - src.sinkTo(sinkWithCommitter()).name(NAME).uid(sinkUid); + final DataStreamSource src = env.fromData(1, 2); + src.sinkTo(sinkWithCommitterAndGlobalCommitter()).name(NAME).uid(sinkUid); final StreamGraph streamGraph = env.getStreamGraph(); assertThat(findWriter(streamGraph).getTransformationUID()).isEqualTo(sinkUid); assertThat(findCommitter(streamGraph).getTransformationUID()) .isEqualTo(String.format("Sink Committer: %s", sinkUid)); + assertThat(findGlobalCommitter(streamGraph).getTransformationUID()) + .isEqualTo(String.format("Sink %s Global Committer", sinkUid)); + } + + @Test + void testSettingOperatorNames() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final DataStreamSource src = env.fromData(1, 2); + src.sinkTo(sinkWithCommitterAndGlobalCommitter()).name(NAME); + + final StreamGraph streamGraph = env.getStreamGraph(); + assertThat(findWriter(streamGraph).getOperatorName()) + .isEqualTo(String.format("%s: Writer", NAME)); + assertThat(findCommitter(streamGraph).getOperatorName()) + .isEqualTo(String.format("%s: Committer", NAME)); + assertThat(findGlobalCommitter(streamGraph).getOperatorName()) + .isEqualTo(String.format("%s: Global Committer", NAME)); + } + + @ParameterizedTest + @EnumSource(RuntimeExecutionMode.class) + void generateWriterTopology(RuntimeExecutionMode runtimeExecutionMode) { + final StreamGraph streamGraph = buildGraph(simpleSink(), runtimeExecutionMode); + + final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source")); + final StreamNode writerNode = findWriter(streamGraph); + + assertThat(streamGraph.getStreamNodes()).hasSize(2); + + validateTopology( + sourceNode, + IntSerializer.class, + writerNode, + SinkWriterOperatorFactory.class, + PARALLELISM, + -1); + } + + @ParameterizedTest + @EnumSource(RuntimeExecutionMode.class) + void generateWriterCommitterTopology(RuntimeExecutionMode runtimeExecutionMode) { + final StreamGraph streamGraph = buildGraph(sinkWithCommitter(), runtimeExecutionMode); + + final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source")); + final StreamNode writerNode = findWriter(streamGraph); + + validateTopology( + sourceNode, + IntSerializer.class, + writerNode, + SinkWriterOperatorFactory.class, + PARALLELISM, + -1); + + final StreamNode committerNode = + findNodeName(streamGraph, name -> name.contains("Committer")); + + assertThat(streamGraph.getStreamNodes()).hasSize(3); + assertNoUnalignedOutput(writerNode); + + validateTopology( + writerNode, + SimpleVersionedSerializerTypeSerializerProxy.class, + committerNode, + CommitterOperatorFactory.class, + PARALLELISM, + -1); + } + + @ParameterizedTest + @CsvSource({"STREAMING, true", "STREAMING, false", "BATCH, true", "BATCH, false"}) + void testParallelismConfigured( + RuntimeExecutionMode runtimeExecutionMode, boolean setSinkParallelism) { + final StreamGraph streamGraph = + buildGraph(sinkWithCommitter(), runtimeExecutionMode, setSinkParallelism); + + final StreamNode writerNode = findWriter(streamGraph); + final StreamNode committerNode = findCommitter(streamGraph); + + assertThat(writerNode.isParallelismConfigured()).isEqualTo(setSinkParallelism); + 
assertThat(committerNode.isParallelismConfigured()).isEqualTo(setSinkParallelism); + } + + @ParameterizedTest + @EnumSource(RuntimeExecutionMode.class) + void throwExceptionWithoutSettingUid(RuntimeExecutionMode runtimeExecutionMode) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final Configuration config = new Configuration(); + config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode); + env.configure(config, getClass().getClassLoader()); + // disable auto generating uid + env.getConfig().disableAutoGeneratedUIDs(); + sinkTo(env.fromData(1, 2), simpleSink()); + assertThatThrownBy(env::getStreamGraph).isInstanceOf(IllegalStateException.class); + } + + @Test + void disableOperatorChain() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final DataStreamSource src = env.fromData(1, 2); + final DataStreamSink dataStreamSink = sinkTo(src, sinkWithCommitter()).name(NAME); + dataStreamSink.disableChaining(); + + final StreamGraph streamGraph = env.getStreamGraph(); + final StreamNode writer = findWriter(streamGraph); + final StreamNode committer = findCommitter(streamGraph); + + assertThat(writer.getOperatorFactory().getChainingStrategy()) + .isEqualTo(ChainingStrategy.NEVER); + assertThat(committer.getOperatorFactory().getChainingStrategy()) + .isEqualTo(ChainingStrategy.NEVER); + } + + void validateTopology( + StreamNode src, + Class srcOutTypeInfo, + StreamNode dest, + Class operatorFactoryClass, + int expectedParallelism, + int expectedMaxParallelism) { + + // verify src node + final StreamEdge srcOutEdge = src.getOutEdges().get(0); + assertThat(srcOutEdge.getTargetId()).isEqualTo(dest.getId()); + assertThat(src.getTypeSerializerOut()).isInstanceOf(srcOutTypeInfo); + + // verify dest node input + final StreamEdge destInputEdge = dest.getInEdges().get(0); + assertThat(destInputEdge.getTargetId()).isEqualTo(dest.getId()); + assertThat(dest.getTypeSerializersIn()[0]).isInstanceOf(srcOutTypeInfo); + + // make sure 2 sink operators have different names/uid + assertThat(dest.getOperatorName()).isNotEqualTo(src.getOperatorName()); + assertThat(dest.getTransformationUID()).isNotEqualTo(src.getTransformationUID()); + + assertThat(dest.getOperatorFactory()).isInstanceOf(operatorFactoryClass); + assertThat(dest.getParallelism()).isEqualTo(expectedParallelism); + assertThat(dest.getMaxParallelism()).isEqualTo(expectedMaxParallelism); + assertThat(dest.getOperatorFactory().getChainingStrategy()) + .isEqualTo(ChainingStrategy.ALWAYS); + assertThat(dest.getSlotSharingGroup()).isEqualTo(SLOT_SHARE_GROUP); + } + + StreamGraph buildGraph(Sink sink, RuntimeExecutionMode runtimeExecutionMode) { + return buildGraph(sink, runtimeExecutionMode, true); + } + + StreamGraph buildGraph( + Sink sink, + RuntimeExecutionMode runtimeExecutionMode, + boolean setSinkParallelism) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final Configuration config = new Configuration(); + config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode); + env.configure(config, getClass().getClassLoader()); + final DataStreamSource src = env.fromData(1, 2); + final DataStreamSink dataStreamSink = sinkTo(src.rebalance(), sink); + setSinkProperty(dataStreamSink, setSinkParallelism); + // Trigger the plan generation but do not clear the transformations + env.getExecutionPlan(); + return env.getStreamGraph(); + } + + private void setSinkProperty( + DataStreamSink dataStreamSink, boolean 
setSinkParallelism) { + dataStreamSink.name(NAME); + dataStreamSink.uid(UID); + if (setSinkParallelism) { + dataStreamSink.setParallelism(SinkV2TransformationTranslatorITCase.PARALLELISM); + } + dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP); + } + + StreamNode findNodeName(StreamGraph streamGraph, Predicate predicate) { + return streamGraph.getStreamNodes().stream() + .filter(node -> predicate.test(node.getOperatorName())) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Can not find the node")); + } + + StreamNode findWriter(StreamGraph streamGraph) { + return findNodeName( + streamGraph, name -> name.contains("Writer") && !name.contains("Committer")); + } + + StreamNode findCommitter(StreamGraph streamGraph) { + return findNodeName( + streamGraph, + name -> name.contains("Committer") && !name.contains("Global Committer")); + } + + StreamNode findGlobalCommitter(StreamGraph streamGraph) { + return findNodeName(streamGraph, name -> name.contains("Global Committer")); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java index fc1a9066c7cd8..18f934752a377 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies; import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology; import org.apache.flink.streaming.api.datastream.DataStream; @@ -241,7 +242,8 @@ public TestSinkV2WithPostCommitTopology( @Override public void addPostCommitTopology(DataStream> committables) { - // We do not need to do anything for tests + StandardSinkTopologies.addGlobalCommitter( + committables, this::createCommitter, this::getCommittableSerializer); } }
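Closing note: because the relocated `GlobalCommitterOperator` (and `CommitterOperator`) now create the committer in `initializeState()` via `CommitterInitContextImpl`, a committer built through the new factory can use the context at construction time. The sketch below is illustrative only and assumes a hypothetical `MetricAwareCommitter`; it is not part of this patch.

```java
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.function.SerializableFunction;

import java.util.Collection;

/** Hypothetical committer that uses the context now handed to the factory; a sketch only. */
class MetricAwareCommitter implements Committer<String> {

    // With the new factory signature this is just a method reference.
    static final SerializableFunction<CommitterInitContext, Committer<String>> FACTORY =
            MetricAwareCommitter::new;

    private final Counter commits;

    MetricAwareCommitter(CommitterInitContext context) {
        // The operators invoke the factory during initializeState(), so the sink
        // committer metric group is already available here.
        this.commits = context.metricGroup().counter("myGlobalCommits");
    }

    @Override
    public void commit(Collection<CommitRequest<String>> requests) {
        for (CommitRequest<String> request : requests) {
            // ... perform the external commit for request.getCommittable() here ...
            commits.inc();
        }
    }

    @Override
    public void close() {}
}
```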