diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
index 22e93800aeb9a..9bd055f75d23c 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java
@@ -17,6 +17,7 @@
package org.apache.calcite.sql.validate;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlCall;
@@ -56,9 +57,9 @@ public final class ProcedureNamespace extends AbstractNamespace {
public RelDataType validateImpl(RelDataType targetRowType) {
validator.inferUnknownTypes(validator.unknownType, scope, call);
// The result is ignored but the type is derived to trigger the validation
- validator.deriveTypeImpl(scope, call);
+ final SqlCallBinding callBinding = new FlinkSqlCallBinding(validator, scope, call);
+ validator.deriveTypeImpl(scope, callBinding.permutedCall());
final SqlOperator operator = call.getOperator();
- final SqlCallBinding callBinding = new SqlCallBinding(validator, scope, call);
if (!(operator instanceof SqlTableFunction)) {
throw new IllegalArgumentException(
"Argument must be a table function: " + operator.getNameAsId());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 3322f4f458a99..5d2bfbd74195d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -16,6 +16,8 @@
*/
package org.apache.calcite.sql.validate;
+import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -158,16 +160,17 @@
* Default implementation of {@link SqlValidator}, the class was copied over because of
* CALCITE-4554.
*
- * <p>Lines 1958 ~ 1978, Flink improves error message for functions without appropriate arguments in
+ * <p>Lines 1961 ~ 1981, Flink improves error message for functions without appropriate arguments in
* handleUnresolvedFunction at {@link SqlValidatorImpl#handleUnresolvedFunction}.
*
- * <p>Lines 3736 ~ 3740, Flink improves Optimize the retrieval of sub-operands in SqlCall when using
- * NamedParameters at {@link SqlValidatorImpl#checkRollUp}.
+ * <p>Lines 3739 ~ 3743, 6333 ~ 6339, Flink improves validation of the SqlCall that uses named
+ * parameters: it rearranges the order of sub-operands and fills in missing operands with the
+ * default node.
*
- * <p>Lines 5108 ~ 5121, Flink enables TIMESTAMP and TIMESTAMP_LTZ for system time period
+ * <p>Lines 5111 ~ 5124, Flink enables TIMESTAMP and TIMESTAMP_LTZ for system time period
* specification type at {@link org.apache.calcite.sql.validate.SqlValidatorImpl#validateSnapshot}.
*
- * <p>Lines 5465 ~ 5471, Flink enables TIMESTAMP and TIMESTAMP_LTZ for first orderBy column in
+ * <p>Lines 5468 ~ 5474, Flink enables TIMESTAMP and TIMESTAMP_LTZ for first orderBy column in
* matchRecognize at {@link SqlValidatorImpl#validateMatchRecognize}.
*/
public class SqlValidatorImpl implements SqlValidatorWithHints {
@@ -6327,8 +6330,13 @@ public RelDataType visit(SqlLiteral literal) {
@Override
public RelDataType visit(SqlCall call) {
+ // ----- FLINK MODIFICATION BEGIN -----
+ FlinkSqlCallBinding flinkSqlCallBinding =
+ new FlinkSqlCallBinding(this.scope.getValidator(), scope, call);
final SqlOperator operator = call.getOperator();
- return operator.deriveType(SqlValidatorImpl.this, scope, call);
+ return operator.deriveType(
+ SqlValidatorImpl.this, scope, flinkSqlCallBinding.permutedCall());
+ // ----- FLINK MODIFICATION END -----
}
@Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index 0c5d7dda150e7..c100ccb62383e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -18,7 +18,7 @@
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.planner.calcite.FlinkOperatorBinding;
+import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding;
import org.apache.flink.table.planner.calcite.TimestampSchemaVersion;
import org.apache.flink.table.planner.hint.ClearQueryHintsWithInvalidPropagationShuttle;
import org.apache.flink.table.planner.hint.FlinkHints;
@@ -235,14 +235,15 @@
 * <p>FLINK modifications are at lines
 *
 * <ol>
- *   <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 654 ~ 671
- *   <li>Added in Flink-24024: Lines 1435 ~ 1445, Lines 1459 ~ 1501
- *   <li>Added in FLINK-28682: Lines 2323 ~ 2340
- *   <li>Added in FLINK-28682: Lines 2377 ~ 2405
- *   <li>Added in FLINK-32474: Lines 2875 ~ 2887
- *   <li>Added in FLINK-32474: Lines 2987 ~ 3021
- *   <li>Added in FLINK-20873: Lines 5519 ~ 5528
- *   <li>Added in FLINK-34057, FLINK-34058: Lines 6090 ~ 6116
+ *   <li>Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 655 ~ 673
+ *   <li>Added in Flink-24024: Lines 1437 ~ 1447, Lines 1461 ~ 1503
+ *   <li>Added in FLINK-28682: Lines 2325 ~ 2342
+ *   <li>Added in FLINK-28682: Lines 2379 ~ 2407
+ *   <li>Added in FLINK-32474: Lines 2877 ~ 2889
+ *   <li>Added in FLINK-32474: Lines 2989 ~ 3023
+ *   <li>Added in FLINK-20873: Lines 5521 ~ 5530
+ *   <li>Added in FLINK-34312: Lines 5641 ~ 5644
+ *   <li>Added in FLINK-34057, FLINK-34058, FLINK-34312: Lines 6093 ~ 6111
 * </ol>
*/
@SuppressWarnings("UnstableApiUsage")
@@ -5637,8 +5638,10 @@ public RexNode visit(SqlCall call) {
() -> "agg.lookupAggregates for call " + call);
}
}
+ // ----- FLINK MODIFICATION BEGIN -----
return exprConverter.convertCall(
- this, new SqlCallBinding(validator(), scope, call).permutedCall());
+ this, new FlinkSqlCallBinding(validator(), scope, call).permutedCall());
+ // ----- FLINK MODIFICATION END -----
}
@Override
@@ -6088,11 +6091,9 @@ private void translateAgg(
// switch out of agg mode
bb.agg = null;
// ----- FLINK MODIFICATION BEGIN -----
- SqlCallBinding sqlCallBinding =
- new SqlCallBinding(validator(), aggregatingSelectScope, call);
- List<SqlNode> sqlNodes = sqlCallBinding.operands();
- FlinkOperatorBinding flinkOperatorBinding =
- new FlinkOperatorBinding(sqlCallBinding);
+ FlinkSqlCallBinding binding =
+ new FlinkSqlCallBinding(validator(), aggregatingSelectScope, call);
+ List<SqlNode> sqlNodes = binding.operands();
for (int i = 0; i < sqlNodes.size(); i++) {
SqlNode operand = sqlNodes.get(i);
// special case for COUNT(*): delete the *
@@ -6105,12 +6106,6 @@ private void translateAgg(
}
}
RexNode convertedExpr = bb.convertExpression(operand);
- if (convertedExpr.getKind() == SqlKind.DEFAULT) {
- RelDataType relDataType = flinkOperatorBinding.getOperandType(i);
- convertedExpr =
- ((RexCall) convertedExpr)
- .clone(relDataType, ((RexCall) convertedExpr).operands);
- }
args.add(lookupOrCreateGroupExpr(convertedExpr));
}
// ----- FLINK MODIFICATION END -----
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
index 0a22585ea5cd1..8f319dd1e232b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java
@@ -30,7 +30,6 @@
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlDataTypeSpec;
import org.apache.calcite.sql.SqlIntervalQualifier;
import org.apache.calcite.sql.SqlKind;
@@ -44,10 +43,8 @@
import org.apache.calcite.sql2rel.SqlRexConvertletTable;
import org.apache.calcite.sql2rel.StandardConvertletTable;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -72,10 +69,6 @@ public SqlRexConvertlet get(SqlCall call) {
return this::convertSetSemanticsWindowTableFunction;
}
- if (isContainsDefaultNode(call)) {
- return this::convertSqlCallWithDefaultNode;
- }
-
return StandardConvertletTable.INSTANCE.get(call);
}
@@ -120,12 +113,6 @@ private boolean isSetSemanticsWindowTableFunction(SqlCall call) {
return !operands.isEmpty() && operands.get(0).getKind() == SqlKind.SET_SEMANTICS_TABLE;
}
- private boolean isContainsDefaultNode(SqlCall call) {
- return call.getOperandList().stream()
- .filter(Objects::nonNull)
- .anyMatch(operand -> operand.getKind() == SqlKind.DEFAULT);
- }
-
/**
* Due to CALCITE-6204, we need to manually extract partition keys and order keys and convert
* them to {@link RexSetSemanticsTableCall}.
@@ -215,30 +202,4 @@ private static int parseFieldIdx(RexNode e) {
// should not happen
throw new TableException("Unsupported partition key with type: " + e.getKind());
}
-
- /**
- * When the SqlCall contains a default operator, the type of the default node to ANY after
- * converted to rel node. However, the ANY type cannot pass various checks well and cannot adapt
- * well to types in flink. Therefore, we replace the ANY type with the argument type obtained
- * from the operator.
- */
- private RexNode convertSqlCallWithDefaultNode(SqlRexContext cx, final SqlCall call) {
- RexNode rexCall = StandardConvertletTable.INSTANCE.convertCall(cx, call);
- SqlCallBinding sqlCallBinding = new SqlCallBinding(cx.getValidator(), null, call);
- FlinkOperatorBinding flinkOperatorBinding = new FlinkOperatorBinding(sqlCallBinding);
- if (rexCall instanceof RexCall) {
- List<RexNode> operands = new ArrayList<>(((RexCall) rexCall).operands);
- for (int i = 0; i < operands.size(); i++) {
- RexNode rexNode = operands.get(i);
- if (rexNode.getKind() == SqlKind.DEFAULT && rexNode instanceof RexCall) {
- RelDataType relDataType = flinkOperatorBinding.getOperandType(i);
- operands.set(
- i,
- ((RexCall) rexNode).clone(relDataType, ((RexCall) rexNode).operands));
- }
- }
- return ((RexCall) rexCall).clone(rexCall.getType(), operands);
- }
- return rexCall;
- }
}
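Note: the convertlet-side patch-up removed above is obsolete because FlinkSqlCallBinding (added below) injects SqlDefaultOperator calls that already carry the declared parameter type, so a DEFAULT operand no longer converts to an ANY-typed RexCall. A minimal sketch of the resulting behavior, assuming rexCall is the RexCall produced for a call with a missing named argument (not part of the patch):

    // Sketch only; rexCall is assumed to come from a convertCall(...) invocation.
    for (RexNode operand : ((RexCall) rexCall).getOperands()) {
        if (operand.getKind() == SqlKind.DEFAULT) {
            // With the typed SqlDefaultOperator this is the declared parameter type,
            // not the ANY type that the deleted convertSqlCallWithDefaultNode had to fix.
            RelDataType operandType = operand.getType();
        }
    }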
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkOperatorBinding.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkOperatorBinding.java
deleted file mode 100644
index 41e50d8739ac5..0000000000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkOperatorBinding.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.calcite;
-
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rex.RexCallBinding;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.runtime.CalciteException;
-import org.apache.calcite.runtime.Resources;
-import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.SqlOperatorBinding;
-import org.apache.calcite.sql.type.SqlOperandMetadata;
-import org.apache.calcite.sql.type.SqlOperandTypeChecker;
-import org.apache.calcite.sql.validate.SqlMonotonicity;
-import org.apache.calcite.sql.validate.SqlValidatorException;
-import org.checkerframework.checker.nullness.qual.Nullable;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * The proxy implementation of {@link SqlOperatorBinding} can be used to correct the operator type
- * when using named parameter.
- */
-public class FlinkOperatorBinding extends SqlOperatorBinding {
-
- private final SqlOperatorBinding sqlOperatorBinding;
-
- private final List<RelDataType> argumentTypes;
-
- public FlinkOperatorBinding(SqlOperatorBinding sqlOperatorBinding) {
- super(sqlOperatorBinding.getTypeFactory(), sqlOperatorBinding.getOperator());
- this.sqlOperatorBinding = sqlOperatorBinding;
- this.argumentTypes = getArgumentTypes();
- }
-
- @Override
- public int getOperandCount() {
- if (!argumentTypes.isEmpty()) {
- return argumentTypes.size();
- } else {
- return sqlOperatorBinding.getOperandCount();
- }
- }
-
- @Override
- public RelDataType getOperandType(int ordinal) {
- if (sqlOperatorBinding instanceof SqlCallBinding) {
- SqlNode sqlNode = ((SqlCallBinding) sqlOperatorBinding).operands().get(ordinal);
- if (sqlNode.getKind() == SqlKind.DEFAULT && !argumentTypes.isEmpty()) {
- return argumentTypes.get(ordinal);
- } else {
- return ((SqlCallBinding) sqlOperatorBinding)
- .getValidator()
- .deriveType(((SqlCallBinding) sqlOperatorBinding).getScope(), sqlNode);
- }
- } else if (sqlOperatorBinding instanceof RexCallBinding) {
- RexNode rexNode = ((RexCallBinding) sqlOperatorBinding).operands().get(ordinal);
- if (rexNode.getKind() == SqlKind.DEFAULT && !argumentTypes.isEmpty()) {
- return argumentTypes.get(ordinal);
- } else {
- return rexNode.getType();
- }
- }
- return sqlOperatorBinding.getOperandType(ordinal);
- }
-
- @Override
- public CalciteException newError(Resources.ExInst<SqlValidatorException> e) {
- return sqlOperatorBinding.newError(e);
- }
-
- @Override
- public int getGroupCount() {
- return sqlOperatorBinding.getGroupCount();
- }
-
- @Override
- public boolean hasFilter() {
- return sqlOperatorBinding.hasFilter();
- }
-
- @Override
- public SqlOperator getOperator() {
- return sqlOperatorBinding.getOperator();
- }
-
- @Override
- public RelDataTypeFactory getTypeFactory() {
- return sqlOperatorBinding.getTypeFactory();
- }
-
- @Override
- public @Nullable String getStringLiteralOperand(int ordinal) {
- return sqlOperatorBinding.getStringLiteralOperand(ordinal);
- }
-
- @Override // to be removed before 2.0
- public int getIntLiteralOperand(int ordinal) {
- return sqlOperatorBinding.getIntLiteralOperand(ordinal);
- }
-
- @Override
- public boolean isOperandNull(int ordinal, boolean allowCast) {
- return sqlOperatorBinding.isOperandNull(ordinal, allowCast);
- }
-
- @Override
- public boolean isOperandLiteral(int ordinal, boolean allowCast) {
- return sqlOperatorBinding.isOperandLiteral(ordinal, allowCast);
- }
-
- @Override
- public @Nullable Object getOperandLiteralValue(int ordinal, RelDataType type) {
- return sqlOperatorBinding.getOperandLiteralValue(ordinal, type);
- }
-
- @Override
- public @Nullable Comparable getOperandLiteralValue(int ordinal) {
- return sqlOperatorBinding.getOperandLiteralValue(ordinal);
- }
-
- @Override
- public SqlMonotonicity getOperandMonotonicity(int ordinal) {
- return sqlOperatorBinding.getOperandMonotonicity(ordinal);
- }
-
- @Override
- public List<RelDataType> collectOperandTypes() {
- return sqlOperatorBinding.collectOperandTypes();
- }
-
- @Override
- public @Nullable RelDataType getCursorOperand(int ordinal) {
- return sqlOperatorBinding.getCursorOperand(ordinal);
- }
-
- @Override
- public @Nullable String getColumnListParamInfo(
- int ordinal, String paramName, List<String> columnList) {
- return sqlOperatorBinding.getColumnListParamInfo(ordinal, paramName, columnList);
- }
-
- @Override
- public <T> @Nullable T getOperandLiteralValue(int ordinal, Class<T> clazz) {
- return sqlOperatorBinding.getOperandLiteralValue(ordinal, clazz);
- }
-
- private List<RelDataType> getArgumentTypes() {
- SqlOperandTypeChecker sqlOperandTypeChecker = getOperator().getOperandTypeChecker();
- if (sqlOperandTypeChecker != null
- && sqlOperandTypeChecker.isFixedParameters()
- && sqlOperandTypeChecker instanceof SqlOperandMetadata) {
- SqlOperandMetadata sqlOperandMetadata =
- ((SqlOperandMetadata) getOperator().getOperandTypeChecker());
- return sqlOperandMetadata.paramTypes(getTypeFactory());
- } else {
- return Collections.emptyList();
- }
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java
new file mode 100644
index 0000000000000..adb427b656df3
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.flink.table.planner.functions.sql.SqlDefaultOperator;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.SqlOperandMetadata;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorNamespace;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** A {@link SqlCallBinding} that rewrites DEFAULT operands with the declared argument types. */
+public class FlinkSqlCallBinding extends SqlCallBinding {
+
+ private final List<RelDataType> fixArgumentTypes;
+
+ private final List<SqlNode> rewrittenOperands;
+
+ public FlinkSqlCallBinding(
+ SqlValidator validator, @Nullable SqlValidatorScope scope, SqlCall call) {
+ super(validator, scope, call);
+ this.fixArgumentTypes = getFixArgumentTypes();
+ this.rewrittenOperands = getRewrittenOperands();
+ }
+
+ @Override
+ public int getOperandCount() {
+ return rewrittenOperands.size();
+ }
+
+ @Override
+ public List<SqlNode> operands() {
+ if (isFixedParameters()) {
+ return rewrittenOperands;
+ } else {
+ return super.operands();
+ }
+ }
+
+ @Override
+ public RelDataType getOperandType(int ordinal) {
+ if (!isFixedParameters()) {
+ return super.getOperandType(ordinal);
+ }
+
+ SqlNode operand = rewrittenOperands.get(ordinal);
+ if (operand.getKind() == SqlKind.DEFAULT) {
+ return fixArgumentTypes.get(ordinal);
+ }
+
+ final RelDataType type = SqlTypeUtil.deriveType(this, operand);
+ final SqlValidatorNamespace namespace = getValidator().getNamespace(operand);
+ return namespace != null ? namespace.getType() : type;
+ }
+
+ public boolean isFixedParameters() {
+ return !fixArgumentTypes.isEmpty();
+ }
+
+ private List<RelDataType> getFixArgumentTypes() {
+ SqlOperandTypeChecker sqlOperandTypeChecker = getOperator().getOperandTypeChecker();
+ if (sqlOperandTypeChecker instanceof SqlOperandMetadata
+ && sqlOperandTypeChecker.isFixedParameters()) {
+ return ((SqlOperandMetadata) sqlOperandTypeChecker).paramTypes(getTypeFactory());
+ }
+ return Collections.emptyList();
+ }
+
+ private List<SqlNode> getRewrittenOperands() {
+ if (!isFixedParameters()) {
+ return super.operands();
+ }
+
+ List<SqlNode> rewrittenOperands = new ArrayList<>();
+ for (SqlNode operand : super.operands()) {
+ if (operand instanceof SqlCall
+ && ((SqlCall) operand).getOperator() == SqlStdOperatorTable.DEFAULT) {
+ rewrittenOperands.add(
+ new SqlDefaultOperator(fixArgumentTypes.get(rewrittenOperands.size()))
+ .createCall(SqlParserPos.ZERO));
+ } else {
+ rewrittenOperands.add(operand);
+ }
+ }
+ return rewrittenOperands;
+ }
+}
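A minimal usage sketch of the new binding, assuming validator, scope, and call are whatever the call sites above (ProcedureNamespace, SqlValidatorImpl, SqlToRelConverter) already hold; it is not part of the patch:

    FlinkSqlCallBinding binding = new FlinkSqlCallBinding(validator, scope, call);
    if (binding.isFixedParameters()) {
        // Operands are rewritten so that missing named arguments appear as
        // SqlDefaultOperator calls carrying the declared parameter type.
        List<SqlNode> operands = binding.operands();
        RelDataType firstArgType = binding.getOperandType(0);
    }
    // permutedCall() (inherited from SqlCallBinding) rebuilds the SqlCall from the
    // rewritten operands, so deriveType/convertCall downstream see typed DEFAULT nodes.
    SqlCall permuted = binding.permutedCall();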
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
index 65aefccad2e21..cc8825022b58b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java
@@ -21,7 +21,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.planner.calcite.FlinkOperatorBinding;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
@@ -32,7 +31,6 @@
import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.type.SqlTypeName;
@@ -53,23 +51,23 @@ public final class CallBindingCallContext extends AbstractSqlCallContext {
private final List<DataType> argumentDataTypes;
- private final SqlOperatorBinding binding;
+ private final SqlCallBinding binding;
private final @Nullable DataType outputType;
public CallBindingCallContext(
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
- SqlCallBinding sqlCallBinding,
+ SqlCallBinding binding,
@Nullable RelDataType outputType) {
super(
dataTypeFactory,
definition,
- sqlCallBinding.getOperator().getNameAsId().toString(),
- sqlCallBinding.getGroupCount() > 0);
+ binding.getOperator().getNameAsId().toString(),
+ binding.getGroupCount() > 0);
- this.adaptedArguments = sqlCallBinding.operands(); // reorders the operands
- this.binding = new FlinkOperatorBinding(sqlCallBinding);
+ this.adaptedArguments = binding.operands(); // reorders the operands
+ this.binding = binding;
this.argumentDataTypes =
new AbstractList<DataType>() {
@Override
@@ -84,7 +82,7 @@ public int size() {
return binding.getOperandCount();
}
};
- this.outputType = convertOutputType(sqlCallBinding, outputType);
+ this.outputType = convertOutputType(binding, outputType);
}
@Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
index 9961354e3045e..2167203ea1c45 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java
@@ -21,7 +21,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.planner.calcite.FlinkOperatorBinding;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
@@ -52,15 +51,15 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext {
public OperatorBindingCallContext(
DataTypeFactory dataTypeFactory,
FunctionDefinition definition,
- SqlOperatorBinding sqlOperatorBinding,
+ SqlOperatorBinding binding,
RelDataType returnRelDataType) {
super(
dataTypeFactory,
definition,
- sqlOperatorBinding.getOperator().getNameAsId().toString(),
- sqlOperatorBinding.getGroupCount() > 0);
+ binding.getOperator().getNameAsId().toString(),
+ binding.getGroupCount() > 0);
- this.binding = new FlinkOperatorBinding(sqlOperatorBinding);
+ this.binding = binding;
this.argumentDataTypes =
new AbstractList<DataType>() {
@Override
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index 628b6bb804f6d..a539e1f81b4b8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -35,7 +35,6 @@
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlPostfixOperator;
import org.apache.calcite.sql.SqlPrefixOperator;
-import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlSyntax;
import org.apache.calcite.sql.fun.SqlBasicAggFunction;
import org.apache.calcite.sql.fun.SqlLibraryOperators;
@@ -1307,9 +1306,4 @@ public List<SqlGroupedWindowFunction> getAuxiliaryFunctions() {
.operandTypeChecker(OperandTypes.NILADIC)
.notDeterministic()
.build();
-
- // DEFAULT FUNCTION
- // The default operator is used to fill in missing parameters when using named parameter,
- // which is used during code generation and not exposed to the user by default.
- public static final SqlSpecialOperator DEFAULT = SqlStdOperatorTable.DEFAULT;
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java
new file mode 100644
index 0000000000000..05cf3c948ed82
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.type.InferTypes;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorScope;
+
+/** A DEFAULT operator that carries an explicit return type. */
+public class SqlDefaultOperator extends SqlSpecialOperator {
+
+ private final RelDataType returnType;
+
+ public SqlDefaultOperator(RelDataType returnType) {
+ super(
+ "DEFAULT",
+ SqlKind.DEFAULT,
+ 100,
+ true,
+ ReturnTypes.explicit(returnType),
+ InferTypes.RETURN_TYPE,
+ OperandTypes.NILADIC);
+ this.returnType = returnType;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
+ writer.keyword(getName());
+ }
+
+ @Override
+ public RelDataType deriveType(SqlValidator validator, SqlValidatorScope scope, SqlCall call) {
+ return returnType;
+ }
+}
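For illustration, the binding above creates one such operator per missing argument so that the placeholder reports the declared type. A hedged sketch, assuming a RelDataTypeFactory named typeFactory is available (not part of the patch):

    // Sketch only: intType stands in for the declared type of the omitted parameter.
    RelDataType intType = typeFactory.createSqlType(SqlTypeName.INTEGER);
    SqlCall defaultCall = new SqlDefaultOperator(intType).createCall(SqlParserPos.ZERO);
    // defaultCall.getKind() == SqlKind.DEFAULT and its derived type is intType, so later
    // phases (SqlToRelConverter, code generation) never see an ANY-typed DEFAULT node.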
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
index 8205d52ff7e14..7791da5a6a692 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java
@@ -21,7 +21,7 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.planner.calcite.FlinkOperatorBinding;
+import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlProcedure;
import org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext;
import org.apache.flink.table.planner.operations.PlannerCallProcedureOperation;
@@ -31,18 +31,16 @@
import org.apache.flink.table.types.inference.TypeInferenceUtil;
import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.ExplicitOperatorBinding;
import org.apache.calcite.sql.SqlCall;
-import org.apache.calcite.sql.SqlCallBinding;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperatorBinding;
import org.apache.calcite.sql.validate.SqlValidator;
+import org.apache.calcite.sql.validate.SqlValidatorImpl;
-import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
@@ -71,8 +69,11 @@ public Operation convertSqlNode(SqlNode sqlNode, ConvertContext context) {
ProcedureDefinition procedureDefinition =
new ProcedureDefinition(sqlProcedure.getContextResolveProcedure().getProcedure());
- SqlCallBinding sqlCallBinding =
- new SqlCallBinding(context.getSqlValidator(), null, callProcedure);
+ FlinkSqlCallBinding sqlCallBinding =
+ new FlinkSqlCallBinding(
+ context.getSqlValidator(),
+ ((SqlValidatorImpl) context.getSqlValidator()).getEmptyScope(),
+ callProcedure);
List<RexNode> reducedOperands = reduceOperands(sqlCallBinding, context);
SqlOperatorBinding sqlOperatorBinding =
@@ -129,7 +130,8 @@ public Operation convertSqlNode(SqlNode sqlNode, ConvertContext context) {
typeInferResult.getOutputDataType());
}
- private List<RexNode> reduceOperands(SqlCallBinding sqlCallBinding, ConvertContext context) {
+ private List<RexNode> reduceOperands(
+ FlinkSqlCallBinding sqlCallBinding, ConvertContext context) {
// we don't really care about the input row type while converting to RexNode
// since call procedure shouldn't refer any inputs.
// so, construct an empty row for it.
@@ -137,22 +139,9 @@ private List<RexNode> reduceOperands(SqlCallBinding sqlCallBinding, ConvertContext context) {
toRelDataType(
DataTypes.ROW().getLogicalType(),
context.getSqlValidator().getTypeFactory());
- List<RexNode> rexNodes = new ArrayList<>();
- List<SqlNode> operands = sqlCallBinding.operands();
- FlinkOperatorBinding flinkOperatorBinding = new FlinkOperatorBinding(sqlCallBinding);
- for (int i = 0; i < operands.size(); i++) {
- RexNode rexNode = context.toRexNode(operands.get(i), inputRowType, null);
- if (rexNode.getKind() == SqlKind.DEFAULT) {
- rexNodes.add(
- ((RexCall) rexNode)
- .clone(
- flinkOperatorBinding.getOperandType(i),
- ((RexCall) rexNode).operands));
- } else {
- rexNodes.add(rexNode);
- }
- }
- rexNodes = context.reduceRexNodes(rexNodes);
- return rexNodes;
+ return context.reduceRexNodes(
+ sqlCallBinding.operands().stream()
+ .map(node -> context.toRexNode(node, inputRowType, null))
+ .collect(Collectors.toList()));
}
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
index 9352a8388ef50..75f25eb610cf0 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala
@@ -24,6 +24,7 @@ import org.apache.flink.table.planner.codegen.CodeGenUtils._
import org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull, generateCallIfArgsNullable, generateNonNullField, generateNullLiteral, generateStringResultCallIfArgsNotNull}
import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens._
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._
+import org.apache.flink.table.planner.functions.sql.SqlDefaultOperator
import org.apache.flink.table.runtime.functions.SqlFunctionUtils
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isCharacterString, isTimestamp, isTimestampWithLocalZone}
import org.apache.flink.table.types.logical._
@@ -237,10 +238,13 @@ object StringCallGen {
val currentDatabase = ctx.addReusableQueryLevelCurrentDatabase()
generateNonNullField(returnType, currentDatabase)
- case DEFAULT =>
- generateNullLiteral(returnType)
-
- case _ => null
+ case op => {
+ if (op.isInstanceOf[SqlDefaultOperator]) {
+ generateNullLiteral(returnType)
+ } else {
+ null
+ }
+ }
}
Option(generator)