From 94b5f031a785d16077d870fe9e009d168077430b Mon Sep 17 00:00:00 2001 From: sxnan Date: Fri, 26 Jan 2024 15:18:13 +0800 Subject: [PATCH] [FLINK-34371][runtime] Set OutputOnlyAfterEndOfStream on WindowOperator and StreamSortOperator This closes #24272 --- .../runtime/operators/windowing/WindowOperator.java | 13 +++++++++++++ .../runtime/operators/sort/StreamSortOperator.java | 7 +++++++ 2 files changed, 20 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 049db3144b2c8..5c3929f79dc64 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -57,8 +57,11 @@ import org.apache.flink.streaming.api.operators.InternalTimer; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.triggers.Trigger; @@ -527,6 +530,16 @@ && isCleanupTime(triggerContext.window, timer.getTimestamp())) { } } + @Override + public OperatorAttributes getOperatorAttributes() { + boolean isOutputOnlyAfterEndOfStream = + windowAssigner instanceof GlobalWindows + && trigger instanceof GlobalWindows.EndOfStreamTrigger; + return new OperatorAttributesBuilder() + .setOutputOnlyAfterEndOfStream(isOutputOnlyAfterEndOfStream) + .build(); + } + /** * Drops all state for the given window and calls {@link Trigger#clear(Window, * Trigger.TriggerContext)}. diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java index d462b16e0ea66..636d2563684bb 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sort/StreamSortOperator.java @@ -26,6 +26,8 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.OperatorAttributes; +import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; @@ -153,4 +155,9 @@ public void finish() throws Exception { } super.finish(); } + + @Override + public OperatorAttributes getOperatorAttributes() { + return new OperatorAttributesBuilder().setOutputOnlyAfterEndOfStream(true).build(); + } }