[FLINK-32261][table] Add built-in MAP_UNION function
hanyuzheng7 authored Mar 11, 2024
1 parent 5f38a57 commit c2a6fee
Showing 14 changed files with 676 additions and 139 deletions.
3 changes: 3 additions & 0 deletions docs/data/sql_functions.yml
@@ -673,6 +673,9 @@ collection:
- sql: MAP_KEYS(map)
table: MAP.mapKeys()
description: Returns the keys of the map as array. No order guaranteed.
- sql: MAP_UNION(map1, ...)
table: map1.mapUnion(...)
description: Returns a map created by merging at least one map. All maps must share a common map type. If there are overlapping keys, the value of the later map wins: the value from 'map2' overwrites the value from 'map1', the value from 'map3' overwrites the value from 'map2', and so on. Returns null if any of the maps is null.
- sql: MAP_VALUES(map)
table: MAP.mapValues()
description: Returns the values of the map as array. No order guaranteed.
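For illustration, a minimal Table API sketch (not part of this commit) of the overwrite order described in the MAP_UNION entry above; the class name is made up for the example:

import static org.apache.flink.table.api.Expressions.lit;
import static org.apache.flink.table.api.Expressions.map;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MapUnionOverwriteSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // {'a' -> 1, 'b' -> 2} merged with {'b' -> 20, 'c' -> 3}:
        // for the overlapping key 'b', the value from the later map wins.
        tEnv.fromValues(1)
                .select(
                        map(lit("a"), lit(1), lit("b"), lit(2))
                                .mapUnion(map(lit("b"), lit(20), lit("c"), lit(3)))
                                .as("merged"))
                .execute()
                .print();
        // Expected result (assumption): {a=1, b=20, c=3}
    }
}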
3 changes: 3 additions & 0 deletions docs/data/sql_functions_zh.yml
@@ -814,6 +814,9 @@ collection:
- sql: MAP_FROM_ARRAYS(array_of_keys, array_of_values)
table: mapFromArrays(array_of_keys, array_of_values)
description: Returns a map created from an array of keys and an array of values. Note that the two arrays must have the same length.
- sql: MAP_UNION(map1, map2)
table: map1.mapUnion(map2)
description: Returns a map created by merging the two maps 'map1' and 'map2'. Both maps should have a common map type. If there are overlapping keys, the value from 'map2' overwrites the value from 'map1'. If either map is null, returns null.

json:
- sql: IS JSON [ { VALUE | SCALAR | ARRAY | OBJECT } ]
1 change: 1 addition & 0 deletions flink-python/docs/reference/pyflink.table/expressions.rst
@@ -240,6 +240,7 @@ advanced type helper functions
Expression.array_union
Expression.map_entries
Expression.map_keys
Expression.map_union
Expression.map_values
Expression.array_except

11 changes: 11 additions & 0 deletions flink-python/pyflink/table/expression.py
@@ -1627,6 +1627,17 @@ def map_keys(self) -> 'Expression':
"""
return _unary_op("mapKeys")(self)

def map_union(self, *maps) -> 'Expression':
"""
Returns a map created by merging at least one map. All maps must share a common map type.
If there are overlapping keys, the value of the later map wins: the value from 'map2'
overwrites the value from 'map1', the value from 'map3' overwrites the value from 'map2',
and so on. Returns null if any of the maps is null.

.. seealso:: :py:attr:`~Expression.map_union`
"""
return _binary_op("mapUnion")(self, *maps)

@property
def map_values(self) -> 'Expression':
"""
BaseExpressions.java
@@ -132,6 +132,7 @@
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LTRIM;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_ENTRIES;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_KEYS;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_UNION;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_VALUES;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAX;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.MD5;
@@ -1548,6 +1549,21 @@ public OutType mapEntries() {
return toApiSpecificExpression(unresolvedCall(MAP_ENTRIES, toExpr()));
}

/**
* Returns a map created by merging at least one map. All maps must share a common map type.
* If there are overlapping keys, the value of the later map wins: the value from 'map2'
* overwrites the value from 'map1', the value from 'map3' overwrites the value from 'map2',
* and so on. Returns null if any of the maps is null.
*/
public OutType mapUnion(InType... inputs) {
Expression[] args =
Stream.concat(
Stream.of(toExpr()),
Arrays.stream(inputs).map(ApiExpressionUtils::objectToExpression))
.toArray(Expression[]::new);
return toApiSpecificExpression(unresolvedCall(MAP_UNION, args));
}

// Time definition

/**
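Because mapUnion is variadic, more than two maps can be merged in one call. A hedged sketch against hypothetical MAP-typed columns m1, m2, m3 (the column names are assumptions, not from this commit):

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.ApiExpression;
import org.apache.flink.table.api.Table;

public final class MapUnionVarargsSketch {
    // Merges three MAP-typed columns; on key collisions, values from m3 override m2,
    // which in turn override m1. Column names m1, m2, m3 are placeholders.
    public static Table mergeMaps(Table table) {
        ApiExpression merged = $("m1").mapUnion($("m2"), $("m3"));
        return table.select(merged.as("merged"));
    }
}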
BuiltInFunctionDefinitions.java
@@ -78,6 +78,7 @@
import static org.apache.flink.table.types.inference.InputTypeStrategies.TYPE_LITERAL;
import static org.apache.flink.table.types.inference.InputTypeStrategies.and;
import static org.apache.flink.table.types.inference.InputTypeStrategies.commonArrayType;
import static org.apache.flink.table.types.inference.InputTypeStrategies.commonMapType;
import static org.apache.flink.table.types.inference.InputTypeStrategies.commonMultipleArrayType;
import static org.apache.flink.table.types.inference.InputTypeStrategies.commonType;
import static org.apache.flink.table.types.inference.InputTypeStrategies.comparable;
@@ -167,6 +168,16 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.scalar.MapValuesFunction")
.build();

public static final BuiltInFunctionDefinition MAP_UNION =
BuiltInFunctionDefinition.newBuilder()
.name("MAP_UNION")
.kind(SCALAR)
.inputTypeStrategy(commonMapType(1))
.outputTypeStrategy(COMMON)
.runtimeClass(
"org.apache.flink.table.runtime.functions.scalar.MapUnionFunction")
.build();

public static final BuiltInFunctionDefinition MAP_ENTRIES =
BuiltInFunctionDefinition.newBuilder()
.name("MAP_ENTRIES")
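The commonMapType(1) input strategy together with the COMMON output strategy means all arguments are first cast to their common map type, which is also the result type. A rough sketch of what that implies at the SQL level, assuming Flink's standard type-merging rules (the widened type and printed output are assumptions):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MapUnionTypeWideningSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // MAP<STRING, INT> and MAP<STRING, DOUBLE> share the common type MAP<STRING, DOUBLE>,
        // so both inputs are cast to it and the result is MAP<STRING, DOUBLE>
        // (assumed from Flink's standard type-merging rules).
        tEnv.sqlQuery(
                        "SELECT MAP_UNION(MAP['a', 1, 'b', 2], MAP['b', CAST(2.5 AS DOUBLE)]) AS merged")
                .execute()
                .print();
        // Expected result (assumption): {a=1.0, b=2.5}
    }
}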
InputTypeStrategies.java
@@ -26,6 +26,7 @@
import org.apache.flink.table.types.inference.strategies.CommonArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CommonArrayInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CommonInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CommonMapInputTypeStrategy;
import org.apache.flink.table.types.inference.strategies.ComparableTypeStrategy;
import org.apache.flink.table.types.inference.strategies.CompositeArgumentTypeStrategy;
import org.apache.flink.table.types.inference.strategies.ConstraintArgumentTypeStrategy;
@@ -368,6 +369,14 @@ public static InputTypeStrategy commonMultipleArrayType(int minCount) {
/** @see ItemAtIndexArgumentTypeStrategy */
public static final ArgumentTypeStrategy ITEM_AT_INDEX = new ItemAtIndexArgumentTypeStrategy();

/**
* An {@link InputTypeStrategy} that expects at least {@code minCount} arguments that have a
* common map type.
*/
public static InputTypeStrategy commonMapType(int minCount) {
return new CommonMapInputTypeStrategy(ConstantArgumentCount.from(minCount));
}

// --------------------------------------------------------------------------------------------

private InputTypeStrategies() {
CommonArrayInputTypeStrategy.java
@@ -19,96 +19,15 @@
package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

/** An {@link InputTypeStrategy} that expects that all arguments have a common array type. */
@Internal
public final class CommonArrayInputTypeStrategy extends CommonCollectionInputTypeStrategy {

    public CommonArrayInputTypeStrategy(ArgumentCount argumentCount) {
        super(argumentCount, "All arguments requires to be a ARRAY type", LogicalTypeRoot.ARRAY);
    }
}
CommonCollectionInputTypeStrategy.java (new file)
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
import org.apache.flink.table.types.inference.Signature.Argument;
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.utils.LogicalTypeMerging;
import org.apache.flink.table.types.utils.TypeConversions;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** An {@link InputTypeStrategy} that expects that all arguments have a common type. */
@Internal
public class CommonCollectionInputTypeStrategy implements InputTypeStrategy {

private static final Argument COMMON_ARGUMENT = Argument.ofGroup("COMMON");

private final ArgumentCount argumentCount;

private final LogicalTypeRoot logicalTypeRoot;
private final String errorMessage;

public CommonCollectionInputTypeStrategy(
ArgumentCount argumentCount, String errorMessage, LogicalTypeRoot logicalTypeRoot) {
this.argumentCount = argumentCount;
this.errorMessage = errorMessage;
this.logicalTypeRoot = logicalTypeRoot;
}

@Override
public ArgumentCount getArgumentCount() {
return argumentCount;
}

@Override
public Optional<List<DataType>> inferInputTypes(
CallContext callContext, boolean throwOnFailure) {
List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
List<LogicalType> argumentTypes =
argumentDataTypes.stream()
.map(DataType::getLogicalType)
.collect(Collectors.toList());

if (!argumentTypes.stream().allMatch(logicalType -> logicalType.is(logicalTypeRoot))) {
return callContext.fail(throwOnFailure, errorMessage);
}

if (argumentTypes.stream().anyMatch(CommonCollectionInputTypeStrategy::isLegacyType)) {
return Optional.of(argumentDataTypes);
}

Optional<LogicalType> commonType = LogicalTypeMerging.findCommonType(argumentTypes);

if (!commonType.isPresent()) {
return callContext.fail(
throwOnFailure,
"Could not find a common type for arguments: %s",
argumentDataTypes);
}

return commonType.map(
type ->
Collections.nCopies(
argumentTypes.size(), TypeConversions.fromLogicalToDataType(type)));
}

@Override
public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
Optional<Integer> minCount = argumentCount.getMinCount();
Optional<Integer> maxCount = argumentCount.getMaxCount();

int numberOfMandatoryArguments = minCount.orElse(0);

if (maxCount.isPresent()) {
return IntStream.range(numberOfMandatoryArguments, maxCount.get() + 1)
.mapToObj(count -> Signature.of(Collections.nCopies(count, COMMON_ARGUMENT)))
.collect(Collectors.toList());
}

List<Argument> arguments =
new ArrayList<>(Collections.nCopies(numberOfMandatoryArguments, COMMON_ARGUMENT));
arguments.add(Argument.ofGroupVarying("COMMON"));
return Collections.singletonList(Signature.of(arguments));
}

private static boolean isLegacyType(LogicalType type) {
return type instanceof LegacyTypeInformationType;
}
}
CommonMapInputTypeStrategy.java (new file)
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.logical.LogicalTypeRoot;

/** An {@link InputTypeStrategy} that expects that all arguments have a common map type. */
@Internal
public final class CommonMapInputTypeStrategy extends CommonCollectionInputTypeStrategy {

public CommonMapInputTypeStrategy(ArgumentCount argumentCount) {
super(argumentCount, "All arguments requires to be a MAP type", LogicalTypeRoot.MAP);
}
}
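As a rough illustration of what this strategy enforces, a call that mixes MAP and non-MAP arguments should be rejected during validation with the message defined above; the exception type and where the message surfaces are assumptions in this sketch:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.ValidationException;

public class MapUnionValidationSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        try {
            // The second argument is an ARRAY, not a MAP, so input type inference fails.
            tEnv.sqlQuery("SELECT MAP_UNION(MAP['a', 1], ARRAY[1, 2, 3])").execute().print();
        } catch (ValidationException e) {
            // Expected to mention "All arguments requires to be a MAP type"
            // (assumption about how the strategy's error message is surfaced).
            System.out.println(e.getMessage());
        }
    }
}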