Skip to content

Commit

Permalink
[FLINK-34476][table-planner] Consider assignment operator during TVF …
Browse files Browse the repository at this point in the history
…column expansion
  • Loading branch information
twalthr committed Feb 22, 2024
1 parent d19e886 commit f21ee01
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.calcite.sql.JoinType;
import org.apache.calcite.sql.SqlAsOperator;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlIdentifier;
Expand Down Expand Up @@ -85,6 +86,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.calcite.sql.type.SqlTypeName.DECIMAL;
import static org.apache.flink.table.expressions.resolver.lookups.FieldReferenceLookup.includeExpandedColumn;
Expand Down Expand Up @@ -363,20 +365,23 @@ protected void addToSelectList(

final List<SqlIdentifier> descriptors =
call.getOperandList().stream()
.filter(op -> op.getKind() == SqlKind.DESCRIPTOR)
.flatMap(
desc ->
((SqlBasicCall) desc)
.getOperandList().stream()
.filter(SqlIdentifier.class::isInstance)
.map(SqlIdentifier.class::cast))
.flatMap(FlinkCalciteSqlValidator::extractDescriptors)
.collect(Collectors.toList());

for (int i = 0; i < call.operandCount(); i++) {
final SqlIdentifier tableArg = explicitTableArgs.get(i);
if (tableArg != null) {
call.setOperand(i, new ExplicitTableSqlSelect(tableArg, descriptors));
final SqlNode opReplacement = new ExplicitTableSqlSelect(tableArg, descriptors);
if (call.operand(i).getKind() == SqlKind.ARGUMENT_ASSIGNMENT) {
// for TUMBLE(DATA => TABLE t3, ...)
final SqlCall assignment = call.operand(i);
assignment.setOperand(0, opReplacement);
} else {
// for TUMBLE(TABLE t3, ...)
call.setOperand(i, opReplacement);
}
}
// for TUMBLE([DATA =>] SELECT ..., ...)
}
}

Expand Down Expand Up @@ -447,20 +452,40 @@ private static List<SqlIdentifier> getExplicitTableOperands(SqlNode node) {
}

return call.getOperandList().stream()
.map(
op -> {
if (op.getKind() == SqlKind.EXPLICIT_TABLE) {
final SqlBasicCall opCall = (SqlBasicCall) op;
if (opCall.operandCount() == 1
&& opCall.operand(0) instanceof SqlIdentifier) {
return (SqlIdentifier) opCall.operand(0);
}
}
return null;
})
.map(FlinkCalciteSqlValidator::extractExplicitTable)
.collect(Collectors.toList());
}

private static @Nullable SqlIdentifier extractExplicitTable(SqlNode op) {
if (op.getKind() == SqlKind.EXPLICIT_TABLE) {
final SqlBasicCall opCall = (SqlBasicCall) op;
if (opCall.operandCount() == 1 && opCall.operand(0) instanceof SqlIdentifier) {
// for TUMBLE(TABLE t3, ...)
return opCall.operand(0);
}
} else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) {
// for TUMBLE(DATA => TABLE t3, ...)
final SqlBasicCall opCall = (SqlBasicCall) op;
return extractExplicitTable(opCall.operand(0));
}
return null;
}

private static Stream<SqlIdentifier> extractDescriptors(SqlNode op) {
if (op.getKind() == SqlKind.DESCRIPTOR) {
// for TUMBLE(..., DESCRIPTOR(col), ...)
final SqlBasicCall opCall = (SqlBasicCall) op;
return opCall.getOperandList().stream()
.filter(SqlIdentifier.class::isInstance)
.map(SqlIdentifier.class::cast);
} else if (op.getKind() == SqlKind.ARGUMENT_ASSIGNMENT) {
// for TUMBLE(..., TIMECOL => DESCRIPTOR(col), ...)
final SqlBasicCall opCall = (SqlBasicCall) op;
return extractDescriptors(opCall.operand(0));
}
return Stream.empty();
}

private static boolean isTableFunction(SqlFunction function) {
return function instanceof SqlTableFunction
|| function.getFunctionType() == SqlFunctionCategory.USER_DEFINED_TABLE_FUNCTION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,32 @@ private void assertColumnNames(String sql, String... columnNames) {
assertThat(tableEnv.sqlQuery(sql).getResolvedSchema().getColumnNames())
.containsExactly(columnNames);
}

@Test
void testExplicitTableWithinTableFunctionWithNamedArgs() {
tableEnv.getConfig()
.set(
TABLE_COLUMN_EXPANSION_STRATEGY,
Collections.singletonList(EXCLUDE_DEFAULT_VIRTUAL_METADATA_COLUMNS));

// t3_m_virtual is selected due to expansion of the explicit table expression
// with hints from descriptor
assertColumnNames(
"SELECT * FROM TABLE("
+ "TUMBLE(DATA => TABLE t3, TIMECOL => DESCRIPTOR(t3_m_virtual), SIZE => INTERVAL '1' MINUTE))",
"t3_s",
"t3_i",
"t3_m_virtual",
"window_start",
"window_end",
"window_time");

// Test common window TVF syntax
assertColumnNames(
"SELECT t3_s, SUM(t3_i) AS agg "
+ "FROM TABLE(TUMBLE(DATA => TABLE t3, TIMECOL => DESCRIPTOR(t3_m_virtual), SIZE => INTERVAL '1' MINUTE)) "
+ "GROUP BY t3_s, window_start, window_end",
"t3_s",
"agg");
}
}

0 comments on commit f21ee01

Please sign in to comment.