Skip to content

Commit

Permalink
Fix compile failure
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Mar 24, 2024
1 parent 2e15487 commit 5ca71a0
Show file tree
Hide file tree
Showing 12 changed files with 367 additions and 840 deletions.
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,13 @@ limitations under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>31.1-jre-17.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.jmockit</groupId>
<artifactId>jmockit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,28 @@
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

import com.starrocks.connector.flink.tools.DataUtil;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;

Expand All @@ -46,11 +60,13 @@
// Converts arrow field data to Flink data.
public interface ArrowFieldConverter {

String DATETIME_FORMAT_LONG = "yyyy-MM-dd HH:mm:ss.SSSSSS";
String DATETIME_FORMAT_SHORT = "yyyy-MM-dd HH:mm:ss";
String DATE_FORMAT = "yyyy-MM-dd";

Object convert(Object arrowData);
/**
* Convert the data at the given position of the arrow vector to Flink data.
* @param vector the input arrow vector
* @param rowIndex the position of the data
* @return the flink data
*/
Object convert(FieldVector vector, int rowIndex);

static void checkNullable(boolean isNullable, Object obj) {
if (obj == null && !isNullable) {
Expand All @@ -68,43 +84,141 @@ public BooleanConverter(boolean isNullable) {
}

@Override
public Object convert(Object arrowData) {
checkNullable(isNullable, arrowData);
return ((int) arrowData) != 0;
public Object convert(FieldVector vector, int rowIndex) {
BitVector bitVector = (BitVector) vector;
Object value = bitVector.isNull(rowIndex) ? null : bitVector.get(rowIndex) != 0;
checkNullable(isNullable, value);
return value;
}
}

// Reads one slot of an arrow TinyIntVector and hands it back as a boxed Byte
// (or null when the slot is null) for use as a flink tinyint.
class TinyIntConverter implements ArrowFieldConverter {

    // Whether a null slot is acceptable; passed to checkNullable on every call.
    private final boolean isNullable;

    public TinyIntConverter(boolean isNullable) {
        this.isNullable = isNullable;
    }

    @Override
    public Object convert(FieldVector vector, int rowIndex) {
        TinyIntVector source = (TinyIntVector) vector;
        Object result = null;
        if (!source.isNull(rowIndex)) {
            // get() yields a primitive byte; assigning to Object boxes it.
            result = source.get(rowIndex);
        }
        // Always run the nullability check, for null and non-null results alike.
        checkNullable(isNullable, result);
        return result;
    }
}

// Reads one slot of an arrow SmallIntVector and hands it back as a boxed Short
// (or null when the slot is null) for use as a flink smallint.
class SmallIntConverter implements ArrowFieldConverter {

    // Whether a null slot is acceptable; passed to checkNullable on every call.
    private final boolean isNullable;

    public SmallIntConverter(boolean isNullable) {
        this.isNullable = isNullable;
    }

    @Override
    public Object convert(FieldVector vector, int rowIndex) {
        SmallIntVector source = (SmallIntVector) vector;
        Object result = null;
        if (!source.isNull(rowIndex)) {
            // get() yields a primitive short; assigning to Object boxes it.
            result = source.get(rowIndex);
        }
        // Always run the nullability check, for null and non-null results alike.
        checkNullable(isNullable, result);
        return result;
    }
}

// Convert from arrow numeric type to flink numeric type,
// including tinyint, smallint, int, bigint, float, double
class NumericConverter implements ArrowFieldConverter {
// Convert from arrow int to flink int
class IntConverter implements ArrowFieldConverter {

private final boolean isNullable;

public NumericConverter(boolean isNullable) {
public IntConverter(boolean isNullable) {
this.isNullable = isNullable;
}

@Override
public Object convert(Object arrowData) {
checkNullable(isNullable, arrowData);
return arrowData;
public Object convert(FieldVector vector, int rowIndex) {
IntVector intVector = (IntVector) vector;
Object value = intVector.isNull(rowIndex) ? null : intVector.get(rowIndex);
checkNullable(isNullable, value);
return value;
}
}

// Reads one slot of an arrow BigIntVector and hands it back as a boxed Long
// (or null when the slot is null) for use as a flink bigint.
class BigIntConverter implements ArrowFieldConverter {

    // Whether a null slot is acceptable; passed to checkNullable on every call.
    private final boolean isNullable;

    public BigIntConverter(boolean isNullable) {
        this.isNullable = isNullable;
    }

    @Override
    public Object convert(FieldVector vector, int rowIndex) {
        BigIntVector source = (BigIntVector) vector;
        Object result = null;
        if (!source.isNull(rowIndex)) {
            // get() yields a primitive long; assigning to Object boxes it.
            result = source.get(rowIndex);
        }
        // Always run the nullability check, for null and non-null results alike.
        checkNullable(isNullable, result);
        return result;
    }
}

// Reads one slot of an arrow Float4Vector and hands it back as a boxed Float
// (or null when the slot is null) for use as a flink float.
class FloatConverter implements ArrowFieldConverter {

    // Whether a null slot is acceptable; passed to checkNullable on every call.
    private final boolean isNullable;

    public FloatConverter(boolean isNullable) {
        this.isNullable = isNullable;
    }

    @Override
    public Object convert(FieldVector vector, int rowIndex) {
        Float4Vector source = (Float4Vector) vector;
        Object result = null;
        if (!source.isNull(rowIndex)) {
            // get() yields a primitive float; assigning to Object boxes it.
            result = source.get(rowIndex);
        }
        // Always run the nullability check, for null and non-null results alike.
        checkNullable(isNullable, result);
        return result;
    }
}

// Reads one slot of an arrow Float8Vector and hands it back as a boxed Double
// (or null when the slot is null) for use as a flink double.
class DoubleConverter implements ArrowFieldConverter {

    // Whether a null slot is acceptable; passed to checkNullable on every call.
    private final boolean isNullable;

    public DoubleConverter(boolean isNullable) {
        this.isNullable = isNullable;
    }

    @Override
    public Object convert(FieldVector vector, int rowIndex) {
        Float8Vector source = (Float8Vector) vector;
        Object result = null;
        if (!source.isNull(rowIndex)) {
            // get() yields a primitive double; assigning to Object boxes it.
            result = source.get(rowIndex);
        }
        // Always run the nullability check, for null and non-null results alike.
        checkNullable(isNullable, result);
        return result;
    }
}

// Convert from arrow decimal type to flink decimal
class DecimalConverter implements ArrowFieldConverter {

private final int precision;
private final int scale;
private final boolean isNullable;

public DecimalConverter(boolean isNullable) {
public DecimalConverter(int precision, int scale, boolean isNullable) {
this.precision = precision;
this.scale = scale;
this.isNullable = isNullable;
}

@Override
public Object convert(Object arrowData) {
checkNullable(isNullable, arrowData);
BigDecimal value = (BigDecimal) arrowData;
return DecimalData.fromBigDecimal(value, value.precision(), value.scale());
public Object convert(FieldVector vector, int rowIndex) {
DecimalVector decimalVector = (DecimalVector) vector;
BigDecimal value = decimalVector.isNull(rowIndex) ? null : decimalVector.getObject(rowIndex);
checkNullable(isNullable, value);
return value == null ? null : DecimalData.fromBigDecimal(value, precision, scale);
}
}

Expand All @@ -118,25 +232,33 @@ public StringConverter(boolean isNullable) {
}

@Override
public Object convert(Object arrowData) {
checkNullable(isNullable, arrowData);
return new String((byte[]) arrowData);
public Object convert(FieldVector vector, int rowIndex) {
VarCharVector varCharVector = (VarCharVector) vector;
String value = varCharVector.isNull(rowIndex) ? null : new String(varCharVector.get(rowIndex));
checkNullable(isNullable, value);
return StringData.fromString(value);
}
}

// Convert from arrow varchar to flink date
class DateConverter implements ArrowFieldConverter {

private static final String DATE_FORMAT = "yyyy-MM-dd";

private final boolean isNullable;

public DateConverter(boolean isNullable) {
this.isNullable = isNullable;
}

@Override
public Object convert(Object arrowData) {
checkNullable(isNullable, arrowData);
String value = new String((byte[]) arrowData);
public Object convert(FieldVector vector, int rowIndex) {
VarCharVector varCharVector = (VarCharVector) vector;
String value = varCharVector.isNull(rowIndex) ? null : new String(varCharVector.get(rowIndex));
checkNullable(isNullable, value);
if (value == null) {
return null;
}
LocalDate date = LocalDate.parse(value, DateTimeFormatter.ofPattern(DATE_FORMAT));
return (int) date.atStartOfDay().toLocalDate().toEpochDay();
}
Expand All @@ -145,16 +267,23 @@ public Object convert(Object arrowData) {
// Convert from arrow varchar to flink timestamp-related type
class TimestampConverter implements ArrowFieldConverter {

private static final String DATETIME_FORMAT_LONG = "yyyy-MM-dd HH:mm:ss.SSSSSS";
private static final String DATETIME_FORMAT_SHORT = "yyyy-MM-dd HH:mm:ss";

private final boolean isNullable;

public TimestampConverter(boolean isNullable) {
this.isNullable = isNullable;
}

@Override
public Object convert(Object arrowData) {
checkNullable(isNullable, arrowData);
String value = new String((byte[]) arrowData);
public Object convert(FieldVector vector, int rowIndex) {
VarCharVector varCharVector = (VarCharVector) vector;
String value = varCharVector.isNull(rowIndex) ? null : new String(varCharVector.get(rowIndex));
checkNullable(isNullable, value);
if (value == null) {
return null;
}
if (value.length() < DATETIME_FORMAT_SHORT.length()) {
throw new IllegalArgumentException("Date value length shorter than DATETIME_FORMAT_SHORT");
}
Expand All @@ -180,13 +309,19 @@ public ArrayConverter(boolean isNullable, ArrowFieldConverter elementConverter)
}

@Override
public Object convert(Object arrowData) {
checkNullable(isNullable, arrowData);
List<?> list = (List<?> ) arrowData;
Object[] data = new Object[list.size()];
public Object convert(FieldVector vector, int rowIndex) {
ListVector listVector = (ListVector) vector;
if (listVector.isNull(rowIndex)) {
checkNullable(isNullable, null);
return null;
}

int index = rowIndex * ListVector.OFFSET_WIDTH;
int start = listVector.getOffsetBuffer().getInt(index);
int end = listVector.getOffsetBuffer().getInt(index + ListVector.OFFSET_WIDTH);
Object[] data = new Object[end - start];
for (int i = 0; i < data.length; i++) {
Object obj = list.get(i);
data[i] = obj == null ? null : elementConverter.convert(obj);
data[i] = elementConverter.convert(listVector.getDataVector(), start + i);
}
return new GenericArrayData(data);
}
Expand All @@ -204,19 +339,23 @@ public StructConverter(boolean isNullable, List<ArrowFieldConverter> childConver
}

@Override
public Object convert(Object arrowData) {
checkNullable(isNullable, arrowData);
List<?> list = (List<?> ) arrowData;
GenericRowData rowData = new GenericRowData(list.size());
for (int i = 0; i < list.size(); i++) {
Object obj = list.get(i);
rowData.setField(i, obj == null ? null : childConverters.get(i).convert(obj));
public Object convert(FieldVector vector, int rowIndex) {
StructVector structVector = (StructVector) vector;
if (structVector.isNull(rowIndex)) {
checkNullable(isNullable, null);
return null;
}

GenericRowData rowData = new GenericRowData(structVector.size());
for (int i = 0; i < structVector.size(); i++) {
Object obj = childConverters.get(i).convert((FieldVector) structVector.getVectorById(i), rowIndex);
rowData.setField(i, obj);
}
return rowData;
}
}

Map<LogicalTypeRoot, Types.MinorType> FLINK_AND_ARROW_TYPE_MAPPINGS = new HashMap<LogicalTypeRoot, Types.MinorType>() {{
Map<LogicalTypeRoot, Types.MinorType> FLINK_AND_ARROW_TYPE_MAPPING = new HashMap<LogicalTypeRoot, Types.MinorType>() {{
put(LogicalTypeRoot.BOOLEAN, Types.MinorType.BIT);
put(LogicalTypeRoot.TINYINT, Types.MinorType.TINYINT);
put(LogicalTypeRoot.SMALLINT, Types.MinorType.SMALLINT);
Expand All @@ -236,7 +375,7 @@ public Object convert(Object arrowData) {
}};

static void checkTypeCompatible(LogicalType flinkType, Field field) {
Types.MinorType expectMinorType = FLINK_AND_ARROW_TYPE_MAPPINGS.get(flinkType.getTypeRoot());
Types.MinorType expectMinorType = FLINK_AND_ARROW_TYPE_MAPPING.get(flinkType.getTypeRoot());
if (expectMinorType == null) {
throw new UnsupportedOperationException(String.format(
"Flink type %s is not supported, and arrow type is %s",
Expand All @@ -258,14 +397,20 @@ static ArrowFieldConverter createConverter(LogicalType flinkType, Field arrowFie
case BOOLEAN:
return new BooleanConverter(flinkType.isNullable());
case TINYINT:
return new TinyIntConverter(flinkType.isNullable());
case SMALLINT:
return new SmallIntConverter(flinkType.isNullable());
case INTEGER:
return new IntConverter(flinkType.isNullable());
case BIGINT:
return new BigIntConverter(flinkType.isNullable());
case FLOAT:
return new FloatConverter(flinkType.isNullable());
case DOUBLE:
return new NumericConverter(flinkType.isNullable());
return new DoubleConverter(flinkType.isNullable());
case DECIMAL:
return new DecimalConverter(flinkType.isNullable());
DecimalType type = (DecimalType) flinkType;
return new DecimalConverter(type.getPrecision(), type.getScale(), flinkType.isNullable());
case DATE:
return new DateConverter(flinkType.isNullable());
case TIMESTAMP_WITHOUT_TIME_ZONE:
Expand Down
Loading

0 comments on commit 5ca71a0

Please sign in to comment.