From 1070c6e9e0f9f00991bdeb34f0757e4f0597931e Mon Sep 17 00:00:00 2001 From: Feng Jin Date: Mon, 26 Feb 2024 19:27:18 +0800 Subject: [PATCH] [FLINK-34312][table] Improve the handling of default node types for named parameters (#24235) Co-authored-by: Shengkai <1059623455@qq.com> --- .../sql/validate/ProcedureNamespace.java | 5 +- .../sql/validate/SqlValidatorImpl.java | 20 +- .../calcite/sql2rel/SqlToRelConverter.java | 37 ++-- .../planner/calcite/FlinkConvertletTable.java | 39 ---- .../planner/calcite/FlinkOperatorBinding.java | 181 ------------------ .../planner/calcite/FlinkSqlCallBinding.java | 117 +++++++++++ .../inference/CallBindingCallContext.java | 16 +- .../inference/OperatorBindingCallContext.java | 9 +- .../functions/sql/FlinkSqlOperatorTable.java | 6 - .../functions/sql/SqlDefaultOperator.java | 58 ++++++ .../converters/SqlProcedureCallConverter.java | 37 ++-- .../planner/codegen/calls/StringCallGen.scala | 12 +- 12 files changed, 240 insertions(+), 297 deletions(-) delete mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkOperatorBinding.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java index 22e93800aeb9a..9bd055f75d23c 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/ProcedureNamespace.java @@ -17,6 +17,7 @@ package org.apache.calcite.sql.validate; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.SqlCall; @@ -56,9 +57,9 @@ public final class ProcedureNamespace extends AbstractNamespace { public RelDataType validateImpl(RelDataType targetRowType) { validator.inferUnknownTypes(validator.unknownType, scope, call); // The result is ignored but the type is derived to trigger the validation - validator.deriveTypeImpl(scope, call); + final SqlCallBinding callBinding = new FlinkSqlCallBinding(validator, scope, call); + validator.deriveTypeImpl(scope, callBinding.permutedCall()); final SqlOperator operator = call.getOperator(); - final SqlCallBinding callBinding = new SqlCallBinding(validator, scope, call); if (!(operator instanceof SqlTableFunction)) { throw new IllegalArgumentException( "Argument must be a table function: " + operator.getNameAsId()); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java index 3322f4f458a99..5d2bfbd74195d 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java @@ -16,6 +16,8 @@ */ package org.apache.calcite.sql.validate; +import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; @@ -158,16 +160,17 @@ * Default implementation of {@link SqlValidator}, the class was copied over because of * CALCITE-4554. * - *

Lines 1958 ~ 1978, Flink improves error message for functions without appropriate arguments in + *

Lines 1961 ~ 1981, Flink improves error message for functions without appropriate arguments in * handleUnresolvedFunction at {@link SqlValidatorImpl#handleUnresolvedFunction}. * - *

Lines 3736 ~ 3740, Flink improves Optimize the retrieval of sub-operands in SqlCall when using - * NamedParameters at {@link SqlValidatorImpl#checkRollUp}. + *

Lines 3739 ~ 3743, 6333 ~ 6339, Flink improves validating the SqlCall that uses named + * parameters, rearrange the order of sub-operands, and fill in missing operands with the default + * node. * - *

Lines 5108 ~ 5121, Flink enables TIMESTAMP and TIMESTAMP_LTZ for system time period + *

Lines 5111 ~ 5124, Flink enables TIMESTAMP and TIMESTAMP_LTZ for system time period * specification type at {@link org.apache.calcite.sql.validate.SqlValidatorImpl#validateSnapshot}. * - *

Lines 5465 ~ 5471, Flink enables TIMESTAMP and TIMESTAMP_LTZ for first orderBy column in + *

Lines 5468 ~ 5474, Flink enables TIMESTAMP and TIMESTAMP_LTZ for first orderBy column in * matchRecognize at {@link SqlValidatorImpl#validateMatchRecognize}. */ public class SqlValidatorImpl implements SqlValidatorWithHints { @@ -6327,8 +6330,13 @@ public RelDataType visit(SqlLiteral literal) { @Override public RelDataType visit(SqlCall call) { + // ----- FLINK MODIFICATION BEGIN ----- + FlinkSqlCallBinding flinkSqlCallBinding = + new FlinkSqlCallBinding(this.scope.getValidator(), scope, call); final SqlOperator operator = call.getOperator(); - return operator.deriveType(SqlValidatorImpl.this, scope, call); + return operator.deriveType( + SqlValidatorImpl.this, scope, flinkSqlCallBinding.permutedCall()); + // ----- FLINK MODIFICATION END ----- } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java index 0c5d7dda150e7..c100ccb62383e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java @@ -18,7 +18,7 @@ import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.data.TimestampData; -import org.apache.flink.table.planner.calcite.FlinkOperatorBinding; +import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding; import org.apache.flink.table.planner.calcite.TimestampSchemaVersion; import org.apache.flink.table.planner.hint.ClearQueryHintsWithInvalidPropagationShuttle; import org.apache.flink.table.planner.hint.FlinkHints; @@ -235,14 +235,15 @@ *

FLINK modifications are at lines * *

    - *
  1. Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 654 ~ 671 - *
  2. Added in Flink-24024: Lines 1435 ~ 1445, Lines 1459 ~ 1501 - *
  3. Added in FLINK-28682: Lines 2323 ~ 2340 - *
  4. Added in FLINK-28682: Lines 2377 ~ 2405 - *
  5. Added in FLINK-32474: Lines 2875 ~ 2887 - *
  6. Added in FLINK-32474: Lines 2987 ~ 3021 - *
  7. Added in FLINK-20873: Lines 5519 ~ 5528 - *
  8. Added in FLINK-34057, FLINK-34058: Lines 6090 ~ 6116 + *
  9. Added in FLINK-29081, FLINK-28682, FLINK-33395: Lines 655 ~ 673 + *
  10. Added in Flink-24024: Lines 1437 ~ 1447, Lines 1461 ~ 1503 + *
  11. Added in FLINK-28682: Lines 2325 ~ 2342 + *
  12. Added in FLINK-28682: Lines 2379 ~ 2407 + *
  13. Added in FLINK-32474: Lines 2877 ~ 2889 + *
  14. Added in FLINK-32474: Lines 2989 ~ 3023 + *
  15. Added in FLINK-20873: Lines 5521 ~ 5530 + *
  16. Added in FLINK-34312: Lines 5641 ~ 5644 + *
  17. Added in FLINK-34057, FLINK-34058, FLINK-34312: Lines 6093 ~ 6111 *
*/ @SuppressWarnings("UnstableApiUsage") @@ -5637,8 +5638,10 @@ public RexNode visit(SqlCall call) { () -> "agg.lookupAggregates for call " + call); } } + // ----- FLINK MODIFICATION BEGIN ----- return exprConverter.convertCall( - this, new SqlCallBinding(validator(), scope, call).permutedCall()); + this, new FlinkSqlCallBinding(validator(), scope, call).permutedCall()); + // ----- FLINK MODIFICATION END ----- } @Override @@ -6088,11 +6091,9 @@ private void translateAgg( // switch out of agg mode bb.agg = null; // ----- FLINK MODIFICATION BEGIN ----- - SqlCallBinding sqlCallBinding = - new SqlCallBinding(validator(), aggregatingSelectScope, call); - List sqlNodes = sqlCallBinding.operands(); - FlinkOperatorBinding flinkOperatorBinding = - new FlinkOperatorBinding(sqlCallBinding); + FlinkSqlCallBinding binding = + new FlinkSqlCallBinding(validator(), aggregatingSelectScope, call); + List sqlNodes = binding.operands(); for (int i = 0; i < sqlNodes.size(); i++) { SqlNode operand = sqlNodes.get(i); // special case for COUNT(*): delete the * @@ -6105,12 +6106,6 @@ private void translateAgg( } } RexNode convertedExpr = bb.convertExpression(operand); - if (convertedExpr.getKind() == SqlKind.DEFAULT) { - RelDataType relDataType = flinkOperatorBinding.getOperandType(i); - convertedExpr = - ((RexCall) convertedExpr) - .clone(relDataType, ((RexCall) convertedExpr).operands); - } args.add(lookupOrCreateGroupExpr(convertedExpr)); } // ----- FLINK MODIFICATION END ----- diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java index 0a22585ea5cd1..8f319dd1e232b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java @@ -30,7 +30,6 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.SqlBasicCall; import org.apache.calcite.sql.SqlCall; -import org.apache.calcite.sql.SqlCallBinding; import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlIntervalQualifier; import org.apache.calcite.sql.SqlKind; @@ -44,10 +43,8 @@ import org.apache.calcite.sql2rel.SqlRexConvertletTable; import org.apache.calcite.sql2rel.StandardConvertletTable; -import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Objects; import static org.apache.flink.util.Preconditions.checkArgument; @@ -72,10 +69,6 @@ public SqlRexConvertlet get(SqlCall call) { return this::convertSetSemanticsWindowTableFunction; } - if (isContainsDefaultNode(call)) { - return this::convertSqlCallWithDefaultNode; - } - return StandardConvertletTable.INSTANCE.get(call); } @@ -120,12 +113,6 @@ private boolean isSetSemanticsWindowTableFunction(SqlCall call) { return !operands.isEmpty() && operands.get(0).getKind() == SqlKind.SET_SEMANTICS_TABLE; } - private boolean isContainsDefaultNode(SqlCall call) { - return call.getOperandList().stream() - .filter(Objects::nonNull) - .anyMatch(operand -> operand.getKind() == SqlKind.DEFAULT); - } - /** * Due to CALCITE-6204, we need to manually extract partition keys and order keys and convert * them to {@link RexSetSemanticsTableCall}. @@ -215,30 +202,4 @@ private static int parseFieldIdx(RexNode e) { // should not happen throw new TableException("Unsupported partition key with type: " + e.getKind()); } - - /** - * When the SqlCall contains a default operator, the type of the default node to ANY after - * converted to rel node. However, the ANY type cannot pass various checks well and cannot adapt - * well to types in flink. Therefore, we replace the ANY type with the argument type obtained - * from the operator. - */ - private RexNode convertSqlCallWithDefaultNode(SqlRexContext cx, final SqlCall call) { - RexNode rexCall = StandardConvertletTable.INSTANCE.convertCall(cx, call); - SqlCallBinding sqlCallBinding = new SqlCallBinding(cx.getValidator(), null, call); - FlinkOperatorBinding flinkOperatorBinding = new FlinkOperatorBinding(sqlCallBinding); - if (rexCall instanceof RexCall) { - List operands = new ArrayList<>(((RexCall) rexCall).operands); - for (int i = 0; i < operands.size(); i++) { - RexNode rexNode = operands.get(i); - if (rexNode.getKind() == SqlKind.DEFAULT && rexNode instanceof RexCall) { - RelDataType relDataType = flinkOperatorBinding.getOperandType(i); - operands.set( - i, - ((RexCall) rexNode).clone(relDataType, ((RexCall) rexNode).operands)); - } - } - return ((RexCall) rexCall).clone(rexCall.getType(), operands); - } - return rexCall; - } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkOperatorBinding.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkOperatorBinding.java deleted file mode 100644 index 41e50d8739ac5..0000000000000 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkOperatorBinding.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.calcite; - -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rex.RexCallBinding; -import org.apache.calcite.rex.RexNode; -import org.apache.calcite.runtime.CalciteException; -import org.apache.calcite.runtime.Resources; -import org.apache.calcite.sql.SqlCallBinding; -import org.apache.calcite.sql.SqlKind; -import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlOperator; -import org.apache.calcite.sql.SqlOperatorBinding; -import org.apache.calcite.sql.type.SqlOperandMetadata; -import org.apache.calcite.sql.type.SqlOperandTypeChecker; -import org.apache.calcite.sql.validate.SqlMonotonicity; -import org.apache.calcite.sql.validate.SqlValidatorException; -import org.checkerframework.checker.nullness.qual.Nullable; - -import java.util.Collections; -import java.util.List; - -/** - * The proxy implementation of {@link SqlOperatorBinding} can be used to correct the operator type - * when using named parameter. - */ -public class FlinkOperatorBinding extends SqlOperatorBinding { - - private final SqlOperatorBinding sqlOperatorBinding; - - private final List argumentTypes; - - public FlinkOperatorBinding(SqlOperatorBinding sqlOperatorBinding) { - super(sqlOperatorBinding.getTypeFactory(), sqlOperatorBinding.getOperator()); - this.sqlOperatorBinding = sqlOperatorBinding; - this.argumentTypes = getArgumentTypes(); - } - - @Override - public int getOperandCount() { - if (!argumentTypes.isEmpty()) { - return argumentTypes.size(); - } else { - return sqlOperatorBinding.getOperandCount(); - } - } - - @Override - public RelDataType getOperandType(int ordinal) { - if (sqlOperatorBinding instanceof SqlCallBinding) { - SqlNode sqlNode = ((SqlCallBinding) sqlOperatorBinding).operands().get(ordinal); - if (sqlNode.getKind() == SqlKind.DEFAULT && !argumentTypes.isEmpty()) { - return argumentTypes.get(ordinal); - } else { - return ((SqlCallBinding) sqlOperatorBinding) - .getValidator() - .deriveType(((SqlCallBinding) sqlOperatorBinding).getScope(), sqlNode); - } - } else if (sqlOperatorBinding instanceof RexCallBinding) { - RexNode rexNode = ((RexCallBinding) sqlOperatorBinding).operands().get(ordinal); - if (rexNode.getKind() == SqlKind.DEFAULT && !argumentTypes.isEmpty()) { - return argumentTypes.get(ordinal); - } else { - return rexNode.getType(); - } - } - return sqlOperatorBinding.getOperandType(ordinal); - } - - @Override - public CalciteException newError(Resources.ExInst e) { - return sqlOperatorBinding.newError(e); - } - - @Override - public int getGroupCount() { - return sqlOperatorBinding.getGroupCount(); - } - - @Override - public boolean hasFilter() { - return sqlOperatorBinding.hasFilter(); - } - - @Override - public SqlOperator getOperator() { - return sqlOperatorBinding.getOperator(); - } - - @Override - public RelDataTypeFactory getTypeFactory() { - return sqlOperatorBinding.getTypeFactory(); - } - - @Override - public @Nullable String getStringLiteralOperand(int ordinal) { - return sqlOperatorBinding.getStringLiteralOperand(ordinal); - } - - @Override // to be removed before 2.0 - public int getIntLiteralOperand(int ordinal) { - return sqlOperatorBinding.getIntLiteralOperand(ordinal); - } - - @Override - public boolean isOperandNull(int ordinal, boolean allowCast) { - return sqlOperatorBinding.isOperandNull(ordinal, allowCast); - } - - @Override - public boolean isOperandLiteral(int ordinal, boolean allowCast) { - return sqlOperatorBinding.isOperandLiteral(ordinal, allowCast); - } - - @Override - public @Nullable Object getOperandLiteralValue(int ordinal, RelDataType type) { - return sqlOperatorBinding.getOperandLiteralValue(ordinal, type); - } - - @Override - public @Nullable Comparable getOperandLiteralValue(int ordinal) { - return sqlOperatorBinding.getOperandLiteralValue(ordinal); - } - - @Override - public SqlMonotonicity getOperandMonotonicity(int ordinal) { - return sqlOperatorBinding.getOperandMonotonicity(ordinal); - } - - @Override - public List collectOperandTypes() { - return sqlOperatorBinding.collectOperandTypes(); - } - - @Override - public @Nullable RelDataType getCursorOperand(int ordinal) { - return sqlOperatorBinding.getCursorOperand(ordinal); - } - - @Override - public @Nullable String getColumnListParamInfo( - int ordinal, String paramName, List columnList) { - return sqlOperatorBinding.getColumnListParamInfo(ordinal, paramName, columnList); - } - - @Override - public @Nullable T getOperandLiteralValue(int ordinal, Class clazz) { - return sqlOperatorBinding.getOperandLiteralValue(ordinal, clazz); - } - - private List getArgumentTypes() { - SqlOperandTypeChecker sqlOperandTypeChecker = getOperator().getOperandTypeChecker(); - if (sqlOperandTypeChecker != null - && sqlOperandTypeChecker.isFixedParameters() - && sqlOperandTypeChecker instanceof SqlOperandMetadata) { - SqlOperandMetadata sqlOperandMetadata = - ((SqlOperandMetadata) getOperator().getOperandTypeChecker()); - return sqlOperandMetadata.paramTypes(getTypeFactory()); - } else { - return Collections.emptyList(); - } - } -} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java new file mode 100644 index 0000000000000..adb427b656df3 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkSqlCallBinding.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.calcite; + +import org.apache.flink.table.planner.functions.sql.SqlDefaultOperator; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlCallBinding; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.type.SqlOperandMetadata; +import org.apache.calcite.sql.type.SqlOperandTypeChecker; +import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.validate.SqlValidatorNamespace; +import org.apache.calcite.sql.validate.SqlValidatorScope; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** Binding supports to rewrite the DEFAULT operator. */ +public class FlinkSqlCallBinding extends SqlCallBinding { + + private final List fixArgumentTypes; + + private final List rewrittenOperands; + + public FlinkSqlCallBinding( + SqlValidator validator, @Nullable SqlValidatorScope scope, SqlCall call) { + super(validator, scope, call); + this.fixArgumentTypes = getFixArgumentTypes(); + this.rewrittenOperands = getRewrittenOperands(); + } + + @Override + public int getOperandCount() { + return rewrittenOperands.size(); + } + + @Override + public List operands() { + if (isFixedParameters()) { + return rewrittenOperands; + } else { + return super.operands(); + } + } + + @Override + public RelDataType getOperandType(int ordinal) { + if (!isFixedParameters()) { + return super.getOperandType(ordinal); + } + + SqlNode operand = rewrittenOperands.get(ordinal); + if (operand.getKind() == SqlKind.DEFAULT) { + return fixArgumentTypes.get(ordinal); + } + + final RelDataType type = SqlTypeUtil.deriveType(this, operand); + final SqlValidatorNamespace namespace = getValidator().getNamespace(operand); + return namespace != null ? namespace.getType() : type; + } + + public boolean isFixedParameters() { + return !fixArgumentTypes.isEmpty(); + } + + private List getFixArgumentTypes() { + SqlOperandTypeChecker sqlOperandTypeChecker = getOperator().getOperandTypeChecker(); + if (sqlOperandTypeChecker instanceof SqlOperandMetadata + && sqlOperandTypeChecker.isFixedParameters()) { + return ((SqlOperandMetadata) sqlOperandTypeChecker).paramTypes(getTypeFactory()); + } + return Collections.emptyList(); + } + + private List getRewrittenOperands() { + if (!isFixedParameters()) { + return super.operands(); + } + + List rewrittenOperands = new ArrayList<>(); + for (SqlNode operand : super.operands()) { + if (operand instanceof SqlCall + && ((SqlCall) operand).getOperator() == SqlStdOperatorTable.DEFAULT) { + rewrittenOperands.add( + new SqlDefaultOperator(fixArgumentTypes.get(rewrittenOperands.size())) + .createCall(SqlParserPos.ZERO)); + } else { + rewrittenOperands.add(operand); + } + } + return rewrittenOperands; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java index 65aefccad2e21..cc8825022b58b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.FunctionDefinition; -import org.apache.flink.table.planner.calcite.FlinkOperatorBinding; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.CallContext; @@ -32,7 +31,6 @@ import org.apache.calcite.sql.SqlCallBinding; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; -import org.apache.calcite.sql.SqlOperatorBinding; import org.apache.calcite.sql.SqlUtil; import org.apache.calcite.sql.type.SqlTypeName; @@ -53,23 +51,23 @@ public final class CallBindingCallContext extends AbstractSqlCallContext { private final List argumentDataTypes; - private final SqlOperatorBinding binding; + private final SqlCallBinding binding; private final @Nullable DataType outputType; public CallBindingCallContext( DataTypeFactory dataTypeFactory, FunctionDefinition definition, - SqlCallBinding sqlCallBinding, + SqlCallBinding binding, @Nullable RelDataType outputType) { super( dataTypeFactory, definition, - sqlCallBinding.getOperator().getNameAsId().toString(), - sqlCallBinding.getGroupCount() > 0); + binding.getOperator().getNameAsId().toString(), + binding.getGroupCount() > 0); - this.adaptedArguments = sqlCallBinding.operands(); // reorders the operands - this.binding = new FlinkOperatorBinding(sqlCallBinding); + this.adaptedArguments = binding.operands(); // reorders the operands + this.binding = binding; this.argumentDataTypes = new AbstractList() { @Override @@ -84,7 +82,7 @@ public int size() { return binding.getOperandCount(); } }; - this.outputType = convertOutputType(sqlCallBinding, outputType); + this.outputType = convertOutputType(binding, outputType); } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java index 9961354e3045e..2167203ea1c45 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.FunctionDefinition; -import org.apache.flink.table.planner.calcite.FlinkOperatorBinding; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.CallContext; @@ -52,15 +51,15 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext { public OperatorBindingCallContext( DataTypeFactory dataTypeFactory, FunctionDefinition definition, - SqlOperatorBinding sqlOperatorBinding, + SqlOperatorBinding binding, RelDataType returnRelDataType) { super( dataTypeFactory, definition, - sqlOperatorBinding.getOperator().getNameAsId().toString(), - sqlOperatorBinding.getGroupCount() > 0); + binding.getOperator().getNameAsId().toString(), + binding.getGroupCount() > 0); - this.binding = new FlinkOperatorBinding(sqlOperatorBinding); + this.binding = binding; this.argumentDataTypes = new AbstractList() { @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index 628b6bb804f6d..a539e1f81b4b8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -35,7 +35,6 @@ import org.apache.calcite.sql.SqlOperator; import org.apache.calcite.sql.SqlPostfixOperator; import org.apache.calcite.sql.SqlPrefixOperator; -import org.apache.calcite.sql.SqlSpecialOperator; import org.apache.calcite.sql.SqlSyntax; import org.apache.calcite.sql.fun.SqlBasicAggFunction; import org.apache.calcite.sql.fun.SqlLibraryOperators; @@ -1307,9 +1306,4 @@ public List getAuxiliaryFunctions() { .operandTypeChecker(OperandTypes.NILADIC) .notDeterministic() .build(); - - // DEFAULT FUNCTION - // The default operator is used to fill in missing parameters when using named parameter, - // which is used during code generation and not exposed to the user by default. - public static final SqlSpecialOperator DEFAULT = SqlStdOperatorTable.DEFAULT; } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java new file mode 100644 index 0000000000000..05cf3c948ed82 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlDefaultOperator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.sql; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.calcite.sql.SqlWriter; +import org.apache.calcite.sql.type.InferTypes; +import org.apache.calcite.sql.type.OperandTypes; +import org.apache.calcite.sql.type.ReturnTypes; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.validate.SqlValidatorScope; + +/** Default operator has specified type. */ +public class SqlDefaultOperator extends SqlSpecialOperator { + + private final RelDataType returnType; + + public SqlDefaultOperator(RelDataType returnType) { + super( + "DEFAULT", + SqlKind.DEFAULT, + 100, + true, + ReturnTypes.explicit(returnType), + InferTypes.RETURN_TYPE, + OperandTypes.NILADIC); + this.returnType = returnType; + } + + @Override + public void unparse(SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) { + writer.keyword(getName()); + } + + @Override + public RelDataType deriveType(SqlValidator validator, SqlValidatorScope scope, SqlCall call) { + return returnType; + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java index 8205d52ff7e14..7791da5a6a692 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java @@ -21,7 +21,7 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.operations.Operation; -import org.apache.flink.table.planner.calcite.FlinkOperatorBinding; +import org.apache.flink.table.planner.calcite.FlinkSqlCallBinding; import org.apache.flink.table.planner.functions.bridging.BridgingSqlProcedure; import org.apache.flink.table.planner.functions.inference.OperatorBindingCallContext; import org.apache.flink.table.planner.operations.PlannerCallProcedureOperation; @@ -31,18 +31,16 @@ import org.apache.flink.table.types.inference.TypeInferenceUtil; import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; import org.apache.calcite.sql.ExplicitOperatorBinding; import org.apache.calcite.sql.SqlCall; -import org.apache.calcite.sql.SqlCallBinding; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlOperatorBinding; import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.validate.SqlValidatorImpl; -import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.Optional; @@ -71,8 +69,11 @@ public Operation convertSqlNode(SqlNode sqlNode, ConvertContext context) { ProcedureDefinition procedureDefinition = new ProcedureDefinition(sqlProcedure.getContextResolveProcedure().getProcedure()); - SqlCallBinding sqlCallBinding = - new SqlCallBinding(context.getSqlValidator(), null, callProcedure); + FlinkSqlCallBinding sqlCallBinding = + new FlinkSqlCallBinding( + context.getSqlValidator(), + ((SqlValidatorImpl) context.getSqlValidator()).getEmptyScope(), + callProcedure); List reducedOperands = reduceOperands(sqlCallBinding, context); SqlOperatorBinding sqlOperatorBinding = @@ -129,7 +130,8 @@ public Operation convertSqlNode(SqlNode sqlNode, ConvertContext context) { typeInferResult.getOutputDataType()); } - private List reduceOperands(SqlCallBinding sqlCallBinding, ConvertContext context) { + private List reduceOperands( + FlinkSqlCallBinding sqlCallBinding, ConvertContext context) { // we don't really care about the input row type while converting to RexNode // since call procedure shouldn't refer any inputs. // so, construct an empty row for it. @@ -137,22 +139,9 @@ private List reduceOperands(SqlCallBinding sqlCallBinding, ConvertConte toRelDataType( DataTypes.ROW().getLogicalType(), context.getSqlValidator().getTypeFactory()); - List rexNodes = new ArrayList<>(); - List operands = sqlCallBinding.operands(); - FlinkOperatorBinding flinkOperatorBinding = new FlinkOperatorBinding(sqlCallBinding); - for (int i = 0; i < operands.size(); i++) { - RexNode rexNode = context.toRexNode(operands.get(i), inputRowType, null); - if (rexNode.getKind() == SqlKind.DEFAULT) { - rexNodes.add( - ((RexCall) rexNode) - .clone( - flinkOperatorBinding.getOperandType(i), - ((RexCall) rexNode).operands)); - } else { - rexNodes.add(rexNode); - } - } - rexNodes = context.reduceRexNodes(rexNodes); - return rexNodes; + return context.reduceRexNodes( + sqlCallBinding.operands().stream() + .map(node -> context.toRexNode(node, inputRowType, null)) + .collect(Collectors.toList())); } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala index 9352a8388ef50..75f25eb610cf0 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala @@ -24,6 +24,7 @@ import org.apache.flink.table.planner.codegen.CodeGenUtils._ import org.apache.flink.table.planner.codegen.GenerateUtils.{generateCallIfArgsNotNull, generateCallIfArgsNullable, generateNonNullField, generateNullLiteral, generateStringResultCallIfArgsNotNull} import org.apache.flink.table.planner.codegen.calls.ScalarOperatorGens._ import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable._ +import org.apache.flink.table.planner.functions.sql.SqlDefaultOperator import org.apache.flink.table.runtime.functions.SqlFunctionUtils import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isCharacterString, isTimestamp, isTimestampWithLocalZone} import org.apache.flink.table.types.logical._ @@ -237,10 +238,13 @@ object StringCallGen { val currentDatabase = ctx.addReusableQueryLevelCurrentDatabase() generateNonNullField(returnType, currentDatabase) - case DEFAULT => - generateNullLiteral(returnType) - - case _ => null + case op => { + if (op.isInstanceOf[SqlDefaultOperator]) { + generateNullLiteral(returnType) + } else { + null + } + } } Option(generator)