Skip to content

Commit

Permalink
Merge pull request #13 from TongchengOpenSource/filter_transform
Browse files Browse the repository at this point in the history
[Improve][Transform] Remove can't find field exception
  • Loading branch information
xiaochen-zhou authored Apr 10, 2024
2 parents 5bd2dc6 + f8cca48 commit 341328f
Showing 1 changed file with 11 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,8 @@ public FilterFieldTransform(
fields = config.get(FilterFieldTransformConfig.KEY_FIELDS).toArray(new String[0]);
List<String> canNotFoundFields =
Arrays.stream(fields)
.filter(
field -> {
try {
seaTunnelRowType.indexOf(field);
return false;
} catch (Exception e) {
return true;
}
})
.filter(field -> seaTunnelRowType.indexOf(field, false) == -1)
.collect(Collectors.toList());

if (!CollectionUtils.isEmpty(canNotFoundFields)) {
throw TransformCommonError.cannotFindInputFieldsError(
getPluginName(), canNotFoundFields);
Expand All @@ -81,7 +72,10 @@ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
for (int i = 0; i < fields.length; i++) {
values[i] = inputRow.getField(inputValueIndex[i]);
}
return new SeaTunnelRow(values);
SeaTunnelRow outputRow = new SeaTunnelRow(values);
outputRow.setRowKind(inputRow.getRowKind());
outputRow.setTableId(inputRow.getTableId());
return outputRow;
}

@Override
Expand All @@ -94,17 +88,13 @@ protected TableSchema transformTableSchema() {

inputValueIndex = new int[filterFields.size()];
ArrayList<String> outputFieldNames = new ArrayList<>();
List<Column> inputColumns = inputCatalogTable.getTableSchema().getColumns();
for (int i = 0; i < filterFields.size(); i++) {
String field = filterFields.get(i);
int inputFieldIndex = seaTunnelRowType.indexOf(field);
if (inputFieldIndex == -1) {
throw TransformCommonError.cannotFindInputFieldError(getPluginName(), field);
}
inputValueIndex[i] = inputFieldIndex;
outputColumns.add(
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).copy());
outputFieldNames.add(
inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).getName());
outputColumns.add(inputColumns.get(inputFieldIndex).copy());
outputFieldNames.add(inputColumns.get(inputFieldIndex).getName());
}

List<ConstraintKey> outputConstraintKeys =
Expand All @@ -123,10 +113,9 @@ protected TableSchema transformTableSchema() {
.collect(Collectors.toList());

PrimaryKey copiedPrimaryKey = null;
if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
&& outputFieldNames.containsAll(
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
copiedPrimaryKey = inputCatalogTable.getTableSchema().getPrimaryKey().copy();
PrimaryKey primaryKey = inputCatalogTable.getTableSchema().getPrimaryKey();
if (primaryKey != null && outputFieldNames.containsAll(primaryKey.getColumnNames())) {
copiedPrimaryKey = primaryKey.copy();
}

return TableSchema.builder()
Expand Down

0 comments on commit 341328f

Please sign in to comment.