Skip to content

Commit

Permalink
[Improve] Code clean for SeaTunnel transform (#5810)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Nov 10, 2023
1 parent 59cccb6 commit 50fe637
Show file tree
Hide file tree
Showing 14 changed files with 46 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,12 @@ public CatalogTable getProducedCatalogTable() {
private CatalogTable transformCatalogTable() {
TableIdentifier tableIdentifier = transformTableIdentifier();
TableSchema tableSchema = transformTableSchema();
CatalogTable catalogTable =
CatalogTable.of(
tableIdentifier,
tableSchema,
inputCatalogTable.getOptions(),
inputCatalogTable.getPartitionKeys(),
inputCatalogTable.getComment());
return catalogTable;
return CatalogTable.of(
tableIdentifier,
tableSchema,
inputCatalogTable.getOptions(),
inputCatalogTable.getPartitionKeys(),
inputCatalogTable.getComment());
}

protected abstract TableSchema transformTableSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ protected TableSchema transformTableSchema() {

TableSchema.Builder builder = TableSchema.builder();
if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) {
builder = builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
}
builder = builder.constraintKey(copiedConstraintKeys);
builder.constraintKey(copiedConstraintKeys);
List<Column> columns =
inputCatalogTable.getTableSchema().getColumns().stream()
.map(Column::copy)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ protected TableSchema transformTableSchema() {

TableSchema.Builder builder = TableSchema.builder();
if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) {
builder = builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
}
builder = builder.constraintKey(copiedConstraintKeys);
builder.constraintKey(copiedConstraintKeys);
List<Column> columns =
inputCatalogTable.getTableSchema().getColumns().stream()
.map(Column::copy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
public class CopyFieldTransform extends MultipleFieldOutputTransform {
public static final String PLUGIN_NAME = "Copy";

private CopyTransformConfig config;
private final CopyTransformConfig config;
private List<String> fieldNames;
private List<Integer> fieldOriginalIndexs;
private List<SeaTunnelDataType> fieldTypes;
private List<Integer> fieldOriginalIndexes;
private List<SeaTunnelDataType<?>> fieldTypes;

public CopyFieldTransform(CopyTransformConfig copyTransformConfig, CatalogTable catalogTable) {
super(catalogTable);
Expand All @@ -59,8 +59,8 @@ public String getPluginName() {
private void initOutputFields(
SeaTunnelRowType inputRowType, LinkedHashMap<String, String> fields) {
List<String> fieldNames = new ArrayList<>();
List<Integer> fieldOriginalIndexs = new ArrayList<>();
List<SeaTunnelDataType> fieldsType = new ArrayList<>();
List<Integer> fieldOriginalIndexes = new ArrayList<>();
List<SeaTunnelDataType<?>> fieldsType = new ArrayList<>();
for (Map.Entry<String, String> field : fields.entrySet()) {
String srcField = field.getValue();
int srcFieldIndex = inputRowType.indexOf(srcField);
Expand All @@ -69,11 +69,11 @@ private void initOutputFields(
"Cannot find [" + srcField + "] field in input row type");
}
fieldNames.add(field.getKey());
fieldOriginalIndexs.add(srcFieldIndex);
fieldOriginalIndexes.add(srcFieldIndex);
fieldsType.add(inputRowType.getFieldType(srcFieldIndex));
}
this.fieldNames = fieldNames;
this.fieldOriginalIndexs = fieldOriginalIndexs;
this.fieldOriginalIndexes = fieldOriginalIndexes;
this.fieldTypes = fieldsType;
}

Expand Down Expand Up @@ -111,14 +111,14 @@ protected Column[] getOutputColumns() {
@Override
protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
Object[] fieldValues = new Object[fieldNames.size()];
for (int i = 0; i < fieldOriginalIndexs.size(); i++) {
for (int i = 0; i < fieldOriginalIndexes.size(); i++) {
fieldValues[i] =
clone(fieldTypes.get(i), inputRow.getField(fieldOriginalIndexs.get(i)));
clone(fieldTypes.get(i), inputRow.getField(fieldOriginalIndexes.get(i)));
}
return fieldValues;
}

private Object clone(SeaTunnelDataType dataType, Object value) {
private Object clone(SeaTunnelDataType<?> dataType, Object value) {
if (value == null) {
return null;
}
Expand All @@ -138,18 +138,12 @@ private Object clone(SeaTunnelDataType dataType, Object value) {
return value;
case BYTES:
byte[] bytes = (byte[]) value;
if (bytes == null) {
return null;
}
byte[] newBytes = new byte[bytes.length];
System.arraycopy(bytes, 0, newBytes, 0, bytes.length);
return newBytes;
case ARRAY:
ArrayType arrayType = (ArrayType) dataType;
Object[] array = (Object[]) value;
if (array == null) {
return null;
}
Object newArray =
Array.newInstance(arrayType.getElementType().getTypeClass(), array.length);
for (int i = 0; i < array.length; i++) {
Expand All @@ -159,7 +153,7 @@ private Object clone(SeaTunnelDataType dataType, Object value) {
case MAP:
MapType mapType = (MapType) dataType;
Map map = (Map) value;
Map newMap = new HashMap();
Map<Object, Object> newMap = new HashMap<>();
for (Object key : map.keySet()) {
newMap.put(
clone(mapType.getKeyType(), key),
Expand All @@ -169,9 +163,6 @@ private Object clone(SeaTunnelDataType dataType, Object value) {
case ROW:
SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
SeaTunnelRow row = (SeaTunnelRow) value;
if (row == null) {
return null;
}

Object[] newFields = new Object[rowType.getTotalFields()];
for (int i = 0; i < rowType.getTotalFields(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
@Slf4j
public class FieldMapperTransform extends AbstractCatalogSupportTransform {
public static String PLUGIN_NAME = "FieldMapper";
private FieldMapperTransformConfig config;
private final FieldMapperTransformConfig config;
private List<Integer> needReaderColIndex;

public FieldMapperTransform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
public class FilterFieldTransform extends AbstractCatalogSupportTransform {
public static final String PLUGIN_NAME = "Filter";
private int[] inputValueIndex;
private String[] fields;
private final String[] fields;

public FilterFieldTransform(
@NonNull ReadonlyConfig config, @NonNull CatalogTable catalogTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,9 @@ public String getPluginName() {

private void initConfig(ReadonlyConfig config) {
if (config.get(FilterRowKinkTransformConfig.INCLUDE_KINDS) == null) {
excludeKinds =
new HashSet<RowKind>(config.get(FilterRowKinkTransformConfig.EXCLUDE_KINDS));
excludeKinds = new HashSet<>(config.get(FilterRowKinkTransformConfig.EXCLUDE_KINDS));
} else {
includeKinds =
new HashSet<RowKind>(config.get(FilterRowKinkTransformConfig.INCLUDE_KINDS));
includeKinds = new HashSet<>(config.get(FilterRowKinkTransformConfig.INCLUDE_KINDS));
}
if ((includeKinds.isEmpty() && excludeKinds.isEmpty())
|| (!includeKinds.isEmpty() && !excludeKinds.isEmpty())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import java.util.stream.Collectors;

public class ReplaceTransform extends SingleFieldOutputTransform {
private ReadonlyConfig config;
private final ReadonlyConfig config;
private int inputFieldIndex;

public ReplaceTransform(
Expand Down Expand Up @@ -65,9 +65,8 @@ protected Object getOutputFieldValue(SeaTunnelRowAccessor inputRow) {
}

boolean isRegex =
config.get(ReplaceTransformConfig.KEY_IS_REGEX) == null
? false
: config.get(ReplaceTransformConfig.KEY_IS_REGEX);
config.get(ReplaceTransformConfig.KEY_IS_REGEX) != null
&& config.get(ReplaceTransformConfig.KEY_IS_REGEX);
if (isRegex) {
if (config.get(ReplaceTransformConfig.KEY_REPLACE_FIRST)) {
return inputFieldValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@
import lombok.NonNull;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SplitTransform extends MultipleFieldOutputTransform {
private SplitTransformConfig splitTransformConfig;
private int splitFieldIndex;
private final SplitTransformConfig splitTransformConfig;
private final int splitFieldIndex;

public SplitTransform(
@NonNull SplitTransformConfig splitTransformConfig,
Expand Down Expand Up @@ -79,14 +77,11 @@ protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {

@Override
protected Column[] getOutputColumns() {
List<PhysicalColumn> collect =
Arrays.stream(splitTransformConfig.getOutputFields())
.map(
fieldName -> {
return PhysicalColumn.of(
fieldName, BasicType.STRING_TYPE, 200, true, "", "");
})
.collect(Collectors.toList());
return collect.toArray(new Column[0]);
return Arrays.stream(splitTransformConfig.getOutputFields())
.map(
fieldName ->
PhysicalColumn.of(
fieldName, BasicType.STRING_TYPE, 200, true, "", ""))
.toArray(Column[]::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ public class SQLTransform extends AbstractCatalogSupportTransform {
.defaultValue(ZETA.name())
.withDescription("The SQL engine type");

private String query;
private final String query;

private EngineType engineType;
private final EngineType engineType;

private transient SQLEngine sqlEngine;

Expand Down Expand Up @@ -126,7 +126,7 @@ protected TableSchema transformTableSchema() {
if (inputCatalogTable.getTableSchema().getPrimaryKey() != null
&& outputColumns.containsAll(
inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames())) {
builder = builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
builder.primaryKey(inputCatalogTable.getTableSchema().getPrimaryKey().copy());
}

List<ConstraintKey> outputConstraintKeys =
Expand All @@ -144,7 +144,7 @@ protected TableSchema transformTableSchema() {
.map(ConstraintKey::copy)
.collect(Collectors.toList());

builder = builder.constraintKey(outputConstraintKeys);
builder.constraintKey(outputConstraintKeys);

String[] fieldNames = outRowType.getFieldNames();
SeaTunnelDataType<?>[] fieldTypes = outRowType.getFieldTypes();
Expand Down
Loading

0 comments on commit 50fe637

Please sign in to comment.