Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36788] Fix and cover global committer #25685

Merged
merged 3 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.transformations.GlobalCommitterTransform;
import org.apache.flink.util.function.SerializableFunction;
import org.apache.flink.util.function.SerializableSupplier;

/** This utility class provides building blocks for custom topologies. */
Expand All @@ -39,12 +41,29 @@ private StandardSinkTopologies() {}
*/
public static <CommT> void addGlobalCommitter(
DataStream<CommittableMessage<CommT>> committables,
SerializableSupplier<Committer<CommT>> committerFactory,
SerializableFunction<CommitterInitContext, Committer<CommT>> committerFactory,
SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
committables
.getExecutionEnvironment()
.addOperator(
new GlobalCommitterTransform<>(
committables, committerFactory, committableSerializer));
}

/**
* Adds a global committer to the pipeline that runs as final operator with a parallelism of
* one.
*/
public static <CommT> void addGlobalCommitter(
fapaul marked this conversation as resolved.
Show resolved Hide resolved
DataStream<CommittableMessage<CommT>> committables,
SerializableSupplier<Committer<CommT>> committerFactory,
SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
committables
.getExecutionEnvironment()
.addOperator(
new GlobalCommitterTransform<>(
committables,
ctx -> committerFactory.get(),
committableSerializer));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.util.function.SerializableFunction;
import org.apache.flink.util.function.SerializableSupplier;

import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;
Expand All @@ -45,12 +47,12 @@
public class GlobalCommitterTransform<CommT> extends TransformationWithLineage<Void> {

private final DataStream<CommittableMessage<CommT>> inputStream;
private final SerializableSupplier<Committer<CommT>> committerFactory;
private final SerializableFunction<CommitterInitContext, Committer<CommT>> committerFactory;
private final SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer;

public GlobalCommitterTransform(
DataStream<CommittableMessage<CommT>> inputStream,
SerializableSupplier<Committer<CommT>> committerFactory,
SerializableFunction<CommitterInitContext, Committer<CommT>> committerFactory,
SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializer) {
super(StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME, Types.VOID, 1, true);
this.inputStream = inputStream;
Expand Down Expand Up @@ -78,7 +80,7 @@ public DataStream<CommittableMessage<CommT>> getInputStream() {
return inputStream;
}

public SerializableSupplier<Committer<CommT>> getCommitterFactory() {
/**
 * Returns the factory used to create the {@link Committer}, invoked with the
 * {@link CommitterInitContext} of the committer operator at initialization time.
 */
public SerializableFunction<CommitterInitContext, Committer<CommT>> getCommitterFactory() {
    return committerFactory;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

import java.util.OptionalLong;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * {@link CommitterInitContext} implementation passed to committer factories when a committer is
 * created inside a committer operator.
 *
 * <p>Task- and checkpoint-related information is provided by {@link InitContextBase}; this class
 * additionally exposes the committer's {@link SinkCommitterMetricGroup}.
 */
class CommitterInitContextImpl extends InitContextBase implements CommitterInitContext {

    // Metric group of the owning committer operator; guaranteed non-null by the constructor.
    private final SinkCommitterMetricGroup metricGroup;

    /**
     * @param runtimeContext runtime context of the enclosing operator
     * @param metricGroup committer metric group, must not be {@code null}
     * @param restoredCheckpointId id of the checkpoint this task was restored from, empty on a
     *     fresh start
     */
    public CommitterInitContextImpl(
            StreamingRuntimeContext runtimeContext,
            SinkCommitterMetricGroup metricGroup,
            OptionalLong restoredCheckpointId) {
        super(runtimeContext, restoredCheckpointId);
        this.metricGroup = checkNotNull(metricGroup);
    }

    @Override
    public SinkCommitterMetricGroup metricGroup() {
        return metricGroup;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
Expand Down Expand Up @@ -123,7 +122,8 @@ protected void setup(
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
OptionalLong checkpointId = context.getRestoredCheckpointId();
CommitterInitContext initContext = createInitContext(checkpointId);
CommitterInitContext initContext =
new CommitterInitContextImpl(getRuntimeContext(), metricGroup, checkpointId);
committer = committerSupplier.apply(initContext);
committableCollectorState =
new SimpleVersionedListState<>(
Expand Down Expand Up @@ -213,27 +213,4 @@ public void processElement(StreamRecord<CommittableMessage<CommT>> element) thro
/** Closes the committer and then the operator itself. */
public void close() throws Exception {
    // NOTE(review): closeAll is expected to attempt every closeable and aggregate failures,
    // so super.close() should run even if closing the committer throws — confirm against the
    // closeAll utility actually imported by this file.
    closeAll(committer, super::close);
}

private CommitterInitContext createInitContext(OptionalLong restoredCheckpointId) {
return new CommitterInitContextImp(getRuntimeContext(), metricGroup, restoredCheckpointId);
}

private static class CommitterInitContextImp extends InitContextBase
implements CommitterInitContext {

private final SinkCommitterMetricGroup metricGroup;

public CommitterInitContextImp(
StreamingRuntimeContext runtimeContext,
SinkCommitterMetricGroup metricGroup,
OptionalLong restoredCheckpointId) {
super(runtimeContext, restoredCheckpointId);
this.metricGroup = checkNotNull(metricGroup);
}

@Override
public SinkCommitterMetricGroup metricGroup() {
return metricGroup;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.api.connector.sink2;
package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,21 @@
* limitations under the License.
*/

package org.apache.flink.streaming.api.connector.sink2;
package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.configuration.SinkOptions;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
Expand All @@ -39,6 +41,7 @@
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.function.SerializableFunction;
import org.apache.flink.util.function.SerializableSupplier;

import javax.annotation.Nullable;
Expand All @@ -47,6 +50,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -100,7 +104,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
new ListStateDescriptor<>(
"streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);

private final SerializableSupplier<Committer<CommT>> committerFactory;
private final SerializableFunction<CommitterInitContext, Committer<CommT>> committerFactory;
private final SerializableSupplier<SimpleVersionedSerializer<CommT>>
committableSerializerFactory;
/**
Expand All @@ -121,7 +125,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
private List<GlobalCommT> sinkV1State = new ArrayList<>();

public GlobalCommitterOperator(
SerializableSupplier<Committer<CommT>> committerFactory,
SerializableFunction<CommitterInitContext, Committer<CommT>> committerFactory,
SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializerFactory,
boolean commitOnInput) {
this.committerFactory = checkNotNull(committerFactory);
Expand All @@ -135,7 +139,6 @@ protected void setup(
StreamConfig config,
Output<StreamRecord<Void>> output) {
super.setup(containingTask, config, output);
committer = committerFactory.get();
metricGroup = InternalSinkCommitterMetricGroup.wrap(metrics);
committableCollector = CommittableCollector.of(metricGroup);
committableSerializer = committableSerializerFactory.get();
Expand All @@ -155,6 +158,11 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
OptionalLong restoredCheckpointId = context.getRestoredCheckpointId();
committer =
committerFactory.apply(
new CommitterInitContextImpl(
getRuntimeContext(), metricGroup, restoredCheckpointId));
globalCommitterState =
new SimpleVersionedListState<>(
context.getOperatorStateStore()
Expand All @@ -169,8 +177,8 @@ public void initializeState(StateInitializationContext context) throws Exception
committableCollector.merge(cc.getCommittableCollector());
});
// try to re-commit recovered transactions as quickly as possible
if (context.getRestoredCheckpointId().isPresent()) {
commit(context.getRestoredCheckpointId().getAsLong());
if (restoredCheckpointId.isPresent()) {
commit(restoredCheckpointId.getAsLong());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.streaming.api.connector.sink2;
package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerialization;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
Expand All @@ -31,14 +30,18 @@
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.GlobalCommitterOperator;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;

import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME;

Expand Down Expand Up @@ -74,11 +77,10 @@ private Collection<Integer> translateInternal(
boolean commitOnInput = batch || !checkpointingEnabled || hasUpstreamCommitter(inputStream);

// Create a global shuffle and add the global committer with parallelism 1.
DataStream<CommittableMessage<CommT>> global = inputStream.global();
final PhysicalTransformation<Void> transformation =
(PhysicalTransformation<Void>)
inputStream
.global()
.transform(
global.transform(
GLOBAL_COMMITTER_TRANSFORMATION_NAME,
Types.VOID,
new GlobalCommitterOperator<>(
Expand All @@ -87,10 +89,20 @@ private Collection<Integer> translateInternal(
commitOnInput))
.getTransformation();
transformation.setChainingStrategy(ChainingStrategy.ALWAYS);
transformation.setName(GLOBAL_COMMITTER_TRANSFORMATION_NAME);
transformation.setParallelism(1);
transformation.setMaxParallelism(1);
return Collections.emptyList();
copySafely(transformation::setName, globalCommitterTransform::getName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In which scenarios does the globalCommitterTransform have its own attributes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We set the attributes while expanding the post commit topology. We expand this transform almost at the end of the sink expansion. So it's fair to say that we have it in all cases where we set the properties during post commit expansion. It's not set if no uid or customUidHashes are user-supplied. I think name should always be set but I still use the copySafely method because you never know how things will evolve.

copySafely(transformation::setUid, globalCommitterTransform::getUid);
copySafely(transformation::setUidHash, globalCommitterTransform::getUserProvidedNodeHash);

return Arrays.asList(global.getId(), transformation.getId());
}

/**
 * Copies an optional attribute: reads a value from {@code provider} and hands it to
 * {@code consumer}, doing nothing when the provider yields {@code null}.
 */
private static <T> void copySafely(Consumer<T> consumer, Supplier<T> provider) {
    final T value = provider.get();
    if (value == null) {
        return;
    }
    consumer.accept(value);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkState;

Expand Down Expand Up @@ -177,6 +179,12 @@ private void expand() {

getSinkTransformations(sizeBefore).forEach(context::transform);

repeatUntilConverged(
() ->
getSinkTransformations(sizeBefore).stream()
.flatMap(t -> context.transform(t).stream())
.collect(Collectors.toList()));

disallowUnalignedCheckpoint(getSinkTransformations(sizeBefore));

// Remove all added sink subtransformations to avoid duplications and allow additional
Expand All @@ -188,6 +196,14 @@ private void expand() {
}
}

/**
 * Invokes {@code producer} repeatedly until two consecutive invocations yield equal results,
 * i.e. until the produced value reaches a fixed point.
 */
private <R> void repeatUntilConverged(Supplier<R> producer) {
    R previous = producer.get();
    for (R current = producer.get(); !previous.equals(current); current = producer.get()) {
        previous = current;
    }
}

private List<Transformation<?>> getSinkTransformations(int sizeBefore) {
fapaul marked this conversation as resolved.
Show resolved Hide resolved
return executionEnvironment
.getTransformations()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.streaming.api.connector.sink2;

import org.apache.flink.streaming.runtime.operators.sink.IntegerSerializer;

import org.junit.jupiter.api.Test;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.connector.sink2;

import org.apache.flink.api.common.typeutils.TypeInformationTestBase;
import org.apache.flink.streaming.runtime.operators.sink.IntegerSerializer;

/** Test for {@link CommittableMessageTypeInfo}. */
class CommittableMessageTypeInfoTest
Expand Down
Loading