diff --git a/pom.xml b/pom.xml
index 77422f3a..07d15745 100644
--- a/pom.xml
+++ b/pom.xml
@@ -310,6 +310,13 @@ limitations under the License.
             <scope>test</scope>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-guava</artifactId>
+            <version>31.1-jre-17.0</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.jmockit</groupId>
             <artifactId>jmockit</artifactId>
diff --git a/src/main/java/com/starrocks/connector/flink/row/source/ArrowFieldConverter.java b/src/main/java/com/starrocks/connector/flink/row/source/ArrowFieldConverter.java
index 0e181126..20c91df3 100644
--- a/src/main/java/com/starrocks/connector/flink/row/source/ArrowFieldConverter.java
+++ b/src/main/java/com/starrocks/connector/flink/row/source/ArrowFieldConverter.java
@@ -23,14 +23,28 @@
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import com.starrocks.connector.flink.tools.DataUtil;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;
@@ -46,11 +60,13 @@
// Converts arrow field data to Flink data.
public interface ArrowFieldConverter {
- String DATETIME_FORMAT_LONG = "yyyy-MM-dd HH:mm:ss.SSSSSS";
- String DATETIME_FORMAT_SHORT = "yyyy-MM-dd HH:mm:ss";
- String DATE_FORMAT = "yyyy-MM-dd";
-
- Object convert(Object arrowData);
+ /**
+ * Convert the data at the given position of the arrow vector to Flink data.
+ * @param vector the input arrow vector
+ * @param rowIndex the position of the data
+ * @return the flink data
+ */
+ Object convert(FieldVector vector, int rowIndex);
static void checkNullable(boolean isNullable, Object obj) {
if (obj == null && !isNullable) {
@@ -68,43 +84,141 @@ public BooleanConverter(boolean isNullable) {
}
@Override
- public Object convert(Object arrowData) {
- checkNullable(isNullable, arrowData);
- return ((int) arrowData) != 0;
+ public Object convert(FieldVector vector, int rowIndex) {
+ BitVector bitVector = (BitVector) vector;
+ Object value = bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0;
+ checkNullable(isNullable, value);
+ return value;
+ }
+ }
+
+ // Convert from arrow tinyint to flink tinyint
+ class TinyIntConverter implements ArrowFieldConverter {
+
+ private final boolean isNullable;
+
+ public TinyIntConverter(boolean isNullable) {
+ this.isNullable = isNullable;
+ }
+
+ @Override
+ public Object convert(FieldVector vector, int rowIndex) {
+ TinyIntVector tinyIntVector = (TinyIntVector) vector;
+ Object value = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex);
+ checkNullable(isNullable, value);
+ return value;
+ }
+ }
+
+ // Convert from arrow smallint to flink smallint
+ class SmallIntConverter implements ArrowFieldConverter {
+
+ private final boolean isNullable;
+
+ public SmallIntConverter(boolean isNullable) {
+ this.isNullable = isNullable;
+ }
+
+ @Override
+ public Object convert(FieldVector vector, int rowIndex) {
+ SmallIntVector smallIntVector = (SmallIntVector) vector;
+ Object value = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
+ checkNullable(isNullable, value);
+ return value;
}
}
- // Convert from arrow numeric type to flink numeric type,
- // including tinyint, smallint, int, bigint, float, double
- class NumericConverter implements ArrowFieldConverter {
+ // Convert from arrow int to flink int
+ class IntConverter implements ArrowFieldConverter {
private final boolean isNullable;
- public NumericConverter(boolean isNullable) {
+ public IntConverter(boolean isNullable) {
this.isNullable = isNullable;
}
@Override
- public Object convert(Object arrowData) {
- checkNullable(isNullable, arrowData);
- return arrowData;
+ public Object convert(FieldVector vector, int rowIndex) {
+ IntVector intVector = (IntVector) vector;
+ Object value = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
+ checkNullable(isNullable, value);
+ return value;
+ }
+ }
+
+ // Convert from arrow bigint to flink bigint
+ class BigIntConverter implements ArrowFieldConverter {
+
+ private final boolean isNullable;
+
+ public BigIntConverter(boolean isNullable) {
+ this.isNullable = isNullable;
+ }
+
+ @Override
+ public Object convert(FieldVector vector, int rowIndex) {
+ BigIntVector bigIntVector = (BigIntVector) vector;
+ Object value = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex);
+ checkNullable(isNullable, value);
+ return value;
+ }
+ }
+
+ // Convert from arrow float to flink float
+ class FloatConverter implements ArrowFieldConverter {
+
+ private final boolean isNullable;
+
+ public FloatConverter(boolean isNullable) {
+ this.isNullable = isNullable;
+ }
+
+ @Override
+ public Object convert(FieldVector vector, int rowIndex) {
+ Float4Vector floatVector = (Float4Vector) vector;
+ Object value = floatVector.isNull(rowIndex) ? null : floatVector.get(rowIndex);
+ checkNullable(isNullable, value);
+ return value;
+ }
+ }
+
+ // Convert from arrow double to flink double
+ class DoubleConverter implements ArrowFieldConverter {
+
+ private final boolean isNullable;
+
+ public DoubleConverter(boolean isNullable) {
+ this.isNullable = isNullable;
+ }
+
+ @Override
+ public Object convert(FieldVector vector, int rowIndex) {
+ Float8Vector floatVector = (Float8Vector) vector;
+ Object value = floatVector.isNull(rowIndex) ? null : floatVector.get(rowIndex);
+ checkNullable(isNullable, value);
+ return value;
}
}
// Convert from arrow decimal type to flink decimal
class DecimalConverter implements ArrowFieldConverter {
+ private final int precision;
+ private final int scale;
private final boolean isNullable;
- public DecimalConverter(boolean isNullable) {
+ public DecimalConverter(int precision, int scale, boolean isNullable) {
+ this.precision = precision;
+ this.scale = scale;
this.isNullable = isNullable;
}
@Override
- public Object convert(Object arrowData) {
- checkNullable(isNullable, arrowData);
- BigDecimal value = (BigDecimal) arrowData;
- return DecimalData.fromBigDecimal(value, value.precision(), value.scale());
+ public Object convert(FieldVector vector, int rowIndex) {
+ DecimalVector decimalVector = (DecimalVector) vector;
+ BigDecimal value = decimalVector.isNull(rowIndex) ? null : decimalVector.getObject(rowIndex);
+ checkNullable(isNullable, value);
+ return value == null ? null : DecimalData.fromBigDecimal(value, precision, scale);
}
}
@@ -118,15 +232,19 @@ public StringConverter(boolean isNullable) {
}
@Override
- public Object convert(Object arrowData) {
- checkNullable(isNullable, arrowData);
- return new String((byte[]) arrowData);
+ public Object convert(FieldVector vector, int rowIndex) {
+ VarCharVector varCharVector = (VarCharVector) vector;
+ String value = varCharVector.isNull(rowIndex) ? null : new String(varCharVector.get(rowIndex));
+ checkNullable(isNullable, value);
+ return StringData.fromString(value);
}
}
// Convert from arrow varchar to flink date
class DateConverter implements ArrowFieldConverter {
+ private static final String DATE_FORMAT = "yyyy-MM-dd";
+
private final boolean isNullable;
public DateConverter(boolean isNullable) {
@@ -134,9 +252,13 @@ public DateConverter(boolean isNullable) {
}
@Override
- public Object convert(Object arrowData) {
- checkNullable(isNullable, arrowData);
- String value = new String((byte[]) arrowData);
+ public Object convert(FieldVector vector, int rowIndex) {
+ VarCharVector varCharVector = (VarCharVector) vector;
+ String value = varCharVector.isNull(rowIndex) ? null : new String(varCharVector.get(rowIndex));
+ checkNullable(isNullable, value);
+ if (value == null) {
+ return null;
+ }
LocalDate date = LocalDate.parse(value, DateTimeFormatter.ofPattern(DATE_FORMAT));
return (int) date.atStartOfDay().toLocalDate().toEpochDay();
}
@@ -145,6 +267,9 @@ public Object convert(Object arrowData) {
// Convert from arrow varchar to flink timestamp-related type
class TimestampConverter implements ArrowFieldConverter {
+ private static final String DATETIME_FORMAT_LONG = "yyyy-MM-dd HH:mm:ss.SSSSSS";
+ private static final String DATETIME_FORMAT_SHORT = "yyyy-MM-dd HH:mm:ss";
+
private final boolean isNullable;
public TimestampConverter(boolean isNullable) {
@@ -152,9 +277,13 @@ public TimestampConverter(boolean isNullable) {
}
@Override
- public Object convert(Object arrowData) {
- checkNullable(isNullable, arrowData);
- String value = new String((byte[]) arrowData);
+ public Object convert(FieldVector vector, int rowIndex) {
+ VarCharVector varCharVector = (VarCharVector) vector;
+ String value = varCharVector.isNull(rowIndex) ? null : new String(varCharVector.get(rowIndex));
+ checkNullable(isNullable, value);
+ if (value == null) {
+ return null;
+ }
if (value.length() < DATETIME_FORMAT_SHORT.length()) {
throw new IllegalArgumentException("Date value length shorter than DATETIME_FORMAT_SHORT");
}
@@ -180,13 +309,19 @@ public ArrayConverter(boolean isNullable, ArrowFieldConverter elementConverter)
}
@Override
- public Object convert(Object arrowData) {
- checkNullable(isNullable, arrowData);
- List<Object> list = (List<Object>) arrowData;
- Object[] data = new Object[list.size()];
+ public Object convert(FieldVector vector, int rowIndex) {
+ ListVector listVector = (ListVector) vector;
+ if (listVector.isNull(rowIndex)) {
+ checkNullable(isNullable, null);
+ return null;
+ }
+
+ int index = rowIndex * ListVector.OFFSET_WIDTH;
+ int start = listVector.getOffsetBuffer().getInt(index);
+ int end = listVector.getOffsetBuffer().getInt(index + ListVector.OFFSET_WIDTH);
+ Object[] data = new Object[end - start];
for (int i = 0; i < data.length; i++) {
- Object obj = list.get(i);
- data[i] = obj == null ? null : elementConverter.convert(obj);
+ data[i] = elementConverter.convert(listVector.getDataVector(), start + i);
}
return new GenericArrayData(data);
}
@@ -204,19 +339,23 @@ public StructConverter(boolean isNullable, List<ArrowFieldConverter> childConver
}
@Override
- public Object convert(Object arrowData) {
- checkNullable(isNullable, arrowData);
- List<Object> list = (List<Object>) arrowData;
- GenericRowData rowData = new GenericRowData(list.size());
- for (int i = 0; i < list.size(); i++) {
- Object obj = list.get(i);
- rowData.setField(i, obj == null ? null : childConverters.get(i).convert(obj));
+ public Object convert(FieldVector vector, int rowIndex) {
+ StructVector structVector = (StructVector) vector;
+ if (structVector.isNull(rowIndex)) {
+ checkNullable(isNullable, null);
+ return null;
+ }
+
+ GenericRowData rowData = new GenericRowData(structVector.size());
+ for (int i = 0; i < structVector.size(); i++) {
+ Object obj = childConverters.get(i).convert((FieldVector) structVector.getVectorById(i), rowIndex);
+ rowData.setField(i, obj);
}
return rowData;
}
}
- Map<LogicalTypeRoot, Types.MinorType> FLINK_AND_ARROW_TYPE_MAPPINGS = new HashMap<LogicalTypeRoot, Types.MinorType>() {{
+ Map<LogicalTypeRoot, Types.MinorType> FLINK_AND_ARROW_TYPE_MAPPING = new HashMap<LogicalTypeRoot, Types.MinorType>() {{
put(LogicalTypeRoot.BOOLEAN, Types.MinorType.BIT);
put(LogicalTypeRoot.TINYINT, Types.MinorType.TINYINT);
put(LogicalTypeRoot.SMALLINT, Types.MinorType.SMALLINT);
@@ -236,7 +375,7 @@ public Object convert(Object arrowData) {
}};
static void checkTypeCompatible(LogicalType flinkType, Field field) {
- Types.MinorType expectMinorType = FLINK_AND_ARROW_TYPE_MAPPINGS.get(flinkType.getTypeRoot());
+ Types.MinorType expectMinorType = FLINK_AND_ARROW_TYPE_MAPPING.get(flinkType.getTypeRoot());
if (expectMinorType == null) {
throw new UnsupportedOperationException(String.format(
"Flink type %s is not supported, and arrow type is %s",
@@ -258,14 +397,20 @@ static ArrowFieldConverter createConverter(LogicalType flinkType, Field arrowFie
case BOOLEAN:
return new BooleanConverter(flinkType.isNullable());
case TINYINT:
+ return new TinyIntConverter(flinkType.isNullable());
case SMALLINT:
+ return new SmallIntConverter(flinkType.isNullable());
case INTEGER:
+ return new IntConverter(flinkType.isNullable());
case BIGINT:
+ return new BigIntConverter(flinkType.isNullable());
case FLOAT:
+ return new FloatConverter(flinkType.isNullable());
case DOUBLE:
- return new NumericConverter(flinkType.isNullable());
+ return new DoubleConverter(flinkType.isNullable());
case DECIMAL:
- return new DecimalConverter(flinkType.isNullable());
+ DecimalType type = (DecimalType) flinkType;
+ return new DecimalConverter(type.getPrecision(), type.getScale(), flinkType.isNullable());
case DATE:
return new DateConverter(flinkType.isNullable());
case TIMESTAMP_WITHOUT_TIME_ZONE:
diff --git a/src/main/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRows.java b/src/main/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRows.java
index bdc96d2a..cec1f8e2 100644
--- a/src/main/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRows.java
+++ b/src/main/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRows.java
@@ -15,10 +15,10 @@
package com.starrocks.connector.flink.row.source;
import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.util.Preconditions;
import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
-import com.starrocks.connector.flink.table.source.struct.StarRocksSchema;
import com.starrocks.thrift.TScanBatchResult;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
@@ -36,27 +36,22 @@
public class StarRocksSourceFlinkRows {
- private static Logger LOG = LoggerFactory.getLogger(StarRocksSourceFlinkRows.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StarRocksSourceFlinkRows.class);
private int offsetOfBatchForRead;
- private int rowCountOfBatch;
private int flinkRowsCount;
- private List<GenericRowData> sourceFlinkRows = new ArrayList<>();
+ private final List<GenericRowData> sourceFlinkRows = new ArrayList<>();
private final ArrowStreamReader arrowStreamReader;
private VectorSchemaRoot root;
- private List<FieldVector> fieldVectors;
- private RootAllocator rootAllocator;
- private List<ColumnRichInfo> columnRichInfos;
+ private final RootAllocator rootAllocator;
+ private final List<ColumnRichInfo> columnRichInfos;
private final SelectColumn[] selectedColumns;
- private final StarRocksSchema starRocksSchema;
public StarRocksSourceFlinkRows(TScanBatchResult nextResult,
 List<ColumnRichInfo> columnRichInfos,
- StarRocksSchema srSchema,
SelectColumn[] selectColumns) {
this.columnRichInfos = columnRichInfos;
this.selectedColumns = selectColumns;
- this.starRocksSchema = srSchema;
this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
byte[] bytes = nextResult.getRows();
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
@@ -68,11 +63,11 @@ public void init(List<ArrowFieldConverter> fieldConverters) throws IOException {
this.root = arrowStreamReader.getVectorSchemaRoot();
initFiledConverters(fieldConverters);
while (arrowStreamReader.loadNextBatch()) {
- fieldVectors = root.getFieldVectors();
+ List<FieldVector> fieldVectors = root.getFieldVectors();
if (fieldVectors.size() == 0 || root.getRowCount() == 0) {
continue;
}
- rowCountOfBatch = root.getRowCount();
+ int rowCountOfBatch = root.getRowCount();
for (int i = 0; i < rowCountOfBatch; i ++) {
sourceFlinkRows.add(new GenericRowData(this.selectedColumns.length));
}
@@ -80,8 +75,13 @@ public void init(List<ArrowFieldConverter> fieldConverters) throws IOException {
FieldVector fieldVector = fieldVectors.get(i);
ArrowFieldConverter converter = fieldConverters.get(i);
for (int rowIndex = 0; rowIndex < rowCountOfBatch; rowIndex++) {
- Object fieldValue = converter.convert(fieldVector.getObject(rowIndex));
- sourceFlinkRows.get(flinkRowsCount + rowIndex).setField(i, fieldValue);
+ try {
+ Object fieldValue = converter.convert(fieldVector, rowIndex);
+ sourceFlinkRows.get(flinkRowsCount + rowIndex).setField(i, fieldValue);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to convert arrow data for field " + fieldVector.getField().getName(), e);
+ }
}
}
flinkRowsCount += rowCountOfBatch;
@@ -97,6 +97,10 @@ private void initFiledConverters(List<ArrowFieldConverter> fieldConverters) {
for (int i = 0; i < schema.getFields().size(); i++) {
Field field = schema.getFields().get(i);
ColumnRichInfo flinkColumn = columnRichInfos.get(selectedColumns[i].getColumnIndexInFlinkTable());
+ Preconditions.checkState(field.getName().equals(flinkColumn.getColumnName()),
+ "The fields at position %s are not same between arrow and flink schemas. " +
+ "Arrow field name is %s, and flink field name is %s.",
+ i, field.getName(), flinkColumn.getColumnName());
ArrowFieldConverter converter = ArrowFieldConverter.createConverter(
flinkColumn.getDataType().getLogicalType(), field);
fieldConverters.add(converter);
diff --git a/src/main/java/com/starrocks/connector/flink/row/source/StarRocksToFlinkTrans.java b/src/main/java/com/starrocks/connector/flink/row/source/StarRocksToFlinkTrans.java
deleted file mode 100644
index 04f690f8..00000000
--- a/src/main/java/com/starrocks/connector/flink/row/source/StarRocksToFlinkTrans.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2021-present StarRocks, Inc. All rights reserved.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.starrocks.connector.flink.row.source;
-
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.types.Types;
-
-public interface StarRocksToFlinkTrans {
- Object[] transToFlinkData(Types.MinorType beShowDataType, FieldVector curFieldVector, int rowCount, int colIndex, boolean nullable);
-}
-
diff --git a/src/main/java/com/starrocks/connector/flink/row/source/StarRocksToFlinkTranslators.java b/src/main/java/com/starrocks/connector/flink/row/source/StarRocksToFlinkTranslators.java
deleted file mode 100644
index d92020c6..00000000
--- a/src/main/java/com/starrocks/connector/flink/row/source/StarRocksToFlinkTranslators.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Copyright 2021-present StarRocks, Inc. All rights reserved.
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.starrocks.connector.flink.row.source;
-
-import com.starrocks.connector.flink.tools.DataUtil;
-
-
-import org.apache.arrow.vector.BigIntVector;
-import org.apache.arrow.vector.BitVector;
-import org.apache.arrow.vector.DecimalVector;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.Float4Vector;
-import org.apache.arrow.vector.Float8Vector;
-import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.SmallIntVector;
-import org.apache.arrow.vector.TinyIntVector;
-import org.apache.arrow.vector.VarCharVector;
-
-import org.apache.arrow.vector.types.Types;
-
-import org.apache.flink.table.data.DecimalData;
-
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-
-import org.apache.flink.util.Preconditions;
-
-
-
-
-import java.math.BigDecimal;
-
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-
-public class StarRocksToFlinkTranslators {
-
-
- public static final String DATETIME_FORMAT_LONG = "yyyy-MM-dd HH:mm:ss.SSSSSS";
- public static final String DATETIME_FORMAT_SHORT = "yyyy-MM-dd HH:mm:ss";
- public static final String DATE_FORMAT = "yyyy-MM-dd";
-
- public class ToFlinkDate implements StarRocksToFlinkTrans {
-
- @Override
- public Object[] transToFlinkData(Types.MinorType beShowDataType, FieldVector curFieldVector, int rowCount, int colIndex, boolean nullable) {
- // beShowDataType.String => Flink Date
- Preconditions.checkArgument(beShowDataType.equals(Types.MinorType.VARCHAR), "");
- Object[] result = new Object[rowCount];
- VarCharVector varCharVector = (VarCharVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCount; rowIndex ++) {
- if (varCharVector.isNull(rowIndex)) {
- if (!nullable) {
- throwNullableException(colIndex);
- }
- result[rowIndex] = null;
- continue;
- }
- String value = new String(varCharVector.get(rowIndex));
- LocalDate date = LocalDate.parse(value, DateTimeFormatter.ofPattern(DATE_FORMAT));
- int timestamp = (int)date.atStartOfDay().toLocalDate().toEpochDay();
- result[rowIndex] = timestamp;
- }
- return result;
- }
-
- }
-
- public class ToFlinkTimestamp implements StarRocksToFlinkTrans {
-
- @Override
- public Object[] transToFlinkData(Types.MinorType beShowDataType, FieldVector curFieldVector, int rowCount, int colIndex, boolean nullable) {
- // beShowDataType.String => Flink Timestamp
- Preconditions.checkArgument(beShowDataType.equals(Types.MinorType.VARCHAR), "");
- Object[] result = new Object[rowCount];
- VarCharVector varCharVector = (VarCharVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCount; rowIndex ++) {
- if (varCharVector.isNull(rowIndex)) {
- if (!nullable) {
- throwNullableException(colIndex);
- }
- result[rowIndex] = null;
- continue;
- }
- String value = new String(varCharVector.get(rowIndex));
- if (value.length() < DATETIME_FORMAT_SHORT.length()) {
- throw new IllegalArgumentException("Date value length shorter than DATETIME_FORMAT_SHORT");
- }
- if (value.length() == DATETIME_FORMAT_SHORT.length()) {
- StringBuilder sb = new StringBuilder(value).append(".");
- value = DataUtil.addZeroForNum(sb.toString(), DATETIME_FORMAT_LONG.length());
- }
- value = DataUtil.addZeroForNum(value, DATETIME_FORMAT_LONG.length());
- DateTimeFormatter df = DateTimeFormatter.ofPattern(DATETIME_FORMAT_LONG);
- LocalDateTime ldt = LocalDateTime.parse(value, df);
- result[rowIndex] = TimestampData.fromLocalDateTime(ldt);
- }
- return result;
- }
-
- }
-
- public class ToFlinkChar implements StarRocksToFlinkTrans {
-
- @Override
- public Object[] transToFlinkData(Types.MinorType beShowDataType, FieldVector curFieldVector, int rowCount, int colIndex, boolean nullable) {
- // beShowDataType.String => Flink CHAR/VARCHAR/STRING
- Preconditions.checkArgument(beShowDataType.equals(Types.MinorType.VARCHAR), "");
- Object[] result = new Object[rowCount];
- VarCharVector varCharVector = (VarCharVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCount; rowIndex ++) {
- if (varCharVector.isNull(rowIndex)) {
- if (!nullable) {
- throwNullableException(colIndex);
- }
- result[rowIndex] = null;
- continue;
- }
- String value = new String(varCharVector.get(rowIndex));
- result[rowIndex] = StringData.fromString(value);
- }
- return result;
- }
-
- }
-
- public class ToFlinkBoolean implements StarRocksToFlinkTrans {
-
- @Override
- public Object[] transToFlinkData(Types.MinorType beShowDataType, FieldVector curFieldVector, int rowCount, int colIndex, boolean nullable) {
- // beShowDataType.Bit => Flink boolean
- Preconditions.checkArgument(beShowDataType.equals(Types.MinorType.BIT), "");
- Object[] result = new Object[rowCount];
- BitVector bitVector = (BitVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
- Object fieldValue = bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0;
- if (fieldValue == null && !nullable) {
- throwNullableException(colIndex);
- }
- result[rowIndex] = fieldValue;
- }
- return result;
- }
-
- }
-
- public class ToFlinkTinyInt implements StarRocksToFlinkTrans {
-
- @Override
- public Object[] transToFlinkData(Types.MinorType beShowDataType, FieldVector curFieldVector, int rowCount, int colIndex, boolean nullable) {
- // beShowDataType.TinyInt => Flink TinyInt
- Preconditions.checkArgument(beShowDataType.equals(Types.MinorType.TINYINT), "");
- Object[] result = new Object[rowCount];
- TinyIntVector tinyIntVector = (TinyIntVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
- Object fieldValue = tinyIntVector.isNull(rowIndex) ? null : tinyIntVector.get(rowIndex);
- if (fieldValue == null && !nullable) {
- throwNullableException(colIndex);
- }
- result[rowIndex] = fieldValue;
- }
- return result;
- }
-
- }
-
- public class ToFlinkSmallInt implements StarRocksToFlinkTrans {
-
- @Override
- public Object[] transToFlinkData(Types.MinorType beShowDataType, FieldVector curFieldVector, int rowCount, int colIndex, boolean nullable) {
- // beShowDataType.SmalInt => Flink SmalInt
- Preconditions.checkArgument(beShowDataType.equals(Types.MinorType.SMALLINT), "");
- Object[] result = new Object[rowCount];
- SmallIntVector smallIntVector = (SmallIntVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
- Object fieldValue = smallIntVector.isNull(rowIndex) ? null : smallIntVector.get(rowIndex);
- if (fieldValue == null && !nullable) {
- throwNullableException(colIndex);
- }
- result[rowIndex] = fieldValue;
- }
- return result;
- }
-
- }
-
- public class ToFlinkInt implements StarRocksToFlinkTrans {
-
- @Override
- public Object[] transToFlinkData(Types.MinorType beShowDataType, FieldVector curFieldVector, int rowCount, int colIndex, boolean nullable) {
- // beShowDataType.Int => Flink Int
- Preconditions.checkArgument(beShowDataType.equals(Types.MinorType.INT), "");
- Object[] result = new Object[rowCount];
- IntVector intVector = (IntVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
- Object fieldValue = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
- if (fieldValue == null && !nullable) {
- throwNullableException(colIndex);
- }
- result[rowIndex] = fieldValue;
- }
- return result;
- }
-
- }
-
- public class ToFlinkBigInt implements StarRocksToFlinkTrans {
-
- @Override
- public Object[] transToFlinkData(Types.MinorType beShowDataType, FieldVector curFieldVector, int rowCount, int colIndex, boolean nullable) {
- // beShowDataType.BigInt => Flink BigInt
- Preconditions.checkArgument(beShowDataType.equals(Types.MinorType.BIGINT), "");
- Object[] result = new Object[rowCount];
- BigIntVector bigIntVector = (BigIntVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
- Object fieldValue = bigIntVector.isNull(rowIndex) ? null : bigIntVector.get(rowIndex);
- if (fieldValue == null && !nullable) {
- throwNullableException(colIndex);
- }
- result[rowIndex] = fieldValue;
- }
- return result;
- }
-
- }
-
- public class ToFlinkFloat implements StarRocksToFlinkTrans {
-
- @Override
- public Object[] transToFlinkData(Types.MinorType beShowDataType, FieldVector curFieldVector, int rowCount, int colIndex, boolean nullable) {
- // beShowDataType.Float => Flink Float
- Preconditions.checkArgument(beShowDataType.equals(Types.MinorType.FLOAT4), "");
- Object[] result = new Object[rowCount];
- Float4Vector float4Vector = (Float4Vector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
- Object fieldValue = float4Vector.isNull(rowIndex) ? null : float4Vector.get(rowIndex);
- if (fieldValue == null && !nullable) {
- throwNullableException(colIndex);
- }
- result[rowIndex] = fieldValue;
- }
- return result;
-
- }
-
- }
-
- public class ToFlinkDouble implements StarRocksToFlinkTrans {
-
- @Override
- public Object[] transToFlinkData(Types.MinorType beShowDataType, FieldVector curFieldVector, int rowCount, int colIndex, boolean nullable) {
- // beShowDataType.Double => Flink Double
- Preconditions.checkArgument(beShowDataType.equals(Types.MinorType.FLOAT8), "");
- Object[] result = new Object[rowCount];
- Float8Vector float8Vector = (Float8Vector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
- Object fieldValue = float8Vector.isNull(rowIndex) ? null : float8Vector.get(rowIndex);
- if (fieldValue == null && !nullable) {
- throwNullableException(colIndex);
- }
- result[rowIndex] = fieldValue;
- }
- return result;
- }
-
- }
-
- public class ToFlinkDecimal implements StarRocksToFlinkTrans {
-
- @Override
- public Object[] transToFlinkData(Types.MinorType beShowDataType, FieldVector curFieldVector, int rowCount, int colIndex, boolean nullable) {
- // beShowDataType.Decimal => Flink Decimal
- Preconditions.checkArgument(beShowDataType.equals(Types.MinorType.DECIMAL), "");
- Object[] result = new Object[rowCount];
- DecimalVector decimalVector = (DecimalVector) curFieldVector;
- for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) {
- if (decimalVector.isNull(rowIndex)) {
- if (!nullable) {
- throwNullableException(colIndex);
- }
- result[rowIndex] = null;
- continue;
- }
- BigDecimal value = decimalVector.getObject(rowIndex);
- result[rowIndex] = DecimalData.fromBigDecimal(value, value.precision(), value.scale());
- }
- return result;
- }
- }
-
- private void throwNullableException(int colIndex) {
- throw new RuntimeException("Data could not be null. please check create table SQL, column index is: " + colIndex);
- }
-}
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceBeReader.java b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceBeReader.java
index cd182df2..15896276 100644
--- a/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceBeReader.java
+++ b/src/main/java/com/starrocks/connector/flink/table/source/StarRocksSourceBeReader.java
@@ -19,9 +19,7 @@
import com.starrocks.connector.flink.row.source.ArrowFieldConverter;
import com.starrocks.connector.flink.row.source.StarRocksSourceFlinkRows;
import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
-import com.starrocks.connector.flink.table.source.struct.Const;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
-import com.starrocks.connector.flink.table.source.struct.StarRocksSchema;
import com.starrocks.shade.org.apache.thrift.TException;
import com.starrocks.shade.org.apache.thrift.protocol.TBinaryProtocol;
import com.starrocks.shade.org.apache.thrift.protocol.TProtocol;
@@ -56,7 +54,6 @@ public class StarRocksSourceBeReader implements StarRocksSourceDataReader, Seria
private final SelectColumn[] selectColumns;
private String contextId;
private int readerOffset = 0;
- private StarRocksSchema srSchema;
private StarRocksSourceFlinkRows curFlinkRows;
private GenericRowData curData;
@@ -107,7 +104,7 @@ public void openScanner(List tablets, String opaqued_query_plan, StarRocks
TScanOpenParams params = new TScanOpenParams();
params.setTablet_ids(tablets);
params.setOpaqued_query_plan(opaqued_query_plan);
- params.setCluster(Const.DEFAULT_CLUSTER_NAME);
+ params.setCluster("default_cluster");
params.setDatabase(sourceOptions.getDatabaseName());
params.setTable(sourceOptions.getTableName());
params.setUser(sourceOptions.getUsername());
@@ -136,7 +133,6 @@ public void openScanner(List tablets, String opaqued_query_plan, StarRocks
} catch (TException e) {
throw new RuntimeException("Failed to open scanner." + e.getMessage());
}
- this.srSchema = StarRocksSchema.genSchema(result.getSelected_columns());
this.contextId = result.getContext_id();
LOG.info("Open scanner for {}:{} with context id {}, and there are {} tablets {}",
IP, PORT, contextId, tablets.size(), tablets);
@@ -184,7 +180,7 @@ public GenericRowData getNext() {
private void handleResult(TScanBatchResult result) {
try {
- curFlinkRows = new StarRocksSourceFlinkRows(result, columnRichInfos, srSchema, selectColumns);
+ curFlinkRows = new StarRocksSourceFlinkRows(result, columnRichInfos, selectColumns);
curFlinkRows.init(fieldConverters);
} catch (IOException e) {
throw new RuntimeException(e.getMessage());
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/struct/Column.java b/src/main/java/com/starrocks/connector/flink/table/source/struct/Column.java
deleted file mode 100644
index 921f3a89..00000000
--- a/src/main/java/com/starrocks/connector/flink/table/source/struct/Column.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.starrocks.connector.flink.table.source.struct;
-
-import javax.annotation.Nullable;
-
-import java.util.Optional;
-
-public class Column {
- private String name;
- @Nullable
- private String type;
- private String comment;
- private int precision;
- private int scale;
-
- public Column() {
- }
-
- public Column(String name, @Nullable String type, String comment, int precision, int scale) {
- this.name = name;
- this.type = type;
- this.comment = comment;
- this.precision = precision;
- this.scale = scale;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public Optional getType() {
- return Optional.ofNullable(type);
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public String getComment() {
- return comment;
- }
-
- public void setComment(String comment) {
- this.comment = comment;
- }
-
- public int getPrecision() {
- return precision;
- }
-
- public void setPrecision(int precision) {
- this.precision = precision;
- }
-
- public int getScale() {
- return scale;
- }
-
- public void setScale(int scale) {
- this.scale = scale;
- }
-}
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/struct/Const.java b/src/main/java/com/starrocks/connector/flink/table/source/struct/Const.java
deleted file mode 100644
index eed51a46..00000000
--- a/src/main/java/com/starrocks/connector/flink/table/source/struct/Const.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.starrocks.connector.flink.table.source.struct;
-
-import java.util.HashMap;
-
-
-import com.starrocks.connector.flink.row.source.StarRocksToFlinkTrans;
-import com.starrocks.connector.flink.row.source.StarRocksToFlinkTranslators;
-
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
-
-public class Const {
-
- public static String DEFAULT_CLUSTER_NAME = "default_cluster";
-
- // StarRocks
- public static final String DATA_TYPE_STARROCKS_DATE = "DATE";
- public static final String DATA_TYPE_STARROCKS_DATETIME = "DATETIME";
-
- public static final String DATA_TYPE_STARROCKS_CHAR = "CHAR";
- public static final String DATA_TYPE_STARROCKS_VARCHAR = "VARCHAR";
-
- public static final String DATA_TYPE_STARROCKS_BOOLEAN = "BOOLEAN";
-
- public static final String DATA_TYPE_STARROCKS_TINYINT = "TINYINT";
- public static final String DATA_TYPE_STARROCKS_SMALLINT = "SMALLINT";
- public static final String DATA_TYPE_STARROCKS_INT = "INT";
- public static final String DATA_TYPE_STARROCKS_BIGINT = "BIGINT";
- public static final String DATA_TYPE_STARROCKS_LARGEINT = "LARGEINT";
-
- public static final String DATA_TYPE_STARROCKS_FLOAT = "FLOAT";
- public static final String DATA_TYPE_STARROCKS_DOUBLE = "DOUBLE";
- public static final String DATA_TYPE_STARROCKS_DECIMAL = "DECIMAL";
- public static final String DATA_TYPE_STARROCKS_DECIMALV2 = "DECIMALV2";
- public static final String DATA_TYPE_STARROCKS_DECIMAL32 = "DECIMAL32";
- public static final String DATA_TYPE_STARROCKS_DECIMAL64 = "DECIMAL64";
- public static final String DATA_TYPE_STARROCKS_DECIMAL128 = "DECIMAL128";
-
-
- public static HashMap> DataTypeRelationMap = new HashMap>(){{
-
- put(LogicalTypeRoot.DATE, new HashMap() {{
- put(DATA_TYPE_STARROCKS_DATE, new StarRocksToFlinkTranslators().new ToFlinkDate());
- }
- });
- put(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, new HashMap() {{
- put(DATA_TYPE_STARROCKS_DATETIME, new StarRocksToFlinkTranslators().new ToFlinkTimestamp());
- }
- });
- put(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, new HashMap() {{
- put(DATA_TYPE_STARROCKS_DATETIME, new StarRocksToFlinkTranslators().new ToFlinkTimestamp());
- }
- });
- put(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, new HashMap() {{
- put(DATA_TYPE_STARROCKS_DATETIME, new StarRocksToFlinkTranslators().new ToFlinkTimestamp());
- }
- });
- put(LogicalTypeRoot.CHAR, new HashMap() {{
- put(DATA_TYPE_STARROCKS_CHAR, new StarRocksToFlinkTranslators().new ToFlinkChar());
- }
- });
- put(LogicalTypeRoot.VARCHAR, new HashMap() {{
- put(DATA_TYPE_STARROCKS_VARCHAR, new StarRocksToFlinkTranslators().new ToFlinkChar());
- put(DATA_TYPE_STARROCKS_LARGEINT, new StarRocksToFlinkTranslators().new ToFlinkChar());
- }
- });
- put(LogicalTypeRoot.BOOLEAN, new HashMap() {{
- put(DATA_TYPE_STARROCKS_BOOLEAN, new StarRocksToFlinkTranslators().new ToFlinkBoolean());
- }
- });
- put(LogicalTypeRoot.TINYINT, new HashMap() {{
- put(DATA_TYPE_STARROCKS_TINYINT, new StarRocksToFlinkTranslators().new ToFlinkTinyInt());
- }
- });
- put(LogicalTypeRoot.SMALLINT, new HashMap() {{
- put(DATA_TYPE_STARROCKS_SMALLINT, new StarRocksToFlinkTranslators().new ToFlinkSmallInt());
- }
- });
- put(LogicalTypeRoot.INTEGER, new HashMap() {{
- put(DATA_TYPE_STARROCKS_INT, new StarRocksToFlinkTranslators().new ToFlinkInt());
- }
- });
- put(LogicalTypeRoot.BIGINT, new HashMap() {{
- put(DATA_TYPE_STARROCKS_BIGINT, new StarRocksToFlinkTranslators().new ToFlinkBigInt());
- }
- });
- put(LogicalTypeRoot.FLOAT, new HashMap() {{
- put(DATA_TYPE_STARROCKS_FLOAT, new StarRocksToFlinkTranslators().new ToFlinkFloat());
- }
- });
- put(LogicalTypeRoot.DOUBLE, new HashMap() {{
- put(DATA_TYPE_STARROCKS_DOUBLE, new StarRocksToFlinkTranslators().new ToFlinkDouble());
- }
- });
- put(LogicalTypeRoot.DECIMAL, new HashMap() {{
- put(DATA_TYPE_STARROCKS_DECIMAL, new StarRocksToFlinkTranslators().new ToFlinkDecimal());
- put(DATA_TYPE_STARROCKS_DECIMALV2, new StarRocksToFlinkTranslators().new ToFlinkDecimal());
- put(DATA_TYPE_STARROCKS_DECIMAL32, new StarRocksToFlinkTranslators().new ToFlinkDecimal());
- put(DATA_TYPE_STARROCKS_DECIMAL64, new StarRocksToFlinkTranslators().new ToFlinkDecimal());
- put(DATA_TYPE_STARROCKS_DECIMAL128, new StarRocksToFlinkTranslators().new ToFlinkDecimal());
- }
- });
- }};
-}
diff --git a/src/main/java/com/starrocks/connector/flink/table/source/struct/StarRocksSchema.java b/src/main/java/com/starrocks/connector/flink/table/source/struct/StarRocksSchema.java
deleted file mode 100644
index b75e0ebc..00000000
--- a/src/main/java/com/starrocks/connector/flink/table/source/struct/StarRocksSchema.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.starrocks.connector.flink.table.source.struct;
-
-import com.starrocks.thrift.TScanColumnDesc;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-
-public class StarRocksSchema {
-
- private HashMap schemaMap;
- private List columns;
-
- public StarRocksSchema() {
- schemaMap = new HashMap<>();
- columns = new LinkedList<>();
- }
-
- public HashMap getSchema() {
- return schemaMap;
- }
-
- public List getColumns() {
- return columns;
- }
-
- public Column getColumn(int idx) {
- return columns.get(idx);
- }
-
- public void setSchema(HashMap schema) {
- this.schemaMap = schema;
- }
-
- public void put(String name, String type, String comment, int scale, int precision) {
- Column column = new Column(name, type, comment, scale, precision);
- columns.add(column);
- schemaMap.put(name, column);
- }
-
- public void put(Column column) {
- columns.add(column);
- schemaMap.put(column.getName(), column);
- }
-
- public Column get(String columnName) {
- return schemaMap.get(columnName);
- }
-
- public int size() {
- return schemaMap.size();
- }
-
- public static StarRocksSchema genSchema(List tscanColumnDescs) {
- StarRocksSchema schema = new StarRocksSchema();
- for (TScanColumnDesc desc : tscanColumnDescs) {
- // The type for some columns may be null, such as json column
- schema.put(desc.getName(), desc.getType() == null ? null : desc.getType().name(), "", 0, 0);
- }
- return schema;
- }
-}
diff --git a/src/test/java/com/starrocks/connector/flink/it/StarRocksITTestBase.java b/src/test/java/com/starrocks/connector/flink/it/StarRocksITTestBase.java
index 35674a55..43634d44 100644
--- a/src/test/java/com/starrocks/connector/flink/it/StarRocksITTestBase.java
+++ b/src/test/java/com/starrocks/connector/flink/it/StarRocksITTestBase.java
@@ -44,7 +44,7 @@ public abstract class StarRocksITTestBase {
private static final Logger LOG = LoggerFactory.getLogger(StarRocksITTestBase.class);
- private static final boolean DEBUG_MODE = true;
+ private static final boolean DEBUG_MODE = false;
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
diff --git a/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java b/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java
index d7b3aa9f..24c9cfc0 100644
--- a/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java
+++ b/src/test/java/com/starrocks/connector/flink/it/source/StarRocksSourceITTest.java
@@ -22,32 +22,150 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
import com.starrocks.connector.flink.it.StarRocksITTestBase;
import org.junit.Test;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
public class StarRocksSourceITTest extends StarRocksITTestBase {
@Test
public void testArrayType() throws Exception {
+ String tableName = createComplexTypeTable("testComplexType");
+ executeSrSQL(String.format("INSERT INTO `%s`.`%s` VALUES (%s)", DB_NAME, tableName,
+ "0, [true], [2], [3], [4], [5], [6.6], [7.7], [8.8], ['2024-03-22'], ['2024-03-22 12:00:00'], ['11'], ['12'], " +
+ "[[true], [true]], [[2], [2]], [[3], [3]], [[4], [4]], [[5], [5]], [[6.6], [6.6]], " +
+ "[[7.7], [7.7]], [[8.8], [8.8]], [['2024-03-22'], ['2024-03-22']], " +
+ "[['2024-03-22 12:00:00'], ['2024-03-22 12:00:00']], [['11'], ['11']], [['12'], ['12']]"
+ )
+ );
+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
String createSrcSQL = "CREATE TABLE sr_src(" +
- "c0 INT," +
- "c1 ARRAY," +
- "c2 ARRAY>" +
+ "c0 INT," +
+ "c1 ARRAY," +
+ "c2 ARRAY," +
+ "c3 ARRAY," +
+ "c4 ARRAY," +
+ "c5 ARRAY," +
+ "c6 ARRAY," +
+ "c7 ARRAY," +
+ "c8 ARRAY," +
+ "c9 ARRAY," +
+ "c10 ARRAY," +
+ "c11 ARRAY," +
+ "c12 ARRAY," +
+ "c13 ARRAY>," +
+ "c14 ARRAY>," +
+ "c15 ARRAY>," +
+ "c16 ARRAY>," +
+ "c17 ARRAY>," +
+ "c18 ARRAY>," +
+ "c19 ARRAY>," +
+ "c20 ARRAY>," +
+ "c21 ARRAY>," +
+ "c22 ARRAY>," +
+ "c23 ARRAY>," +
+ "c24 ARRAY>" +
") WITH ( " +
"'connector' = 'starrocks'," +
"'jdbc-url'='" + getJdbcUrl() + "'," +
"'scan-url'='" + String.join(";", getHttpUrls()) + "'," +
- "'database-name' = 'test'," +
- "'table-name' = 't0'," +
+ "'database-name' = '" + DB_NAME + "'," +
+ "'table-name' = '" + tableName + "'," +
"'username' = 'root'," +
"'password' = ''" +
")";
tEnv.executeSql(createSrcSQL);
- tEnv.executeSql("SELECT * FROM sr_src;").print();
+ List results =
+ CollectionUtil.iteratorToList(
+ tEnv.executeSql("SELECT * FROM sr_src").collect());
+ Row row = Row.of(
+ 0,
+ new Boolean[] {true},
+ new Byte[] {(byte) 2},
+ new Short[] {(short) 3},
+ new Integer[] {4},
+ new Long[] {5L},
+ new Float[] {6.6f},
+ new Double[] {7.7},
+ new BigDecimal[] {new BigDecimal("8.8000000000")},
+ new LocalDate[] {LocalDate.of(2024, 3, 22)},
+ new LocalDateTime[] {LocalDateTime.of(2024, 3, 22, 12, 0, 0)},
+ new String[] {"11"},
+ new String[] {"12"},
+ new Boolean[][] {{true}, {true}},
+ new Byte[][] {{(byte) 2}, {(byte) 2}},
+ new Short[][] {{(short) 3}, {(short) 3}},
+ new Integer[][] {{4}, {4}},
+ new Long[][] {{5L}, {5L}},
+ new Float[][] {{6.6f}, {6.6f}},
+ new Double[][] {{7.7}, {7.7}},
+ new BigDecimal[][] {{new BigDecimal("8.8000000000")}, {new BigDecimal("8.8000000000")}},
+ new LocalDate[][] {
+ {LocalDate.of(2024, 3, 22)},
+ {LocalDate.of(2024, 3, 22)}
+ },
+ new LocalDateTime[][] {
+ {LocalDateTime.of(2024, 3, 22, 12, 0, 0)},
+ {LocalDateTime.of(2024, 3, 22, 12, 0, 0)},
+ },
+ new String[][] {{"11"}, {"11"}},
+ new String[][] {{"12"}, {"12"}}
+ );
+ assertThat(results).containsExactlyInAnyOrderElementsOf(Collections.singleton(row));
+ }
+
+ private String createComplexTypeTable(String tablePrefix) throws Exception {
+ String tableName = tablePrefix + "_" + genRandomUuid();
+ String createStarRocksTable =
+ String.format(
+ "CREATE TABLE `%s`.`%s` (" +
+ "c0 INT," +
+ "c1 ARRAY," +
+ "c2 ARRAY," +
+ "c3 ARRAY," +
+ "c4 ARRAY," +
+ "c5 ARRAY," +
+ "c6 ARRAY," +
+ "c7 ARRAY," +
+ "c8 ARRAY," +
+ "c9 ARRAY," +
+ "c10 ARRAY," +
+ "c11 ARRAY," +
+ "c12 ARRAY," +
+ "c13 ARRAY>," +
+ "c14 ARRAY>," +
+ "c15 ARRAY>," +
+ "c16 ARRAY>," +
+ "c17 ARRAY>," +
+ "c18 ARRAY>," +
+ "c19 ARRAY>," +
+ "c20 ARRAY>," +
+ "c21 ARRAY>," +
+ "c22 ARRAY>," +
+ "c23 ARRAY>," +
+ "c24 ARRAY>" +
+ ") ENGINE = OLAP " +
+ "DUPLICATE KEY(c0) " +
+ "DISTRIBUTED BY HASH (c0) BUCKETS 8 " +
+ "PROPERTIES (" +
+ "\"replication_num\" = \"1\"" +
+ ")",
+ DB_NAME, tableName);
+ executeSrSQL(createStarRocksTable);
+ return tableName;
}
}
diff --git a/src/test/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRowsTest.java b/src/test/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRowsTest.java
index 7b8ab51b..49b49d0e 100644
--- a/src/test/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRowsTest.java
+++ b/src/test/java/com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRowsTest.java
@@ -14,26 +14,23 @@
package com.starrocks.connector.flink.row.source;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+
import com.starrocks.connector.flink.it.source.StarRocksSourceBaseTest;
import com.starrocks.connector.flink.table.source.StarRocksSourceCommonFunc;
import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
-import com.starrocks.connector.flink.table.source.struct.Const;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
-import com.starrocks.connector.flink.table.source.struct.StarRocksSchema;
import com.starrocks.thrift.TPrimitiveType;
import com.starrocks.thrift.TScanBatchResult;
import com.starrocks.thrift.TScanColumnDesc;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.BufferedReader;
-import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
@@ -42,49 +39,43 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class StarRocksSourceFlinkRowsTest extends StarRocksSourceBaseTest {
-
- protected StarRocksSchema srSchema = new StarRocksSchema();
- protected StarRocksSchema srWrongOrderSchema = new StarRocksSchema();
- protected StarRocksSchema srLessSchema = new StarRocksSchema();
+
protected SelectColumn[] selectColumns;
protected List columnRichInfos;
protected String curPath = System.getProperty("user.dir");
protected Map columnMap;
@Before
- public void initParams() {
-
- srSchema = StarRocksSchema.genSchema(this.rightOrderList());
- srWrongOrderSchema = StarRocksSchema.genSchema(this.wrongOrderList());
- srLessSchema = StarRocksSchema.genSchema(this.lessList());
+ public void initParams() {
columnMap = StarRocksSourceCommonFunc.genColumnMap(TABLE_SCHEMA_NOT_NULL);
columnRichInfos = StarRocksSourceCommonFunc.genColumnRichInfo(columnMap);
selectColumns = StarRocksSourceCommonFunc.genSelectedColumns(columnMap, OPTIONS, columnRichInfos);
}
@Test
- public void testGenFlinkRows() throws FileNotFoundException, IOException {
+ public void testGenFlinkRows() throws IOException {
String fileName = curPath + "/src/test/resources/data/source/rowsData";
String line;
try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
line = br.readLine();
}
- assertTrue(line != null);
- String dataStrArray[] = line.split(",");
+ assertNotNull(line);
+ String[] dataStrArray = line.split(",");
ArrayList byteList = new ArrayList<>();
- for (int i = 0; i < dataStrArray.length; i ++) {
- byteList.add((byte)Integer.parseInt(dataStrArray[i].trim()));
+ for (String s : dataStrArray) {
+ byteList.add((byte) Integer.parseInt(s.trim()));
}
byte[] byteArray = new byte[byteList.size()];
for (int i = 0; i < byteArray.length; i ++) {
- byteArray[i] = byteList.get(i).byteValue();
+ byteArray[i] = byteList.get(i);
}
TScanBatchResult nextResult = new TScanBatchResult();
nextResult.setRows(byteArray);
- StarRocksSourceFlinkRows flinkRows1 = new StarRocksSourceFlinkRows(nextResult, columnRichInfos, srSchema, selectColumns);
+ StarRocksSourceFlinkRows flinkRows1 = new StarRocksSourceFlinkRows(nextResult, columnRichInfos, selectColumns);
List fieldConverters = new ArrayList<>();
flinkRows1.init(fieldConverters);
checkFlinkRows(flinkRows1);
@@ -160,33 +151,33 @@ public void checkFlinkRows(StarRocksSourceFlinkRows flinkRows) {
}
@Test
- public void testGenFlinkRowsWithNull() throws FileNotFoundException, IOException {
+ public void testGenFlinkRowsWithNull() throws IOException {
String fileName = curPath + "/src/test/resources/data/source/rowsDataWithNull";
String line;
try (BufferedReader br = new BufferedReader(new FileReader(fileName))){
line = br.readLine();
}
- assertTrue(line != null);
- String dataStrArray[] = line.split(",");
+ assertNotNull(line);
+ String[] dataStrArray = line.split(",");
ArrayList byteList = new ArrayList<>();
- for (int i = 0; i < dataStrArray.length; i ++) {
- byteList.add((byte)Integer.parseInt(dataStrArray[i].trim()));
+ for (String s : dataStrArray) {
+ byteList.add((byte) Integer.parseInt(s.trim()));
}
byte[] byteArray = new byte[byteList.size()];
for (int i = 0; i < byteArray.length; i ++) {
- byteArray[i] = byteList.get(i).byteValue();
+ byteArray[i] = byteList.get(i);
}
TScanBatchResult nextResult = new TScanBatchResult();
nextResult.setRows(byteArray);
- StarRocksSourceFlinkRows flinkRows = new StarRocksSourceFlinkRows(nextResult, columnRichInfos, srSchema, selectColumns);
+ StarRocksSourceFlinkRows flinkRows = new StarRocksSourceFlinkRows(nextResult, columnRichInfos, selectColumns);
String eMsg = null;
try {
List fieldConverters = new ArrayList<>();
flinkRows.init(fieldConverters);
} catch (Exception e){
- eMsg = e.getMessage();
+ eMsg = e.getCause().getMessage();
}
- assertTrue(eMsg.contains("Data could not be null. please check create table SQL, column index is"));
+ assertTrue(eMsg.contains("The value is null for a non-nullable column"));
}
@Test
@@ -205,126 +196,6 @@ public void testParallel() {
});
}
- @Test
- public void testDataTypeTrans() {
-
- Const.DataTypeRelationMap.entrySet().stream().forEach(entry -> {
- if (entry.getKey().equals(LogicalTypeRoot.DATE)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_DATE)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkDate);
- }
- });
- }
- if (entry.getKey().equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_DATETIME)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkTimestamp);
- }
- });
- }
- if (entry.getKey().equals(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_DATETIME)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkTimestamp);
- }
- });
- }
- if (entry.getKey().equals(LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_DATETIME)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkTimestamp);
- }
- });
- }
- if (entry.getKey().equals(LogicalTypeRoot.CHAR)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_CHAR)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkChar);
- }
- });
- }
- if (entry.getKey().equals(LogicalTypeRoot.VARCHAR)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_VARCHAR)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkChar);
- }
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_LARGEINT)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkChar);
- }
- });
- }
- if (entry.getKey().equals(LogicalTypeRoot.BOOLEAN)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_BOOLEAN)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkBoolean);
- }
- });
- }
- if (entry.getKey().equals(LogicalTypeRoot.TINYINT)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_TINYINT)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkTinyInt);
- }
- });
- }
- if (entry.getKey().equals(LogicalTypeRoot.SMALLINT)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_SMALLINT)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkSmallInt);
- }
- });
- }
- if (entry.getKey().equals(LogicalTypeRoot.INTEGER)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_INT)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkInt);
- }
- });
- }
- if (entry.getKey().equals(LogicalTypeRoot.BIGINT)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_BIGINT)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkBigInt);
- }
- });
- }
- if (entry.getKey().equals(LogicalTypeRoot.FLOAT)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_FLOAT)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkFloat);
- }
- });
- }
- if (entry.getKey().equals(LogicalTypeRoot.DOUBLE)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_DOUBLE)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkDouble);
- }
- });
- }
- if (entry.getKey().equals(LogicalTypeRoot.DECIMAL)) {
- entry.getValue().entrySet().stream().forEach(type -> {
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_DECIMAL)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkDecimal);
- }
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_DECIMALV2)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkDecimal);
- }
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_DECIMAL32)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkDecimal);
- }
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_DECIMAL64)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkDecimal);
- }
- if (type.getKey().equals(Const.DATA_TYPE_STARROCKS_DECIMAL128)) {
- assertTrue(type.getValue() instanceof StarRocksToFlinkTranslators.ToFlinkDecimal);
- }
- });
- }
- });
- }
-
public List rightOrderList() {
List list = new ArrayList<>();
// date_1