From f69f773f01b23153dd684babecd3278b56985791 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Sat, 11 Nov 2023 10:18:51 +0800 Subject: [PATCH] [Improve] Add common error for transform (#5815) --- .../transform/copy/CopyFieldTransform.java | 34 +- .../FieldMapperTransformException.java | 37 --- .../FilterFieldTransformErrorCode.java | 42 --- .../exception/TransformCommonError.java | 46 +++ ...ode.java => TransformCommonErrorCode.java} | 10 +- .../exception/TransformException.java | 11 +- .../fieldmapper/FieldMapperTransform.java | 21 +- .../filter/FilterFieldTransform.java | 21 +- .../transform/jsonpath/JsonPathTransform.java | 15 +- .../transform/replace/ReplaceTransform.java | 15 +- .../transform/split/SplitTransform.java | 12 +- .../exception/TransformErrorTest.java | 291 ++++++++++++++++++ 12 files changed, 415 insertions(+), 140 deletions(-) delete mode 100644 seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java delete mode 100644 seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java create mode 100644 seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java rename seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/{FieldMapperTransformErrorCode.java => TransformCommonErrorCode.java} (74%) create mode 100644 seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java index 3455b8e8b4b..1718030db68 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransform.java @@ -25,8 +25,10 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform; import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; +import org.apache.seatunnel.transform.exception.TransformCommonError; import java.lang.reflect.Array; import java.util.ArrayList; @@ -63,10 +65,11 @@ private void initOutputFields( List> fieldsType = new ArrayList<>(); for (Map.Entry field : fields.entrySet()) { String srcField = field.getValue(); - int srcFieldIndex = inputRowType.indexOf(srcField); - if (srcFieldIndex == -1) { - throw new IllegalArgumentException( - "Cannot find [" + srcField + "] field in input row type"); + int srcFieldIndex; + try { + srcFieldIndex = inputRowType.indexOf(srcField); + } catch (IllegalArgumentException e) { + throw TransformCommonError.cannotFindInputFieldError(getPluginName(), srcField); } fieldNames.add(field.getKey()); fieldOriginalIndexes.add(srcFieldIndex); @@ -113,12 +116,15 @@ protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) { Object[] fieldValues = new Object[fieldNames.size()]; for (int i = 0; i < fieldOriginalIndexes.size(); i++) { fieldValues[i] = - clone(fieldTypes.get(i), inputRow.getField(fieldOriginalIndexes.get(i))); + clone( + fieldNames.get(i), + fieldTypes.get(i), + inputRow.getField(fieldOriginalIndexes.get(i))); } return fieldValues; } - private Object clone(SeaTunnelDataType dataType, Object value) { + private Object clone(String field, SeaTunnelDataType dataType, Object value) { if (value == null) { return null; } @@ -147,7 +153,7 @@ private Object clone(SeaTunnelDataType dataType, Object value) { Object newArray = Array.newInstance(arrayType.getElementType().getTypeClass(), array.length); for (int i = 0; i < array.length; i++) { - Array.set(newArray, i, clone(arrayType.getElementType(), array[i])); + Array.set(newArray, i, clone(field, arrayType.getElementType(), array[i])); } return newArray; case MAP: @@ -156,8 +162,8 @@ private Object clone(SeaTunnelDataType dataType, Object value) { Map newMap = new HashMap<>(); for (Object key : map.keySet()) { newMap.put( - clone(mapType.getKeyType(), key), - clone(mapType.getValueType(), map.get(key))); + clone(field, mapType.getKeyType(), key), + clone(field, mapType.getValueType(), map.get(key))); } return newMap; case ROW: @@ -166,7 +172,11 @@ private Object clone(SeaTunnelDataType dataType, Object value) { Object[] newFields = new Object[rowType.getTotalFields()]; for (int i = 0; i < rowType.getTotalFields(); i++) { - newFields[i] = clone(rowType.getFieldType(i), row.getField(i)); + newFields[i] = + clone( + rowType.getFieldName(i), + rowType.getFieldType(i), + row.getField(i)); } SeaTunnelRow newRow = new SeaTunnelRow(newFields); newRow.setRowKind(row.getRowKind()); @@ -175,8 +185,8 @@ private Object clone(SeaTunnelDataType dataType, Object value) { case NULL: return null; default: - throw new UnsupportedOperationException( - "Unsupported type: " + dataType.getSqlType()); + throw CommonError.unsupportedDataType( + getPluginName(), dataType.getSqlType().toString(), field); } } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java deleted file mode 100644 index d6d5ba16ebd..00000000000 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformException.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.transform.exception; - -import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; -import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; - -public class FieldMapperTransformException extends SeaTunnelRuntimeException { - public FieldMapperTransformException( - SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { - super(seaTunnelErrorCode, errorMessage); - } - - public FieldMapperTransformException( - SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { - super(seaTunnelErrorCode, errorMessage, cause); - } - - public FieldMapperTransformException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { - super(seaTunnelErrorCode, cause); - } -} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java deleted file mode 100644 index 59d2d070878..00000000000 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FilterFieldTransformErrorCode.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.transform.exception; - -import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; - -public enum FilterFieldTransformErrorCode implements SeaTunnelErrorCode { - FILTER_FIELD_NOT_FOUND("FILTER_FIELD_TRANSFORM-01", "filter field not found"); - - private final String code; - private final String description; - - FilterFieldTransformErrorCode(String code, String description) { - this.code = code; - this.description = description; - } - - @Override - public String getCode() { - return this.code; - } - - @Override - public String getDescription() { - return this.description; - } -} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java new file mode 100644 index 00000000000..b35df6a448b --- /dev/null +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonError.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.exception; + +import org.apache.seatunnel.common.exception.CommonError; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND; +import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELD_NOT_FOUND; + +/** The common error of SeaTunnel transform. Please refer {@link CommonError} */ +public class TransformCommonError { + + public static TransformException cannotFindInputFieldError(String transform, String field) { + Map params = new HashMap<>(); + params.put("field", field); + params.put("transform", transform); + return new TransformException(INPUT_FIELD_NOT_FOUND, params); + } + + public static TransformException cannotFindInputFieldsError( + String transform, List fields) { + Map params = new HashMap<>(); + params.put("fields", String.join(",", fields)); + params.put("transform", transform); + return new TransformException(INPUT_FIELDS_NOT_FOUND, params); + } +} diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java similarity index 74% rename from seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java rename to seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java index 5c5de9c444d..4a5eea66c77 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/FieldMapperTransformErrorCode.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformCommonErrorCode.java @@ -19,14 +19,18 @@ import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; -public enum FieldMapperTransformErrorCode implements SeaTunnelErrorCode { +enum TransformCommonErrorCode implements SeaTunnelErrorCode { INPUT_FIELD_NOT_FOUND( - "FIELD_MAPPER_TRANSFORM-01", "field mapper input field not found in inputRowType"); + "TRANSFORM_COMMON-01", + "The input field '' of '' transform not found in upstream schema"), + INPUT_FIELDS_NOT_FOUND( + "TRANSFORM_COMMON-02", + "The input fields '' of '' transform not found in upstream schema"); private final String code; private final String description; - FieldMapperTransformErrorCode(String code, String description) { + TransformCommonErrorCode(String code, String description) { this.code = code; this.description = description; } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java index 8d1838473cd..77467a7bd26 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/exception/TransformException.java @@ -20,17 +20,14 @@ import org.apache.seatunnel.common.exception.SeaTunnelErrorCode; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import java.util.Map; + public class TransformException extends SeaTunnelRuntimeException { public TransformException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) { super(seaTunnelErrorCode, errorMessage); } - public TransformException( - SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) { - super(seaTunnelErrorCode, errorMessage, cause); - } - - public TransformException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) { - super(seaTunnelErrorCode, cause); + TransformException(SeaTunnelErrorCode seaTunnelErrorCode, Map params) { + super(seaTunnelErrorCode, params); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java index 6d4e312f203..037d4ba7424 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransform.java @@ -27,9 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform; -import org.apache.seatunnel.transform.exception.FieldMapperTransformErrorCode; -import org.apache.seatunnel.transform.exception.FieldMapperTransformException; -import org.apache.seatunnel.transform.exception.TransformException; +import org.apache.seatunnel.transform.exception.TransformCommonError; import org.apache.commons.collections4.CollectionUtils; @@ -56,11 +54,18 @@ public FieldMapperTransform( SeaTunnelRowType seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType(); List notFoundField = fieldMapper.keySet().stream() - .filter(field -> seaTunnelRowType.indexOf(field) == -1) + .filter( + field -> { + try { + seaTunnelRowType.indexOf(field); + return false; + } catch (Exception e) { + return true; + } + }) .collect(Collectors.toList()); if (!CollectionUtils.isEmpty(notFoundField)) { - throw new TransformException( - FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND, notFoundField.toString()); + throw TransformCommonError.cannotFindInputFieldsError(getPluginName(), notFoundField); } } @@ -97,9 +102,7 @@ protected TableSchema transformTableSchema() { (key, value) -> { int fieldIndex = inputFieldNames.indexOf(key); if (fieldIndex < 0) { - throw new FieldMapperTransformException( - FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND, - "Can not found field " + key + " from inputRowType"); + throw TransformCommonError.cannotFindInputFieldError(getPluginName(), key); } Column oldColumn = inputColumns.get(fieldIndex); PhysicalColumn outputColumn = diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java index 0105149036b..aaf3168e1b6 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransform.java @@ -27,8 +27,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform; -import org.apache.seatunnel.transform.exception.FilterFieldTransformErrorCode; -import org.apache.seatunnel.transform.exception.TransformException; +import org.apache.seatunnel.transform.exception.TransformCommonError; import org.apache.commons.collections4.CollectionUtils; @@ -53,13 +52,20 @@ public FilterFieldTransform( fields = config.get(FilterFieldTransformConfig.KEY_FIELDS).toArray(new String[0]); List canNotFoundFields = Arrays.stream(fields) - .filter(field -> seaTunnelRowType.indexOf(field) == -1) + .filter( + field -> { + try { + seaTunnelRowType.indexOf(field); + return false; + } catch (Exception e) { + return true; + } + }) .collect(Collectors.toList()); if (!CollectionUtils.isEmpty(canNotFoundFields)) { - throw new TransformException( - FilterFieldTransformErrorCode.FILTER_FIELD_NOT_FOUND, - canNotFoundFields.toString()); + throw TransformCommonError.cannotFindInputFieldsError( + getPluginName(), canNotFoundFields); } } @@ -92,8 +98,7 @@ protected TableSchema transformTableSchema() { String field = filterFields.get(i); int inputFieldIndex = seaTunnelRowType.indexOf(field); if (inputFieldIndex == -1) { - throw new IllegalArgumentException( - "Cannot find [" + field + "] field in input row type"); + throw TransformCommonError.cannotFindInputFieldError(getPluginName(), field); } inputValueIndex[i] = inputFieldIndex; outputColumns.add( diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java index 874d29da8b0..6f5397e887b 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/jsonpath/JsonPathTransform.java @@ -24,10 +24,12 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonError; import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.format.json.JsonToRowConverters; import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform; import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; +import org.apache.seatunnel.transform.exception.TransformCommonError; import org.apache.seatunnel.transform.exception.TransformException; import com.jayway.jsonpath.JsonPath; @@ -42,7 +44,6 @@ import java.util.concurrent.ConcurrentHashMap; import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.JSON_PATH_COMPILE_ERROR; -import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.SRC_FIELD_NOT_FOUND; @Slf4j public class JsonPathTransform extends MultipleFieldOutputTransform { @@ -108,10 +109,7 @@ private void initSrcFieldIndexArr() { ColumnConfig columnConfig = columnConfigs.get(i); String srcField = columnConfig.getSrcField(); if (!fieldNameSet.contains(srcField)) { - throw new TransformException( - SRC_FIELD_NOT_FOUND, - String.format( - "JsonPathTransform config not found src_field:[%s]", srcField)); + throw TransformCommonError.cannotFindInputFieldError(getPluginName(), srcField); } this.srcFieldIndexArr[i] = seaTunnelRowType.indexOf(srcField); } @@ -161,9 +159,10 @@ private Object doTransform( jsonString = JsonUtils.toJsonString(row.getFields()); break; default: - throw new UnsupportedOperationException( - "JsonPathTransform unsupported sourceDataType: " - + inputDataType.getSqlType()); + throw CommonError.unsupportedDataType( + getPluginName(), + inputDataType.getSqlType().toString(), + columnConfig.getSrcField()); } Object result = JSON_PATH_CACHE.get(columnConfig.getPath()).read(jsonString); JsonNode jsonNode = JsonUtils.toJsonNode(result); diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java index a99aab49b05..5c5451fce74 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransform.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; import org.apache.seatunnel.transform.common.SingleFieldOutputTransform; +import org.apache.seatunnel.transform.exception.TransformCommonError; import org.apache.commons.collections4.CollectionUtils; @@ -50,10 +51,10 @@ public String getPluginName() { } private void initOutputFields(SeaTunnelRowType inputRowType, String replaceField) { - inputFieldIndex = inputRowType.indexOf(replaceField); - if (inputFieldIndex == -1) { - throw new IllegalArgumentException( - "Cannot find [" + replaceField + "] field in input row type"); + try { + inputFieldIndex = inputRowType.indexOf(replaceField); + } catch (IllegalArgumentException e) { + throw TransformCommonError.cannotFindInputFieldError(getPluginName(), replaceField); } } @@ -102,10 +103,8 @@ protected Column getOutputColumn() { .KEY_REPLACE_FIELD))) .collect(Collectors.toList()); if (CollectionUtils.isEmpty(collect)) { - throw new IllegalArgumentException( - "Cannot find [" - + config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD) - + "] field in input catalog table"); + throw TransformCommonError.cannotFindInputFieldError( + getPluginName(), config.get(ReplaceTransformConfig.KEY_REPLACE_FIELD)); } return collect.get(0).copy(); } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java index 46c38639fdd..c1ead2dd0b5 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransform.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform; import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor; +import org.apache.seatunnel.transform.exception.TransformCommonError; import lombok.NonNull; @@ -39,12 +40,11 @@ public SplitTransform( super(catalogTable); this.splitTransformConfig = splitTransformConfig; SeaTunnelRowType seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType(); - splitFieldIndex = seaTunnelRowType.indexOf(splitTransformConfig.getSplitField()); - if (splitFieldIndex == -1) { - throw new IllegalArgumentException( - "Cannot find [" - + splitTransformConfig.getSplitField() - + "] field in input row type"); + try { + splitFieldIndex = seaTunnelRowType.indexOf(splitTransformConfig.getSplitField()); + } catch (IllegalArgumentException e) { + throw TransformCommonError.cannotFindInputFieldError( + getPluginName(), splitTransformConfig.getSplitField()); } this.outputCatalogTable = getProducedCatalogTable(); } diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java new file mode 100644 index 00000000000..c3055118094 --- /dev/null +++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/exception/TransformErrorTest.java @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.exception; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.transform.copy.CopyFieldTransformFactory; +import org.apache.seatunnel.transform.copy.CopyTransformConfig; +import org.apache.seatunnel.transform.fieldmapper.FieldMapperTransformConfig; +import org.apache.seatunnel.transform.fieldmapper.FieldMapperTransformFactory; +import org.apache.seatunnel.transform.filter.FilterFieldTransformConfig; +import org.apache.seatunnel.transform.filter.FilterFieldTransformFactory; +import org.apache.seatunnel.transform.jsonpath.JsonPathTransformConfig; +import org.apache.seatunnel.transform.jsonpath.JsonPathTransformFactory; +import org.apache.seatunnel.transform.replace.ReplaceTransformConfig; +import org.apache.seatunnel.transform.replace.ReplaceTransformFactory; +import org.apache.seatunnel.transform.split.SplitTransformConfig; +import org.apache.seatunnel.transform.split.SplitTransformFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class TransformErrorTest { + + private static final CatalogTable table = + CatalogTableUtil.getCatalogTable( + "test", + "test", + "test", + "test", + new SeaTunnelRowType( + new String[] {"name"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE})); + + @Test + void testFieldMapperTransformWithError() { + ReadonlyConfig config = + ReadonlyConfig.fromMap( + new HashMap() { + { + put( + FieldMapperTransformConfig.FIELD_MAPPER.key(), + new HashMap() { + { + put("age", "age1"); + } + }); + } + }); + TableTransformFactoryContext context = + new TableTransformFactoryContext( + Collections.singletonList(table), + config, + Thread.currentThread().getContextClassLoader()); + TransformException exception = + Assertions.assertThrows( + TransformException.class, + () -> + new FieldMapperTransformFactory() + .createTransform(context) + .createTransform()); + Assertions.assertEquals( + "ErrorCode:[TRANSFORM_COMMON-02], ErrorDescription:[The input fields 'age' of 'FieldMapper' transform not found in upstream schema]", + exception.getMessage()); + } + + @Test + void testCopyTransformWithError() { + ReadonlyConfig config = + ReadonlyConfig.fromMap( + new HashMap() { + { + put( + CopyTransformConfig.FIELDS.key(), + new HashMap() { + { + put("ageA", "age1"); + } + }); + } + }); + TableTransformFactoryContext context = + new TableTransformFactoryContext( + Collections.singletonList(table), + config, + Thread.currentThread().getContextClassLoader()); + TransformException exception = + Assertions.assertThrows( + TransformException.class, + () -> + new CopyFieldTransformFactory() + .createTransform(context) + .createTransform()); + Assertions.assertEquals( + "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input field 'age1' of 'Copy' transform not found in upstream schema]", + exception.getMessage()); + + ReadonlyConfig config2 = + ReadonlyConfig.fromMap( + new HashMap() { + { + put(CopyTransformConfig.SRC_FIELD.key(), "ageB"); + put(CopyTransformConfig.DEST_FIELD.key(), "age1"); + } + }); + TableTransformFactoryContext context2 = + new TableTransformFactoryContext( + Collections.singletonList(table), + config2, + Thread.currentThread().getContextClassLoader()); + TransformException exception2 = + Assertions.assertThrows( + TransformException.class, + () -> + new CopyFieldTransformFactory() + .createTransform(context2) + .createTransform()); + Assertions.assertEquals( + "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input field 'ageB' of 'Copy' transform not found in upstream schema]", + exception2.getMessage()); + } + + @Test + void testFilterTransformWithError() { + ReadonlyConfig config = + ReadonlyConfig.fromMap( + new HashMap() { + { + put( + FilterFieldTransformConfig.KEY_FIELDS.key(), + new ArrayList() { + { + add("age"); + add("gender"); + } + }); + } + }); + TableTransformFactoryContext context = + new TableTransformFactoryContext( + Collections.singletonList(table), + config, + Thread.currentThread().getContextClassLoader()); + TransformException exception = + Assertions.assertThrows( + TransformException.class, + () -> + new FilterFieldTransformFactory() + .createTransform(context) + .createTransform()); + Assertions.assertEquals( + "ErrorCode:[TRANSFORM_COMMON-02], ErrorDescription:[The input fields 'age,gender' of 'Filter' transform not found in upstream schema]", + exception.getMessage()); + } + + @Test + void testJsonPathTransformWithError() { + ReadonlyConfig config = + ReadonlyConfig.fromMap( + new HashMap() { + { + put( + JsonPathTransformConfig.COLUMNS.key(), + new ArrayList>() { + { + add( + new HashMap() { + { + put( + JsonPathTransformConfig.PATH + .key(), + "path"); + put( + JsonPathTransformConfig + .SRC_FIELD + .key(), + "age"); + put( + JsonPathTransformConfig + .DEST_FIELD + .key(), + "age2"); + } + }); + } + }); + } + }); + TableTransformFactoryContext context = + new TableTransformFactoryContext( + Collections.singletonList(table), + config, + Thread.currentThread().getContextClassLoader()); + TransformException exception = + Assertions.assertThrows( + TransformException.class, + () -> + new JsonPathTransformFactory() + .createTransform(context) + .createTransform()); + Assertions.assertEquals( + "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input field 'age' of 'JsonPath' transform not found in upstream schema]", + exception.getMessage()); + } + + @Test + void testReplaceTransformWithError() { + ReadonlyConfig config = + ReadonlyConfig.fromMap( + new HashMap() { + { + put(ReplaceTransformConfig.KEY_REPLACE_FIELD.key(), "age"); + put(ReplaceTransformConfig.KEY_PATTERN.key(), "1"); + put(ReplaceTransformConfig.KEY_REPLACEMENT.key(), "2"); + put(ReplaceTransformConfig.KEY_IS_REGEX.key(), "false"); + put(ReplaceTransformConfig.KEY_REPLACE_FIRST.key(), "false"); + } + }); + TableTransformFactoryContext context = + new TableTransformFactoryContext( + Collections.singletonList(table), + config, + Thread.currentThread().getContextClassLoader()); + TransformException exception = + Assertions.assertThrows( + TransformException.class, + () -> + new ReplaceTransformFactory() + .createTransform(context) + .createTransform()); + Assertions.assertEquals( + "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input field 'age' of 'Replace' transform not found in upstream schema]", + exception.getMessage()); + } + + @Test + void testSplitTransformWithError() { + ReadonlyConfig config = + ReadonlyConfig.fromMap( + new HashMap() { + { + put(SplitTransformConfig.KEY_SPLIT_FIELD.key(), "age"); + put( + SplitTransformConfig.KEY_OUTPUT_FIELDS.key(), + Arrays.asList("age1", "age2")); + put(SplitTransformConfig.KEY_SEPARATOR.key(), ","); + } + }); + TableTransformFactoryContext context = + new TableTransformFactoryContext( + Collections.singletonList(table), + config, + Thread.currentThread().getContextClassLoader()); + TransformException exception = + Assertions.assertThrows( + TransformException.class, + () -> + new SplitTransformFactory() + .createTransform(context) + .createTransform()); + Assertions.assertEquals( + "ErrorCode:[TRANSFORM_COMMON-01], ErrorDescription:[The input field 'age' of 'Split' transform not found in upstream schema]", + exception.getMessage()); + } +}