From 2e15c79540e86e44089dd59124c3646688d52025 Mon Sep 17 00:00:00 2001 From: Arvid Heise Date: Mon, 25 Nov 2024 14:35:47 +0100 Subject: [PATCH 1/3] [FLINK-36788] Move GlobalCommitter to flink-runtime Small refactor for next commit --- .../operators/sink}/GlobalCommittableWrapper.java | 2 +- .../operators/sink}/GlobalCommitterOperator.java | 3 ++- .../operators/sink}/GlobalCommitterSerializer.java | 2 +- .../translators/GlobalCommitterTransformationTranslator.java | 2 +- .../connector/sink2/CommittableMessageSerializerTest.java | 2 ++ .../api/connector/sink2/CommittableMessageTypeInfoTest.java | 1 + .../operators/sink}/GlobalCommitterOperatorTest.java | 5 ++++- .../operators/sink}/GlobalCommitterSerializerTest.java | 4 +++- .../sink2 => runtime/operators/sink}/IntegerSerializer.java | 2 +- .../committables/CommittableCollectorSerializerTest.java | 2 +- 10 files changed, 17 insertions(+), 8 deletions(-) rename flink-runtime/src/main/java/org/apache/flink/streaming/{api/connector/sink2 => runtime/operators/sink}/GlobalCommittableWrapper.java (96%) rename flink-runtime/src/main/java/org/apache/flink/streaming/{api/connector/sink2 => runtime/operators/sink}/GlobalCommitterOperator.java (98%) rename flink-runtime/src/main/java/org/apache/flink/streaming/{api/connector/sink2 => runtime/operators/sink}/GlobalCommitterSerializer.java (99%) rename flink-runtime/src/test/java/org/apache/flink/streaming/{api/connector/sink2 => runtime/operators/sink}/GlobalCommitterOperatorTest.java (96%) rename flink-runtime/src/test/java/org/apache/flink/streaming/{api/connector/sink2 => runtime/operators/sink}/GlobalCommitterSerializerTest.java (96%) rename flink-runtime/src/test/java/org/apache/flink/streaming/{api/connector/sink2 => runtime/operators/sink}/IntegerSerializer.java (96%) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommittableWrapper.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommittableWrapper.java similarity index 96% rename from flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommittableWrapper.java rename to flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommittableWrapper.java index c9f6d08151088..a74f7c379cc05 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommittableWrapper.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommittableWrapper.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java similarity index 98% rename from flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java rename to flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java index 7dc4492e40a58..3b80a0870ab4e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.ListState; @@ -29,6 +29,7 @@ import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterSerializer.java similarity index 99% rename from flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java rename to flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterSerializer.java index a8c5ae5497883..4fe1cd35733fe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterSerializer.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.core.io.SimpleVersionedSerialization; diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java index d46e6a295107a..e4b3449337d66 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; -import org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.graph.TransformationTranslator; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -31,6 +30,7 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PhysicalTransformation; import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory; +import org.apache.flink.streaming.runtime.operators.sink.GlobalCommitterOperator; import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import java.util.ArrayDeque; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java index 0f84c1c12008f..379200339c5e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageSerializerTest.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.api.connector.sink2; +import org.apache.flink.streaming.runtime.operators.sink.IntegerSerializer; + import org.junit.jupiter.api.Test; import java.io.IOException; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfoTest.java index 43d64ad091895..c318ab5a4bd94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/CommittableMessageTypeInfoTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.connector.sink2; import org.apache.flink.api.common.typeutils.TypeInformationTestBase; +import org.apache.flink.streaming.runtime.operators.sink.IntegerSerializer; /** Test for {@link CommittableMessageTypeInfo}. */ class CommittableMessageTypeInfoTest diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java similarity index 96% rename from flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java rename to flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java index 5dc646240fcd4..b2bd505fcffd5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java @@ -16,10 +16,13 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterSerializerTest.java similarity index 96% rename from flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java rename to flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterSerializerTest.java index 8c5cc11b9051b..eb2416386a778 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/GlobalCommitterSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterSerializerTest.java @@ -16,13 +16,15 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils; +import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; +import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager; import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector; import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/IntegerSerializer.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/IntegerSerializer.java similarity index 96% rename from flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/IntegerSerializer.java rename to flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/IntegerSerializer.java index 4fa4e3a650bd1..dc59acd349bac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/connector/sink2/IntegerSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/IntegerSerializer.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.streaming.api.connector.sink2; +package org.apache.flink.streaming.runtime.operators.sink; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java index 938427ba2d51c..a69c7c64355ec 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializerTest.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; -import org.apache.flink.streaming.api.connector.sink2.IntegerSerializer; +import org.apache.flink.streaming.runtime.operators.sink.IntegerSerializer; import org.assertj.core.api.InstanceOfAssertFactories; import org.junit.jupiter.api.Test; From bc8f1739d80e7ad3212808527715ccd6ffa50a80 Mon Sep 17 00:00:00 2001 From: Arvid Heise Date: Mon, 25 Nov 2024 14:37:48 +0100 Subject: [PATCH 2/3] [FLINK-36788] Add InitContext to GlobalCommitter factory StandardSinkTopologies didn't expose yet the newly added ctx, such that the global committer couldn't access the metric groups --- .../sink2/StandardSinkTopologies.java | 21 ++++++++- .../GlobalCommitterTransform.java | 8 ++-- .../sink/CommitterInitContextImpl.java | 45 +++++++++++++++++++ .../operators/sink/CommitterOperator.java | 27 +---------- .../sink/GlobalCommitterOperator.java | 17 ++++--- .../sink/GlobalCommitterOperatorTest.java | 2 +- 6 files changed, 85 insertions(+), 35 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterInitContextImpl.java diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java index 691a797752eee..e974bf9761223 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/connector/sink2/StandardSinkTopologies.java @@ -20,9 +20,11 @@ import org.apache.flink.annotation.Experimental; import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.transformations.GlobalCommitterTransform; +import org.apache.flink.util.function.SerializableFunction; import org.apache.flink.util.function.SerializableSupplier; /** This utility class provides building blocks for custom topologies. */ @@ -39,7 +41,7 @@ private StandardSinkTopologies() {} */ public static void addGlobalCommitter( DataStream> committables, - SerializableSupplier> committerFactory, + SerializableFunction> committerFactory, SerializableSupplier> committableSerializer) { committables .getExecutionEnvironment() @@ -47,4 +49,21 @@ public static void addGlobalCommitter( new GlobalCommitterTransform<>( committables, committerFactory, committableSerializer)); } + + /** + * Adds a global committer to the pipeline that runs as final operator with a parallelism of + * one. + */ + public static void addGlobalCommitter( + DataStream> committables, + SerializableSupplier> committerFactory, + SerializableSupplier> committableSerializer) { + committables + .getExecutionEnvironment() + .addOperator( + new GlobalCommitterTransform<>( + committables, + ctx -> committerFactory.get(), + committableSerializer)); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/GlobalCommitterTransform.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/GlobalCommitterTransform.java index 303c632259fab..7b59fa8f0a7a6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/GlobalCommitterTransform.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/GlobalCommitterTransform.java @@ -21,12 +21,14 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.api.dag.Transformation; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.util.function.SerializableFunction; import org.apache.flink.util.function.SerializableSupplier; import org.apache.flink.shaded.guava32.com.google.common.collect.Lists; @@ -45,12 +47,12 @@ public class GlobalCommitterTransform extends TransformationWithLineage { private final DataStream> inputStream; - private final SerializableSupplier> committerFactory; + private final SerializableFunction> committerFactory; private final SerializableSupplier> committableSerializer; public GlobalCommitterTransform( DataStream> inputStream, - SerializableSupplier> committerFactory, + SerializableFunction> committerFactory, SerializableSupplier> committableSerializer) { super(StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME, Types.VOID, 1, true); this.inputStream = inputStream; @@ -78,7 +80,7 @@ public DataStream> getInputStream() { return inputStream; } - public SerializableSupplier> getCommitterFactory() { + public SerializableFunction> getCommitterFactory() { return committerFactory; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterInitContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterInitContextImpl.java new file mode 100644 index 0000000000000..f69fbd2fbd89e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterInitContextImpl.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.connector.sink2.CommitterInitContext; +import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; + +import java.util.OptionalLong; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +class CommitterInitContextImpl extends InitContextBase implements CommitterInitContext { + + private final SinkCommitterMetricGroup metricGroup; + + public CommitterInitContextImpl( + StreamingRuntimeContext runtimeContext, + SinkCommitterMetricGroup metricGroup, + OptionalLong restoredCheckpointId) { + super(runtimeContext, restoredCheckpointId); + this.metricGroup = checkNotNull(metricGroup); + } + + @Override + public SinkCommitterMetricGroup metricGroup() { + return metricGroup; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java index 3a2c221a5eaee..9cd85c4001abc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/CommitterOperator.java @@ -37,7 +37,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; -import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager; import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector; @@ -123,7 +122,8 @@ protected void setup( public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); OptionalLong checkpointId = context.getRestoredCheckpointId(); - CommitterInitContext initContext = createInitContext(checkpointId); + CommitterInitContext initContext = + new CommitterInitContextImpl(getRuntimeContext(), metricGroup, checkpointId); committer = committerSupplier.apply(initContext); committableCollectorState = new SimpleVersionedListState<>( @@ -213,27 +213,4 @@ public void processElement(StreamRecord> element) thro public void close() throws Exception { closeAll(committer, super::close); } - - private CommitterInitContext createInitContext(OptionalLong restoredCheckpointId) { - return new CommitterInitContextImp(getRuntimeContext(), metricGroup, restoredCheckpointId); - } - - private static class CommitterInitContextImp extends InitContextBase - implements CommitterInitContext { - - private final SinkCommitterMetricGroup metricGroup; - - public CommitterInitContextImp( - StreamingRuntimeContext runtimeContext, - SinkCommitterMetricGroup metricGroup, - OptionalLong restoredCheckpointId) { - super(runtimeContext, restoredCheckpointId); - this.metricGroup = checkNotNull(metricGroup); - } - - @Override - public SinkCommitterMetricGroup metricGroup() { - return metricGroup; - } - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java index 3b80a0870ab4e..9c229b0bb2c2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperator.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; import org.apache.flink.api.connector.sink2.Committer; +import org.apache.flink.api.connector.sink2.CommitterInitContext; import org.apache.flink.configuration.SinkOptions; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.metrics.groups.SinkCommitterMetricGroup; @@ -40,6 +41,7 @@ import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.function.SerializableFunction; import org.apache.flink.util.function.SerializableSupplier; import javax.annotation.Nullable; @@ -48,6 +50,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.OptionalLong; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -101,7 +104,7 @@ public class GlobalCommitterOperator extends AbstractStreamO new ListStateDescriptor<>( "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE); - private final SerializableSupplier> committerFactory; + private final SerializableFunction> committerFactory; private final SerializableSupplier> committableSerializerFactory; /** @@ -122,7 +125,7 @@ public class GlobalCommitterOperator extends AbstractStreamO private List sinkV1State = new ArrayList<>(); public GlobalCommitterOperator( - SerializableSupplier> committerFactory, + SerializableFunction> committerFactory, SerializableSupplier> committableSerializerFactory, boolean commitOnInput) { this.committerFactory = checkNotNull(committerFactory); @@ -136,7 +139,6 @@ protected void setup( StreamConfig config, Output> output) { super.setup(containingTask, config, output); - committer = committerFactory.get(); metricGroup = InternalSinkCommitterMetricGroup.wrap(metrics); committableCollector = CommittableCollector.of(metricGroup); committableSerializer = committableSerializerFactory.get(); @@ -156,6 +158,11 @@ public void snapshotState(StateSnapshotContext context) throws Exception { @Override public void initializeState(StateInitializationContext context) throws Exception { super.initializeState(context); + OptionalLong restoredCheckpointId = context.getRestoredCheckpointId(); + committer = + committerFactory.apply( + new CommitterInitContextImpl( + getRuntimeContext(), metricGroup, restoredCheckpointId)); globalCommitterState = new SimpleVersionedListState<>( context.getOperatorStateStore() @@ -170,8 +177,8 @@ public void initializeState(StateInitializationContext context) throws Exception committableCollector.merge(cc.getCommittableCollector()); }); // try to re-commit recovered transactions as quickly as possible - if (context.getRestoredCheckpointId().isPresent()) { - commit(context.getRestoredCheckpointId().getAsLong()); + if (restoredCheckpointId.isPresent()) { + commit(restoredCheckpointId.getAsLong()); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java index b2bd505fcffd5..a73dcc24d012c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/sink/GlobalCommitterOperatorTest.java @@ -176,7 +176,7 @@ private OneInputStreamOperatorTestHarness, Void> cre Committer committer, boolean commitOnInput) throws Exception { return new OneInputStreamOperatorTestHarness<>( new GlobalCommitterOperator<>( - () -> committer, IntegerSerializer::new, commitOnInput)); + ctx -> committer, IntegerSerializer::new, commitOnInput)); } private static class MockCommitter implements Committer { From 960cda3c9bfc3d1eff18eac45138cd0bd319b12c Mon Sep 17 00:00:00 2001 From: Arvid Heise Date: Mon, 25 Nov 2024 14:39:48 +0100 Subject: [PATCH 3/3] [FLINK-36788] Fix GlobalCommitter expansion So far, the global committer didn't properly get all properties set during expansion. In some cases, it didn't get expanded at all. This commit also inline SinkTransformationTranslatorITCaseBase.java after the respective V1 implementation was removed. V2 now has the same coverage as V1 used to have. --- ...obalCommitterTransformationTranslator.java | 22 +- .../SinkTransformationTranslator.java | 16 ++ ...inkTransformationTranslatorITCaseBase.java | 256 ------------------ .../SinkV2TransformationTranslatorITCase.java | 253 ++++++++++++++++- .../runtime/operators/sink/TestSinkV2.java | 4 +- 5 files changed, 274 insertions(+), 277 deletions(-) delete mode 100644 flink-runtime/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java index e4b3449337d66..278a29477890c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/GlobalCommitterTransformationTranslator.java @@ -34,11 +34,14 @@ import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import java.util.ArrayDeque; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Queue; import java.util.Set; +import java.util.function.Consumer; +import java.util.function.Supplier; import static org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME; @@ -74,11 +77,10 @@ private Collection translateInternal( boolean commitOnInput = batch || !checkpointingEnabled || hasUpstreamCommitter(inputStream); // Create a global shuffle and add the global committer with parallelism 1. + DataStream> global = inputStream.global(); final PhysicalTransformation transformation = (PhysicalTransformation) - inputStream - .global() - .transform( + global.transform( GLOBAL_COMMITTER_TRANSFORMATION_NAME, Types.VOID, new GlobalCommitterOperator<>( @@ -87,10 +89,20 @@ private Collection translateInternal( commitOnInput)) .getTransformation(); transformation.setChainingStrategy(ChainingStrategy.ALWAYS); - transformation.setName(GLOBAL_COMMITTER_TRANSFORMATION_NAME); transformation.setParallelism(1); transformation.setMaxParallelism(1); - return Collections.emptyList(); + copySafely(transformation::setName, globalCommitterTransform::getName); + copySafely(transformation::setUid, globalCommitterTransform::getUid); + copySafely(transformation::setUidHash, globalCommitterTransform::getUserProvidedNodeHash); + + return Arrays.asList(global.getId(), transformation.getId()); + } + + private static void copySafely(Consumer consumer, Supplier provider) { + T value = provider.get(); + if (value != null) { + consumer.accept(value); + } } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java index 52449be1f1825..dd41218a4970f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java @@ -59,6 +59,8 @@ import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkState; @@ -177,6 +179,12 @@ private void expand() { getSinkTransformations(sizeBefore).forEach(context::transform); + repeatUntilConverged( + () -> + getSinkTransformations(sizeBefore).stream() + .flatMap(t -> context.transform(t).stream()) + .collect(Collectors.toList())); + disallowUnalignedCheckpoint(getSinkTransformations(sizeBefore)); // Remove all added sink subtransformations to avoid duplications and allow additional @@ -188,6 +196,14 @@ private void expand() { } } + private void repeatUntilConverged(Supplier producer) { + R lastResult = producer.get(); + R nextResult; + while (!lastResult.equals(nextResult = producer.get())) { + lastResult = nextResult; + } + } + private List> getSinkTransformations(int sizeBefore) { return executionEnvironment .getTransformations() diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java deleted file mode 100644 index e95523a1492ee..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java +++ /dev/null @@ -1,256 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.graph; - -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ExecutionOptions; -import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.StreamOperatorFactory; -import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory; -import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; -import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; - -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; - -import java.util.Arrays; -import java.util.Collection; -import java.util.function.Predicate; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -/** - * Tests for {@link org.apache.flink.streaming.api.transformations.SinkTransformation}. - * - *

ATTENTION: This test is extremely brittle. Do NOT remove, add or re-order test cases. - */ -@ExtendWith(ParameterizedTestExtension.class) -abstract class SinkTransformationTranslatorITCaseBase { - - @Parameters(name = "Execution Mode: {0}") - private static Collection data() { - return Arrays.asList(RuntimeExecutionMode.STREAMING, RuntimeExecutionMode.BATCH); - } - - @Parameter protected RuntimeExecutionMode runtimeExecutionMode; - - static final String NAME = "FileSink"; - static final String SLOT_SHARE_GROUP = "FileGroup"; - static final String UID = "FileUid"; - static final int PARALLELISM = 2; - - abstract SinkT simpleSink(); - - abstract SinkT sinkWithCommitter(); - - abstract DataStreamSink sinkTo(DataStream stream, SinkT sink); - - @TestTemplate - void generateWriterTopology() { - final StreamGraph streamGraph = buildGraph(simpleSink(), runtimeExecutionMode); - - final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source")); - final StreamNode writerNode = findWriter(streamGraph); - - assertThat(streamGraph.getStreamNodes()).hasSize(2); - - validateTopology( - sourceNode, - IntSerializer.class, - writerNode, - SinkWriterOperatorFactory.class, - PARALLELISM, - -1); - } - - @TestTemplate - void generateWriterCommitterTopology() { - - final StreamGraph streamGraph = buildGraph(sinkWithCommitter(), runtimeExecutionMode); - - final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source")); - final StreamNode writerNode = findWriter(streamGraph); - - validateTopology( - sourceNode, - IntSerializer.class, - writerNode, - SinkWriterOperatorFactory.class, - PARALLELISM, - -1); - - final StreamNode committerNode = - findNodeName(streamGraph, name -> name.contains("Committer")); - - assertThat(streamGraph.getStreamNodes()).hasSize(3); - assertNoUnalignedOutput(writerNode); - - validateTopology( - writerNode, - SimpleVersionedSerializerTypeSerializerProxy.class, - committerNode, - CommitterOperatorFactory.class, - PARALLELISM, - -1); - } - - @TestTemplate - void testParallelismConfigured() { - testParallelismConfiguredInternal(true); - - testParallelismConfiguredInternal(false); - } - - private void testParallelismConfiguredInternal(boolean setSinkParallelism) { - final StreamGraph streamGraph = - buildGraph(sinkWithCommitter(), runtimeExecutionMode, setSinkParallelism); - - final StreamNode writerNode = findWriter(streamGraph); - final StreamNode committerNode = findCommitter(streamGraph); - - assertThat(writerNode.isParallelismConfigured()).isEqualTo(setSinkParallelism); - assertThat(committerNode.isParallelismConfigured()).isEqualTo(setSinkParallelism); - } - - StreamNode findWriter(StreamGraph streamGraph) { - return findNodeName( - streamGraph, name -> name.contains("Writer") && !name.contains("Committer")); - } - - StreamNode findCommitter(StreamGraph streamGraph) { - return findNodeName( - streamGraph, - name -> name.contains("Committer") && !name.contains("Global Committer")); - } - - StreamNode findGlobalCommitter(StreamGraph streamGraph) { - return findNodeName(streamGraph, name -> name.contains("Global Committer")); - } - - @TestTemplate - void throwExceptionWithoutSettingUid() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - final Configuration config = new Configuration(); - config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode); - env.configure(config, getClass().getClassLoader()); - // disable auto generating uid - env.getConfig().disableAutoGeneratedUIDs(); - sinkTo(env.fromElements(1, 2), simpleSink()); - assertThatThrownBy(env::getStreamGraph).isInstanceOf(IllegalStateException.class); - } - - @TestTemplate - void disableOperatorChain() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - final DataStreamSource src = env.fromElements(1, 2); - final DataStreamSink dataStreamSink = sinkTo(src, sinkWithCommitter()).name(NAME); - dataStreamSink.disableChaining(); - - final StreamGraph streamGraph = env.getStreamGraph(); - final StreamNode writer = findWriter(streamGraph); - final StreamNode committer = findCommitter(streamGraph); - - assertThat(writer.getOperatorFactory().getChainingStrategy()) - .isEqualTo(ChainingStrategy.NEVER); - assertThat(committer.getOperatorFactory().getChainingStrategy()) - .isEqualTo(ChainingStrategy.NEVER); - } - - void validateTopology( - StreamNode src, - Class srcOutTypeInfo, - StreamNode dest, - Class operatorFactoryClass, - int expectedParallelism, - int expectedMaxParallelism) { - - // verify src node - final StreamEdge srcOutEdge = src.getOutEdges().get(0); - assertThat(srcOutEdge.getTargetId()).isEqualTo(dest.getId()); - assertThat(src.getTypeSerializerOut()).isInstanceOf(srcOutTypeInfo); - - // verify dest node input - final StreamEdge destInputEdge = dest.getInEdges().get(0); - assertThat(destInputEdge.getTargetId()).isEqualTo(dest.getId()); - assertThat(dest.getTypeSerializersIn()[0]).isInstanceOf(srcOutTypeInfo); - - // make sure 2 sink operators have different names/uid - assertThat(dest.getOperatorName()).isNotEqualTo(src.getOperatorName()); - assertThat(dest.getTransformationUID()).isNotEqualTo(src.getTransformationUID()); - - assertThat(dest.getOperatorFactory()).isInstanceOf(operatorFactoryClass); - assertThat(dest.getParallelism()).isEqualTo(expectedParallelism); - assertThat(dest.getMaxParallelism()).isEqualTo(expectedMaxParallelism); - assertThat(dest.getOperatorFactory().getChainingStrategy()) - .isEqualTo(ChainingStrategy.ALWAYS); - assertThat(dest.getSlotSharingGroup()).isEqualTo(SLOT_SHARE_GROUP); - } - - protected static void assertNoUnalignedOutput(StreamNode src) { - assertThat(src.getOutEdges()).allMatch(e -> !e.supportsUnalignedCheckpoints()); - } - - StreamGraph buildGraph(SinkT sink, RuntimeExecutionMode runtimeExecutionMode) { - return buildGraph(sink, runtimeExecutionMode, true); - } - - StreamGraph buildGraph( - SinkT sink, RuntimeExecutionMode runtimeExecutionMode, boolean setSinkParallelism) { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - final Configuration config = new Configuration(); - config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode); - env.configure(config, getClass().getClassLoader()); - final DataStreamSource src = env.fromElements(1, 2); - final DataStreamSink dataStreamSink = sinkTo(src.rebalance(), sink); - setSinkProperty(dataStreamSink, setSinkParallelism); - // Trigger the plan generation but do not clear the transformations - env.getExecutionPlan(); - return env.getStreamGraph(); - } - - private void setSinkProperty( - DataStreamSink dataStreamSink, boolean setSinkParallelism) { - dataStreamSink.name(NAME); - dataStreamSink.uid(UID); - if (setSinkParallelism) { - dataStreamSink.setParallelism(SinkTransformationTranslatorITCaseBase.PARALLELISM); - } - dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP); - } - - StreamNode findNodeName(StreamGraph streamGraph, Predicate predicate) { - return streamGraph.getStreamNodes().stream() - .filter(node -> predicate.test(node.getOperatorName())) - .findFirst() - .orElseThrow(() -> new IllegalStateException("Can not find the node")); - } -} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java index 6b09b83961dec..9e3753031c0bf 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java @@ -18,77 +18,300 @@ package org.apache.flink.streaming.api.graph; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy; import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory; +import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory; import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2; -import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.function.Predicate; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Tests for {@link org.apache.flink.streaming.api.transformations.SinkTransformation}. * *

ATTENTION: This test is extremely brittle. Do NOT remove, add or re-order test cases. */ -@ExtendWith(ParameterizedTestExtension.class) -class SinkV2TransformationTranslatorITCase - extends SinkTransformationTranslatorITCaseBase> { +class SinkV2TransformationTranslatorITCase { + + static final String NAME = "FileSink"; + static final String SLOT_SHARE_GROUP = "FileGroup"; + static final String UID = "FileUid"; + static final int PARALLELISM = 2; + + protected static void assertNoUnalignedOutput(StreamNode src) { + assertThat(src.getOutEdges()).allMatch(e -> !e.supportsUnalignedCheckpoints()); + } - @Override Sink simpleSink() { return TestSinkV2.newBuilder().build(); } - @Override Sink sinkWithCommitter() { return TestSinkV2.newBuilder().setDefaultCommitter().build(); } - @Override + Sink sinkWithCommitterAndGlobalCommitter() { + return TestSinkV2.newBuilder() + .setDefaultCommitter() + .setWithPostCommitTopology(true) + .build(); + } + DataStreamSink sinkTo(DataStream stream, Sink sink) { return stream.sinkTo(sink); } - @TestTemplate + @Test void testSettingOperatorUidHash() { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - final DataStreamSource src = env.fromElements(1, 2); + final DataStreamSource src = env.fromData(1, 2); final String writerHash = "f6b178ce445dc3ffaa06bad27a51fead"; final String committerHash = "68ac8ae79eae4e3135a54f9689c4aa10"; + final String globalCommitterHash = "77e6aa6eeb1643b3765e1e4a7a672f37"; final CustomSinkOperatorUidHashes operatorsUidHashes = CustomSinkOperatorUidHashes.builder() .setWriterUidHash(writerHash) .setCommitterUidHash(committerHash) + .setGlobalCommitterUidHash(globalCommitterHash) .build(); - src.sinkTo(sinkWithCommitter(), operatorsUidHashes).name(NAME); + src.sinkTo(sinkWithCommitterAndGlobalCommitter(), operatorsUidHashes).name(NAME); final StreamGraph streamGraph = env.getStreamGraph(); assertThat(findWriter(streamGraph).getUserHash()).isEqualTo(writerHash); assertThat(findCommitter(streamGraph).getUserHash()).isEqualTo(committerHash); + assertThat(findGlobalCommitter(streamGraph).getUserHash()).isEqualTo(globalCommitterHash); } /** * When ever you need to change something in this test case please think about possible state * upgrade problems introduced by your changes. */ - @TestTemplate + @Test void testSettingOperatorUids() { final String sinkUid = "f6b178ce445dc3ffaa06bad27a51fead"; final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - final DataStreamSource src = env.fromElements(1, 2); - src.sinkTo(sinkWithCommitter()).name(NAME).uid(sinkUid); + final DataStreamSource src = env.fromData(1, 2); + src.sinkTo(sinkWithCommitterAndGlobalCommitter()).name(NAME).uid(sinkUid); final StreamGraph streamGraph = env.getStreamGraph(); assertThat(findWriter(streamGraph).getTransformationUID()).isEqualTo(sinkUid); assertThat(findCommitter(streamGraph).getTransformationUID()) .isEqualTo(String.format("Sink Committer: %s", sinkUid)); + assertThat(findGlobalCommitter(streamGraph).getTransformationUID()) + .isEqualTo(String.format("Sink %s Global Committer", sinkUid)); + } + + @Test + void testSettingOperatorNames() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final DataStreamSource src = env.fromData(1, 2); + src.sinkTo(sinkWithCommitterAndGlobalCommitter()).name(NAME); + + final StreamGraph streamGraph = env.getStreamGraph(); + assertThat(findWriter(streamGraph).getOperatorName()) + .isEqualTo(String.format("%s: Writer", NAME)); + assertThat(findCommitter(streamGraph).getOperatorName()) + .isEqualTo(String.format("%s: Committer", NAME)); + assertThat(findGlobalCommitter(streamGraph).getOperatorName()) + .isEqualTo(String.format("%s: Global Committer", NAME)); + } + + @ParameterizedTest + @EnumSource(RuntimeExecutionMode.class) + void generateWriterTopology(RuntimeExecutionMode runtimeExecutionMode) { + final StreamGraph streamGraph = buildGraph(simpleSink(), runtimeExecutionMode); + + final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source")); + final StreamNode writerNode = findWriter(streamGraph); + + assertThat(streamGraph.getStreamNodes()).hasSize(2); + + validateTopology( + sourceNode, + IntSerializer.class, + writerNode, + SinkWriterOperatorFactory.class, + PARALLELISM, + -1); + } + + @ParameterizedTest + @EnumSource(RuntimeExecutionMode.class) + void generateWriterCommitterTopology(RuntimeExecutionMode runtimeExecutionMode) { + final StreamGraph streamGraph = buildGraph(sinkWithCommitter(), runtimeExecutionMode); + + final StreamNode sourceNode = findNodeName(streamGraph, node -> node.contains("Source")); + final StreamNode writerNode = findWriter(streamGraph); + + validateTopology( + sourceNode, + IntSerializer.class, + writerNode, + SinkWriterOperatorFactory.class, + PARALLELISM, + -1); + + final StreamNode committerNode = + findNodeName(streamGraph, name -> name.contains("Committer")); + + assertThat(streamGraph.getStreamNodes()).hasSize(3); + assertNoUnalignedOutput(writerNode); + + validateTopology( + writerNode, + SimpleVersionedSerializerTypeSerializerProxy.class, + committerNode, + CommitterOperatorFactory.class, + PARALLELISM, + -1); + } + + @ParameterizedTest + @CsvSource({"STREAMING, true", "STREAMING, false", "BATCH, true", "BATCH, false"}) + void testParallelismConfigured( + RuntimeExecutionMode runtimeExecutionMode, boolean setSinkParallelism) { + final StreamGraph streamGraph = + buildGraph(sinkWithCommitter(), runtimeExecutionMode, setSinkParallelism); + + final StreamNode writerNode = findWriter(streamGraph); + final StreamNode committerNode = findCommitter(streamGraph); + + assertThat(writerNode.isParallelismConfigured()).isEqualTo(setSinkParallelism); + assertThat(committerNode.isParallelismConfigured()).isEqualTo(setSinkParallelism); + } + + @ParameterizedTest + @EnumSource(RuntimeExecutionMode.class) + void throwExceptionWithoutSettingUid(RuntimeExecutionMode runtimeExecutionMode) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final Configuration config = new Configuration(); + config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode); + env.configure(config, getClass().getClassLoader()); + // disable auto generating uid + env.getConfig().disableAutoGeneratedUIDs(); + sinkTo(env.fromData(1, 2), simpleSink()); + assertThatThrownBy(env::getStreamGraph).isInstanceOf(IllegalStateException.class); + } + + @Test + void disableOperatorChain() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final DataStreamSource src = env.fromData(1, 2); + final DataStreamSink dataStreamSink = sinkTo(src, sinkWithCommitter()).name(NAME); + dataStreamSink.disableChaining(); + + final StreamGraph streamGraph = env.getStreamGraph(); + final StreamNode writer = findWriter(streamGraph); + final StreamNode committer = findCommitter(streamGraph); + + assertThat(writer.getOperatorFactory().getChainingStrategy()) + .isEqualTo(ChainingStrategy.NEVER); + assertThat(committer.getOperatorFactory().getChainingStrategy()) + .isEqualTo(ChainingStrategy.NEVER); + } + + void validateTopology( + StreamNode src, + Class srcOutTypeInfo, + StreamNode dest, + Class operatorFactoryClass, + int expectedParallelism, + int expectedMaxParallelism) { + + // verify src node + final StreamEdge srcOutEdge = src.getOutEdges().get(0); + assertThat(srcOutEdge.getTargetId()).isEqualTo(dest.getId()); + assertThat(src.getTypeSerializerOut()).isInstanceOf(srcOutTypeInfo); + + // verify dest node input + final StreamEdge destInputEdge = dest.getInEdges().get(0); + assertThat(destInputEdge.getTargetId()).isEqualTo(dest.getId()); + assertThat(dest.getTypeSerializersIn()[0]).isInstanceOf(srcOutTypeInfo); + + // make sure 2 sink operators have different names/uid + assertThat(dest.getOperatorName()).isNotEqualTo(src.getOperatorName()); + assertThat(dest.getTransformationUID()).isNotEqualTo(src.getTransformationUID()); + + assertThat(dest.getOperatorFactory()).isInstanceOf(operatorFactoryClass); + assertThat(dest.getParallelism()).isEqualTo(expectedParallelism); + assertThat(dest.getMaxParallelism()).isEqualTo(expectedMaxParallelism); + assertThat(dest.getOperatorFactory().getChainingStrategy()) + .isEqualTo(ChainingStrategy.ALWAYS); + assertThat(dest.getSlotSharingGroup()).isEqualTo(SLOT_SHARE_GROUP); + } + + StreamGraph buildGraph(Sink sink, RuntimeExecutionMode runtimeExecutionMode) { + return buildGraph(sink, runtimeExecutionMode, true); + } + + StreamGraph buildGraph( + Sink sink, + RuntimeExecutionMode runtimeExecutionMode, + boolean setSinkParallelism) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final Configuration config = new Configuration(); + config.set(ExecutionOptions.RUNTIME_MODE, runtimeExecutionMode); + env.configure(config, getClass().getClassLoader()); + final DataStreamSource src = env.fromData(1, 2); + final DataStreamSink dataStreamSink = sinkTo(src.rebalance(), sink); + setSinkProperty(dataStreamSink, setSinkParallelism); + // Trigger the plan generation but do not clear the transformations + env.getExecutionPlan(); + return env.getStreamGraph(); + } + + private void setSinkProperty( + DataStreamSink dataStreamSink, boolean setSinkParallelism) { + dataStreamSink.name(NAME); + dataStreamSink.uid(UID); + if (setSinkParallelism) { + dataStreamSink.setParallelism(SinkV2TransformationTranslatorITCase.PARALLELISM); + } + dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP); + } + + StreamNode findNodeName(StreamGraph streamGraph, Predicate predicate) { + return streamGraph.getStreamNodes().stream() + .filter(node -> predicate.test(node.getOperatorName())) + .findFirst() + .orElseThrow(() -> new IllegalStateException("Can not find the node")); + } + + StreamNode findWriter(StreamGraph streamGraph) { + return findNodeName( + streamGraph, name -> name.contains("Writer") && !name.contains("Committer")); + } + + StreamNode findCommitter(StreamGraph streamGraph) { + return findNodeName( + streamGraph, + name -> name.contains("Committer") && !name.contains("Global Committer")); + } + + StreamNode findGlobalCommitter(StreamGraph streamGraph) { + return findNodeName(streamGraph, name -> name.contains("Global Committer")); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java index fc1a9066c7cd8..18f934752a377 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/TestSinkV2.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; import org.apache.flink.streaming.api.connector.sink2.CommittableSummary; import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage; +import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies; import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology; import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology; import org.apache.flink.streaming.api.datastream.DataStream; @@ -241,7 +242,8 @@ public TestSinkV2WithPostCommitTopology( @Override public void addPostCommitTopology(DataStream> committables) { - // We do not need to do anything for tests + StandardSinkTopologies.addGlobalCommitter( + committables, this::createCommitter, this::getCommittableSerializer); } }