Skip to content

Commit

Permalink
[FLINK-33158] Cryptic exception when there is a StreamExecSort in Jso…
Browse files Browse the repository at this point in the history
…nPlan
  • Loading branch information
dawidwys committed Oct 3, 2023
1 parent ab26175 commit 4afd092
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableException;
Expand All @@ -28,6 +29,7 @@
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
Expand All @@ -37,18 +39,31 @@
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Collections;
import java.util.List;

/**
* {@link StreamExecNode} for Sort.
*
* <p><b>NOTES:</b> This class is used for testing with bounded source now. If a query is converted
* to this node in product environment, an exception will be thrown.
*/
@ExecNodeMetadata(
name = "stream-exec-sort",
version = 1,
producedTransformations = {StreamExecSort.SORT_TRANSFORMATION},
minPlanVersion = FlinkVersion.v1_18,
minStateVersion = FlinkVersion.v1_18)
public class StreamExecSort extends ExecNodeBase<RowData> implements StreamExecNode<RowData> {

private static final String SORT_TRANSFORMATION = "sort";
public static final String SORT_TRANSFORMATION = "sort";

public static final String FIELD_NAME_SORT_SPEC = "orderBy";

@JsonProperty(FIELD_NAME_SORT_SPEC)
private final SortSpec sortSpec;

public StreamExecSort(
Expand All @@ -67,6 +82,20 @@ public StreamExecSort(
this.sortSpec = sortSpec;
}

@JsonCreator
public StreamExecSort(
@JsonProperty(FIELD_NAME_ID) int id,
@JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
@JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
@JsonProperty(FIELD_NAME_SORT_SPEC) SortSpec sortSpec,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {

super(id, context, persistedConfig, inputProperties, outputType, description);
this.sortSpec = sortSpec;
}

@SuppressWarnings("unchecked")
@Override
protected Transformation<RowData> translateToPlanInternal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private ExecNodeMetadataUtil() {
add(StreamExecRank.class);
add(StreamExecSink.class);
add(StreamExecSortLimit.class);
add(StreamExecSort.class);
add(StreamExecTableSourceScan.class);
add(StreamExecTemporalJoin.class);
add(StreamExecTemporalSort.class);
Expand Down Expand Up @@ -163,7 +164,6 @@ private ExecNodeMetadataUtil() {
add(StreamExecLegacySink.class);
add(StreamExecGroupTableAggregate.class);
add(StreamExecPythonGroupTableAggregate.class);
add(StreamExecSort.class);
add(StreamExecMultipleInput.class);
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;

import org.junit.Before;
import org.junit.Test;

/** Test json serialization for sort limit. */
public class SortJsonPlanTest extends TableTestBase {

private StreamTableTestUtil util;
private TableEnvironment tEnv;

@Before
public void setup() {
util = streamTestUtil(TableConfig.getDefault());
tEnv = util.getTableEnv();

String srcTableDdl =
"CREATE TABLE MyTable (\n"
+ " a bigint,\n"
+ " b int not null,\n"
+ " c varchar,\n"
+ " d timestamp(3)\n"
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'bounded' = 'false')";
tEnv.executeSql(srcTableDdl);
}

@Test
public void testSort() {
String sinkTableDdl =
"CREATE TABLE MySink (\n"
+ " a bigint,\n"
+ " b bigint\n"
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'sink-insert-only' = 'false',\n"
+ " 'table-sink-class' = 'DEFAULT')";
tEnv.executeSql(sinkTableDdl);
String sql = "insert into MySink SELECT a, a from MyTable order by b";
util.verifyJsonPlan(sql);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
{
"flinkVersion" : "",
"nodes" : [ {
"id" : 1,
"type" : "stream-exec-table-source-scan_1",
"scanTableSource" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`MyTable`",
"resolvedTable" : {
"schema" : {
"columns" : [ {
"name" : "a",
"dataType" : "BIGINT"
}, {
"name" : "b",
"dataType" : "INT NOT NULL"
}, {
"name" : "c",
"dataType" : "VARCHAR(2147483647)"
}, {
"name" : "d",
"dataType" : "TIMESTAMP(3)"
} ],
"watermarkSpecs" : [ ]
},
"partitionKeys" : [ ],
"options" : {
"connector" : "values",
"bounded" : "false"
}
}
},
"abilities" : [ {
"type" : "ProjectPushDown",
"projectedFields" : [ [ 0 ], [ 1 ] ],
"producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
}, {
"type" : "ReadingMetadata",
"metadataKeys" : [ ],
"producedType" : "ROW<`a` BIGINT, `b` INT NOT NULL> NOT NULL"
} ]
},
"outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
"description" : "TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[a, b], metadata=[]]], fields=[a, b])",
"inputProperties" : [ ]
}, {
"id" : 2,
"type" : "stream-exec-exchange_1",
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "SINGLETON"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
"description" : "Exchange(distribution=[single])"
}, {
"id" : 3,
"type" : "stream-exec-sort_1",
"orderBy" : {
"fields" : [ {
"index" : 1,
"isAscending" : true,
"nullIsLast" : false
} ]
},
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`a` BIGINT, `b` INT NOT NULL>",
"description" : "Sort(orderBy=[b ASC])"
}, {
"id" : 4,
"type" : "stream-exec-calc_1",
"projection" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 0,
"type" : "BIGINT"
}, {
"kind" : "INPUT_REF",
"inputIndex" : 0,
"type" : "BIGINT"
} ],
"condition" : null,
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`a` BIGINT, `a1` BIGINT>",
"description" : "Calc(select=[a, a AS a1])"
}, {
"id" : 5,
"type" : "stream-exec-sink_1",
"configuration" : {
"table.exec.sink.keyed-shuffle" : "AUTO",
"table.exec.sink.not-null-enforcer" : "ERROR",
"table.exec.sink.rowtime-inserter" : "ENABLED",
"table.exec.sink.type-length-enforcer" : "IGNORE",
"table.exec.sink.upsert-materialize" : "AUTO"
},
"dynamicTableSink" : {
"table" : {
"identifier" : "`default_catalog`.`default_database`.`MySink`",
"resolvedTable" : {
"schema" : {
"columns" : [ {
"name" : "a",
"dataType" : "BIGINT"
}, {
"name" : "b",
"dataType" : "BIGINT"
} ],
"watermarkSpecs" : [ ]
},
"partitionKeys" : [ ],
"options" : {
"sink-insert-only" : "false",
"table-sink-class" : "DEFAULT",
"connector" : "values"
}
}
}
},
"inputChangelogMode" : [ "INSERT" ],
"inputProperties" : [ {
"requiredDistribution" : {
"type" : "UNKNOWN"
},
"damBehavior" : "PIPELINED",
"priority" : 0
} ],
"outputType" : "ROW<`a` BIGINT, `a1` BIGINT>",
"description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, a1])"
} ],
"edges" : [ {
"source" : 1,
"target" : 2,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 2,
"target" : 3,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 3,
"target" : 4,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
}, {
"source" : 4,
"target" : 5,
"shuffle" : {
"type" : "FORWARD"
},
"shuffleMode" : "PIPELINED"
} ]
}

0 comments on commit 4afd092

Please sign in to comment.