Skip to content

Commit

Permalink
Support map type
Browse files Browse the repository at this point in the history
Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Mar 24, 2024
1 parent 5ca71a0 commit 706500c
Showing 1 changed file with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@

import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

Expand All @@ -44,6 +46,7 @@
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.Field;
Expand Down Expand Up @@ -355,6 +358,41 @@ public Object convert(FieldVector vector, int rowIndex) {
}
}

// Convert from arrow map to flink map
class MapConverter implements ArrowFieldConverter {

private final boolean isNullable;
private final ArrowFieldConverter keyConverter;
private final ArrowFieldConverter valueConverter;

public MapConverter(boolean isNullable, ArrowFieldConverter keyConverter, ArrowFieldConverter valueConverter) {
this.isNullable = isNullable;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
}

@Override
public Object convert(FieldVector vector, int rowIndex) {
MapVector mapVector = (MapVector) vector;
if (mapVector.isNull(rowIndex)) {
checkNullable(isNullable, null);
return null;
}

Map<Object, Object> map = new HashMap<>();
StructVector structVector = (StructVector) mapVector.getDataVector();
int index = rowIndex * ListVector.OFFSET_WIDTH;
int start = mapVector.getOffsetBuffer().getInt(index);
int end = mapVector.getOffsetBuffer().getInt(index + ListVector.OFFSET_WIDTH);
for (int i = start; i < end; i++) {
Object key = keyConverter.convert((FieldVector) structVector.getVectorById(0), i);
Object value = valueConverter.convert((FieldVector) structVector.getVectorById(1), i);
map.put(key, value);
}
return new GenericMapData(map);
}
}

Map<LogicalTypeRoot, Types.MinorType> FLINK_AND_ARROW_TYPE_MAPPING = new HashMap<LogicalTypeRoot, Types.MinorType>() {{
put(LogicalTypeRoot.BOOLEAN, Types.MinorType.BIT);
put(LogicalTypeRoot.TINYINT, Types.MinorType.TINYINT);
Expand All @@ -372,6 +410,7 @@ public Object convert(FieldVector vector, int rowIndex) {
put(LogicalTypeRoot.VARCHAR, Types.MinorType.VARCHAR);
put(LogicalTypeRoot.ARRAY, Types.MinorType.LIST);
put(LogicalTypeRoot.ROW, Types.MinorType.STRUCT);
put(LogicalTypeRoot.MAP, Types.MinorType.MAP);
}};

static void checkTypeCompatible(LogicalType flinkType, Field field) {
Expand Down Expand Up @@ -431,6 +470,14 @@ static ArrowFieldConverter createConverter(LogicalType flinkType, Field arrowFie
fieldConverters.add(createConverter(rowType.getTypeAt(i), arrowField.getChildren().get(i)));
}
return new StructConverter(flinkType.isNullable(), fieldConverters);
case MAP:
MapType mapType = (MapType) flinkType;
Field structField = arrowField.getChildren().get(0);
ArrowFieldConverter keyConverter = createConverter(
mapType.getKeyType(), structField.getChildren().get(0));
ArrowFieldConverter valueConverter = createConverter(
mapType.getKeyType(), structField.getChildren().get(1));
return new MapConverter(mapType.isNullable(), keyConverter, valueConverter);
default:
throw new UnsupportedOperationException("Unsupported type " + flinkType);
}
Expand Down

0 comments on commit 706500c

Please sign in to comment.