Skip to content

Commit

Permalink
Merge pull request #16 from TongchengOpenSource/filter_transform_2
Browse files Browse the repository at this point in the history
[Improve][Transform] Remove can't find field exception
  • Loading branch information
xiaochen-zhou authored Apr 19, 2024
2 parents 4f4fd7b + 344f17c commit e8b94a8
Showing 1 changed file with 19 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,23 @@
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

@Slf4j
public class FilterFieldTransform extends AbstractCatalogSupportTransform {
public static final String PLUGIN_NAME = "Filter";
private int[] inputValueIndex;
private final String[] fields;
private final List<String> fields;

public FilterFieldTransform(
@NonNull ReadonlyConfig config, @NonNull CatalogTable catalogTable) {
super(catalogTable);
SeaTunnelRowType seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
fields = config.get(FilterFieldTransformConfig.KEY_FIELDS).toArray(new String[0]);
fields = config.get(FilterFieldTransformConfig.KEY_FIELDS);
List<String> canNotFoundFields =
Arrays.stream(fields)
.filter(
field -> {
try {
seaTunnelRowType.indexOf(field);
return false;
} catch (Exception e) {
return true;
}
})
fields.stream()
.filter(field -> seaTunnelRowType.indexOf(field, false) == -1)
.collect(Collectors.toList());

if (!CollectionUtils.isEmpty(canNotFoundFields)) {
Expand All @@ -77,34 +68,32 @@ public String getPluginName() {
@Override
protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
// todo reuse array container if not remove fields
Object[] values = new Object[fields.length];
for (int i = 0; i < fields.length; i++) {
Object[] values = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
values[i] = inputRow.getField(inputValueIndex[i]);
}
return new SeaTunnelRow(values);
SeaTunnelRow outputRow = new SeaTunnelRow(values);
outputRow.setRowKind(inputRow.getRowKind());
outputRow.setTableId(inputRow.getTableId());
return outputRow;
}

@Override
protected TableSchema transformTableSchema() {
List<String> filterFields = Arrays.asList(fields);
List<Column> outputColumns = new ArrayList<>();

SeaTunnelRowType seaTunnelRowType =
inputCatalogTable.getTableSchema().toPhysicalRowDataType();

inputValueIndex = new int[filterFields.size()];
inputValueIndex = new int[fields.size()];
ArrayList<String> outputFieldNames = new ArrayList<>();
for (int i = 0; i < filterFields.size(); i++) {
String field = filterFields.get(i);
List<Column> inputColumns = inputCatalogTable.getTableSchema().getColumns();
for (int i = 0; i < fields.size(); i++) {
String field = fields.get(i);
int inputFieldIndex = seaTunnelRowType.indexOf(field);
if (inputFieldIndex == -1) {
throw TransformCommonError.cannotFindInputFieldError(getPluginName(), field);
}
inputValueIndex[i] = inputFieldIndex;
outputColumns.add(
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).copy());
outputFieldNames.add(
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).getName());
outputColumns.add(inputColumns.get(inputFieldIndex).copy());
outputFieldNames.add(inputColumns.get(inputFieldIndex).getName());
}

List<ConstraintKey> outputConstraintKeys =
Expand All @@ -123,10 +112,9 @@ protected TableSchema transformTableSchema() {
.collect(Collectors.toList());

PrimaryKey copiedPrimaryKey = null;
if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
&& outputFieldNames.containsAll(
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
copiedPrimaryKey = inputCatalogTable.getTableSchema().getPrimaryKey().copy();
PrimaryKey primaryKey = inputCatalogTable.getTableSchema().getPrimaryKey();
if (primaryKey != null && outputFieldNames.containsAll(primaryKey.getColumnNames())) {
copiedPrimaryKey = primaryKey.copy();
}

return TableSchema.builder()
Expand Down

0 comments on commit e8b94a8

Please sign in to comment.