From f8cca48ba29ea67562dd12c25f114086df0ea7bf Mon Sep 17 00:00:00 2001 From: ClownXC Date: Wed, 10 Apr 2024 12:18:54 +0800 Subject: [PATCH] remove can't find field exception --- .../filter/FilterFieldTransform.java | 33 +++++++------------ 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java index aaf3168e1b6..deadbbcab10 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java @@ -52,17 +52,8 @@ public FilterFieldTransform( fields = config.get(FilterFieldTransformConfig.KEY_FIELDS).toArray(new String[0]); List canNotFoundFields = Arrays.stream(fields) - .filter( - field -> { - try { - seaTunnelRowType.indexOf(field); - return false; - } catch (Exception e) { - return true; - } - }) + .filter(field -> seaTunnelRowType.indexOf(field, false) == -1) .collect(Collectors.toList()); - if (!CollectionUtils.isEmpty(canNotFoundFields)) { throw TransformCommonError.cannotFindInputFieldsError( getPluginName(), canNotFoundFields); @@ -81,7 +72,10 @@ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) { for (int i = 0; i < fields.length; i++) { values[i] = inputRow.getField(inputValueIndex[i]); } - return new SeaTunnelRow(values); + SeaTunnelRow outputRow = new SeaTunnelRow(values); + outputRow.setRowKind(inputRow.getRowKind()); + outputRow.setTableId(inputRow.getTableId()); + return outputRow; } @Override @@ -94,17 +88,13 @@ protected TableSchema transformTableSchema() { inputValueIndex = new int[filterFields.size()]; ArrayList outputFieldNames = new ArrayList<>(); + List inputColumns = inputCatalogTable.getTableSchema().getColumns(); for (int i = 0; i < filterFields.size(); i++) { String field = filterFields.get(i); int inputFieldIndex = seaTunnelRowType.indexOf(field); - if (inputFieldIndex == -1) { - throw TransformCommonError.cannotFindInputFieldError(getPluginName(), field); - } inputValueIndex[i] = inputFieldIndex; - outputColumns.add( - inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).copy()); - outputFieldNames.add( - inputCatalogTable.getTableSchema().getColumns().get(inputFieldIndex).getName()); + outputColumns.add(inputColumns.get(inputFieldIndex).copy()); + outputFieldNames.add(inputColumns.get(inputFieldIndex).getName()); } List outputConstraintKeys = @@ -123,10 +113,9 @@ protected TableSchema transformTableSchema() { .collect(Collectors.toList()); PrimaryKey copiedPrimaryKey = null; - if (inputCatalogTable.getTableSchema().getPrimaryKey() != null - && outputFieldNames.containsAll( - inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) { - copiedPrimaryKey = inputCatalogTable.getTableSchema().getPrimaryKey().copy(); + PrimaryKey primaryKey = inputCatalogTable.getTableSchema().getPrimaryKey(); + if (primaryKey != null && outputFieldNames.containsAll(primaryKey.getColumnNames())) { + copiedPrimaryKey = primaryKey.copy(); } return TableSchema.builder()