Skip to content

Commit

Permalink
Merge pull request #15 from TongchengOpenSource/revert-13-filter_tran…
Browse files Browse the repository at this point in the history
…sform

Revert "[Improve][Transform] Remove can't find field exception"
  • Loading branch information
xiaochen-zhou authored Apr 11, 2024
2 parents 92ffa4b + 5f3dbce commit a917337
Showing 1 changed file with 22 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,17 @@ public FilterFieldTransform(
fields = config.get(FilterFieldTransformConfig.KEY_FIELDS).toArray(new String[0]);
List<String> canNotFoundFields =
Arrays.stream(fields)
.filter(field -> seaTunnelRowType.indexOf(field, false) == -1)
.filter(
field -> {
try {
seaTunnelRowType.indexOf(field);
return false;
} catch (Exception e) {
return true;
}
})
.collect(Collectors.toList());

if (!CollectionUtils.isEmpty(canNotFoundFields)) {
throw TransformCommonError.cannotFindInputFieldsError(
getPluginName(), canNotFoundFields);
Expand All @@ -72,10 +81,7 @@ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
for (int i = 0; i < fields.length; i++) {
values[i] = inputRow.getField(inputValueIndex[i]);
}
SeaTunnelRow outputRow = new SeaTunnelRow(values);
outputRow.setRowKind(inputRow.getRowKind());
outputRow.setTableId(inputRow.getTableId());
return outputRow;
return new SeaTunnelRow(values);
}

@Override
Expand All @@ -88,13 +94,17 @@ protected TableSchema transformTableSchema() {

inputValueIndex = new int[filterFields.size()];
ArrayList<String> outputFieldNames = new ArrayList<>();
List<Column> inputColumns = inputCatalogTable.getTableSchema().getColumns();
for (int i = 0; i < filterFields.size(); i++) {
String field = filterFields.get(i);
int inputFieldIndex = seaTunnelRowType.indexOf(field);
if (inputFieldIndex == -1) {
throw TransformCommonError.cannotFindInputFieldError(getPluginName(), field);
}
inputValueIndex[i] = inputFieldIndex;
outputColumns.add(inputColumns.get(inputFieldIndex).copy());
outputFieldNames.add(inputColumns.get(inputFieldIndex).getName());
outputColumns.add(
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).copy());
outputFieldNames.add(
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).getName());
}

List<ConstraintKey> outputConstraintKeys =
Expand All @@ -113,9 +123,10 @@ protected TableSchema transformTableSchema() {
.collect(Collectors.toList());

PrimaryKey copiedPrimaryKey = null;
PrimaryKey primaryKey = inputCatalogTable.getTableSchema().getPrimaryKey();
if (primaryKey != null && outputFieldNames.containsAll(primaryKey.getColumnNames())) {
copiedPrimaryKey = primaryKey.copy();
if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
&& outputFieldNames.containsAll(
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
copiedPrimaryKey = inputCatalogTable.getTableSchema().getPrimaryKey().copy();
}

return TableSchema.builder()
Expand Down

0 comments on commit a917337

Please sign in to comment.