Skip to content

Commit

Permalink
[FLINK-34371][runtime] Support EndOfStreamTrigger in GlobalWindows
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub authored and xintongsong committed Feb 28, 2024
1 parent 89b8c5c commit 5916aa4
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.Collections;

Expand All @@ -39,8 +41,11 @@
@PublicEvolving
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
private static final long serialVersionUID = 1L;
@Nullable private final Trigger<Object, GlobalWindow> defaultTrigger;

private GlobalWindows() {}
private GlobalWindows(Trigger<Object, GlobalWindow> defaultTrigger) {
this.defaultTrigger = defaultTrigger;
}

@Override
public Collection<GlobalWindow> assignWindows(
Expand All @@ -56,22 +61,29 @@ public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironmen

@Override
public Trigger<Object, GlobalWindow> getDefaultTrigger() {
return new NeverTrigger();
return defaultTrigger == null ? new NeverTrigger() : defaultTrigger;
}

@Override
public String toString() {
return "GlobalWindows()";
return "GlobalWindows(trigger=" + getDefaultTrigger().getClass().getSimpleName() + ")";
}

/**
* Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns all elements to the
* same {@link GlobalWindow}.
*
* @return The global window policy.
* Creates a {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}.
* The window is only useful if you also specify a custom trigger. Otherwise, the window will
* never be triggered and no computation will be performed.
*/
public static GlobalWindows create() {
return new GlobalWindows();
return new GlobalWindows(new NeverTrigger());
}

/**
* Creates a {@link WindowAssigner} that assigns all elements to the same {@link GlobalWindow}
* and the window is triggered if and only if the input stream is ended.
*/
public static GlobalWindows createWithEndOfStreamTrigger() {
return new GlobalWindows(new EndOfStreamTrigger());
}

/** A trigger that never fires, as default Trigger for GlobalWindows. */
Expand Down Expand Up @@ -107,6 +119,35 @@ public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executio
return new GlobalWindow.Serializer();
}

/** A trigger that fires iff the input stream reaches EndOfStream. */
@Internal
public static class EndOfStreamTrigger extends Trigger<Object, GlobalWindow> {
private static final long serialVersionUID = 1L;

@Override
public TriggerResult onElement(
Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
return TriggerResult.CONTINUE;
}

@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

@Override
public void onMerge(GlobalWindow window, OnMergeContext ctx) {}
}

@Override
public boolean isEventTime() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,8 @@ public void testProperties() {
assertEquals(
new GlobalWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
assertThat(assigner.getDefaultTrigger(), instanceOf(GlobalWindows.NeverTrigger.class));
assigner = GlobalWindows.createWithEndOfStreamTrigger();
assertThat(
assigner.getDefaultTrigger(), instanceOf(GlobalWindows.EndOfStreamTrigger.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.runtime.operators.windowing;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
Expand Down Expand Up @@ -1334,6 +1335,72 @@ public void testCountTrigger() throws Exception {
testHarness.close();
}

@Test
public void testEndOfStreamTrigger() throws Exception {
ReducingStateDescriptor<Tuple2<String, Integer>> stateDesc =
new ReducingStateDescriptor<>(
"window-contents",
new SumReducer(),
STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));

WindowOperator<
String,
Tuple2<String, Integer>,
Tuple2<String, Integer>,
Tuple2<String, Integer>,
GlobalWindow>
operator =
new WindowOperator<>(
GlobalWindows.createWithEndOfStreamTrigger(),
new GlobalWindow.Serializer(),
new TupleKeySelector(),
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(
new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(
new PassThroughWindowFunction<
String, GlobalWindow, Tuple2<String, Integer>>()),
GlobalWindows.createWithEndOfStreamTrigger().getDefaultTrigger(),
0,
null /* late data output tag */);

OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>>
testHarness = createTestHarness(operator);

testHarness.open();

// add elements out-of-order
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 3999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 20));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 0));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));

TestHarnessUtil.assertOutputEqualsSorted(
"Output was not correct.",
Collections.EMPTY_LIST,
testHarness.getOutput(),
new Tuple2ResultSortComparator());

testHarness.processWatermark(Watermark.MAX_WATERMARK);

ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key1", 3), Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(new Tuple2<>("key2", 5), Long.MAX_VALUE));
expectedOutput.add(Watermark.MAX_WATERMARK);

TestHarnessUtil.assertOutputEqualsSorted(
"Output was not correct.",
expectedOutput,
testHarness.getOutput(),
new Tuple2ResultSortComparator());

testHarness.close();
}

@Test
public void testProcessingTimeTumblingWindows() throws Throwable {
final int windowSize = 3;
Expand Down

0 comments on commit 5916aa4

Please sign in to comment.