diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
index 81458f0cae4f2..fb9f38ed96707 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSort.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.nodes.exec.stream;
+import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
@@ -28,6 +29,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
@@ -37,7 +39,11 @@
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
import java.util.Collections;
+import java.util.List;
/**
* {@link StreamExecNode} for Sort.
@@ -45,10 +51,19 @@
*
NOTES: This class is used for testing with bounded source now. If a query is converted
* to this node in product environment, an exception will be thrown.
*/
+@ExecNodeMetadata(
+ name = "stream-exec-sort",
+ version = 1,
+ producedTransformations = {StreamExecSort.SORT_TRANSFORMATION},
+ minPlanVersion = FlinkVersion.v1_18,
+ minStateVersion = FlinkVersion.v1_18)
public class StreamExecSort extends ExecNodeBase implements StreamExecNode {
- private static final String SORT_TRANSFORMATION = "sort";
+ public static final String SORT_TRANSFORMATION = "sort";
+
+ public static final String FIELD_NAME_SORT_SPEC = "orderBy";
+ @JsonProperty(FIELD_NAME_SORT_SPEC)
private final SortSpec sortSpec;
public StreamExecSort(
@@ -67,6 +82,20 @@ public StreamExecSort(
this.sortSpec = sortSpec;
}
+ @JsonCreator
+ public StreamExecSort(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+ @JsonProperty(FIELD_NAME_SORT_SPEC) SortSpec sortSpec,
+ @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+
+ super(id, context, persistedConfig, inputProperties, outputType, description);
+ this.sortSpec = sortSpec;
+ }
+
@SuppressWarnings("unchecked")
@Override
protected Transformation translateToPlanInternal(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
index 174c169d41a49..5c467f29583cb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/ExecNodeMetadataUtil.java
@@ -126,6 +126,7 @@ private ExecNodeMetadataUtil() {
add(StreamExecRank.class);
add(StreamExecSink.class);
add(StreamExecSortLimit.class);
+ add(StreamExecSort.class);
add(StreamExecTableSourceScan.class);
add(StreamExecTemporalJoin.class);
add(StreamExecTemporalSort.class);
@@ -163,7 +164,6 @@ private ExecNodeMetadataUtil() {
add(StreamExecLegacySink.class);
add(StreamExecGroupTableAggregate.class);
add(StreamExecPythonGroupTableAggregate.class);
- add(StreamExecSort.class);
add(StreamExecMultipleInput.class);
}
};
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest.java
new file mode 100644
index 0000000000000..a23264cc2b753
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test json serialization for sort limit. */
+public class SortJsonPlanTest extends TableTestBase {
+
+ private StreamTableTestUtil util;
+ private TableEnvironment tEnv;
+
+ @Before
+ public void setup() {
+ util = streamTestUtil(TableConfig.getDefault());
+ tEnv = util.getTableEnv();
+
+ String srcTableDdl =
+ "CREATE TABLE MyTable (\n"
+ + " a bigint,\n"
+ + " b int not null,\n"
+ + " c varchar,\n"
+ + " d timestamp(3)\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'false')";
+ tEnv.executeSql(srcTableDdl);
+ }
+
+ @Test
+ public void testSort() {
+ String sinkTableDdl =
+ "CREATE TABLE MySink (\n"
+ + " a bigint,\n"
+ + " b bigint\n"
+ + ") with (\n"
+ + " 'connector' = 'values',\n"
+ + " 'sink-insert-only' = 'false',\n"
+ + " 'table-sink-class' = 'DEFAULT')";
+ tEnv.executeSql(sinkTableDdl);
+ String sql = "insert into MySink SELECT a, a from MyTable order by b";
+ util.verifyJsonPlan(sql);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest_jsonplan/testSort.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest_jsonplan/testSort.out
new file mode 100644
index 0000000000000..a0d7d070ec1de
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/SortJsonPlanTest_jsonplan/testSort.out
@@ -0,0 +1,172 @@
+{
+ "flinkVersion" : "",
+ "nodes" : [ {
+ "id" : 1,
+ "type" : "stream-exec-table-source-scan_1",
+ "scanTableSource" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MyTable`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "INT NOT NULL"
+ }, {
+ "name" : "c",
+ "dataType" : "VARCHAR(2147483647)"
+ }, {
+ "name" : "d",
+ "dataType" : "TIMESTAMP(3)"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "connector" : "values",
+ "bounded" : "false"
+ }
+ }
+ },
+ "abilities" : [ {
+ "type" : "ProjectPushDown",
+ "projectedFields" : [ [ 0 ], [ 1 ] ],
+ "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+ }, {
+ "type" : "ReadingMetadata",
+ "metadataKeys" : [ ],
+ "producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
+ } ]
+ },
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+ "description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])",
+ "inputProperties" : [ ]
+ }, {
+ "id" : 2,
+ "type" : "stream-exec-exchange_1",
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "SINGLETON"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+ "description" : "Exchange(distribution=[single])"
+ }, {
+ "id" : 3,
+ "type" : "stream-exec-sort_1",
+ "orderBy" : {
+ "fields" : [ {
+ "index" : 1,
+ "isAscending" : true,
+ "nullIsLast" : false
+ } ]
+ },
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
+ "description" : "Sort(orderBy=[b ASC])"
+ }, {
+ "id" : 4,
+ "type" : "stream-exec-calc_1",
+ "projection" : [ {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ }, {
+ "kind" : "INPUT_REF",
+ "inputIndex" : 0,
+ "type" : "BIGINT"
+ } ],
+ "condition" : null,
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `a1` BIGINT>",
+ "description" : "Calc(select=[a, a AS a1])"
+ }, {
+ "id" : 5,
+ "type" : "stream-exec-sink_1",
+ "configuration" : {
+ "table.exec.sink.keyed-shuffle" : "AUTO",
+ "table.exec.sink.not-null-enforcer" : "ERROR",
+ "table.exec.sink.rowtime-inserter" : "ENABLED",
+ "table.exec.sink.type-length-enforcer" : "IGNORE",
+ "table.exec.sink.upsert-materialize" : "AUTO"
+ },
+ "dynamicTableSink" : {
+ "table" : {
+ "identifier" : "`default_catalog`.`default_database`.`MySink`",
+ "resolvedTable" : {
+ "schema" : {
+ "columns" : [ {
+ "name" : "a",
+ "dataType" : "BIGINT"
+ }, {
+ "name" : "b",
+ "dataType" : "BIGINT"
+ } ],
+ "watermarkSpecs" : [ ]
+ },
+ "partitionKeys" : [ ],
+ "options" : {
+ "sink-insert-only" : "false",
+ "table-sink-class" : "DEFAULT",
+ "connector" : "values"
+ }
+ }
+ }
+ },
+ "inputChangelogMode" : [ "INSERT" ],
+ "inputProperties" : [ {
+ "requiredDistribution" : {
+ "type" : "UNKNOWN"
+ },
+ "damBehavior" : "PIPELINED",
+ "priority" : 0
+ } ],
+ "outputType" : "ROW<`a` BIGINT, `a1` BIGINT>",
+ "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, a1])"
+ } ],
+ "edges" : [ {
+ "source" : 1,
+ "target" : 2,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 2,
+ "target" : 3,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 3,
+ "target" : 4,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ }, {
+ "source" : 4,
+ "target" : 5,
+ "shuffle" : {
+ "type" : "FORWARD"
+ },
+ "shuffleMode" : "PIPELINED"
+ } ]
+}
\ No newline at end of file