From 706500c09200d0e447e3cc75945abe57378edc2d Mon Sep 17 00:00:00 2001 From: PengFei Li Date: Mon, 25 Mar 2024 00:52:04 +0800 Subject: [PATCH] Support map type Signed-off-by: PengFei Li --- .../flink/row/source/ArrowFieldConverter.java | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/src/main/java/com/starrocks/connector/flink/row/source/ArrowFieldConverter.java b/src/main/java/com/starrocks/connector/flink/row/source/ArrowFieldConverter.java index 20c91df3..464b08fa 100644 --- a/src/main/java/com/starrocks/connector/flink/row/source/ArrowFieldConverter.java +++ b/src/main/java/com/starrocks/connector/flink/row/source/ArrowFieldConverter.java @@ -22,6 +22,7 @@ import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericArrayData; +import org.apache.flink.table.data.GenericMapData; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; @@ -29,6 +30,7 @@ import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.util.Preconditions; @@ -44,6 +46,7 @@ import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.Field; @@ -355,6 +358,41 @@ public Object convert(FieldVector vector, int rowIndex) { } } + // Convert from arrow map to flink map + class MapConverter implements ArrowFieldConverter { + + private final boolean isNullable; + private final ArrowFieldConverter keyConverter; + private final ArrowFieldConverter valueConverter; + + public MapConverter(boolean isNullable, ArrowFieldConverter keyConverter, ArrowFieldConverter valueConverter) { + this.isNullable = isNullable; + this.keyConverter = keyConverter; + this.valueConverter = valueConverter; + } + + @Override + public Object convert(FieldVector vector, int rowIndex) { + MapVector mapVector = (MapVector) vector; + if (mapVector.isNull(rowIndex)) { + checkNullable(isNullable, null); + return null; + } + + Map map = new HashMap<>(); + StructVector structVector = (StructVector) mapVector.getDataVector(); + int index = rowIndex * ListVector.OFFSET_WIDTH; + int start = mapVector.getOffsetBuffer().getInt(index); + int end = mapVector.getOffsetBuffer().getInt(index + ListVector.OFFSET_WIDTH); + for (int i = start; i < end; i++) { + Object key = keyConverter.convert((FieldVector) structVector.getVectorById(0), i); + Object value = valueConverter.convert((FieldVector) structVector.getVectorById(1), i); + map.put(key, value); + } + return new GenericMapData(map); + } + } + Map FLINK_AND_ARROW_TYPE_MAPPING = new HashMap() {{ put(LogicalTypeRoot.BOOLEAN, Types.MinorType.BIT); put(LogicalTypeRoot.TINYINT, Types.MinorType.TINYINT); @@ -372,6 +410,7 @@ public Object convert(FieldVector vector, int rowIndex) { put(LogicalTypeRoot.VARCHAR, Types.MinorType.VARCHAR); put(LogicalTypeRoot.ARRAY, Types.MinorType.LIST); put(LogicalTypeRoot.ROW, Types.MinorType.STRUCT); + put(LogicalTypeRoot.MAP, Types.MinorType.MAP); }}; static void checkTypeCompatible(LogicalType flinkType, Field field) { @@ -431,6 +470,14 @@ static ArrowFieldConverter createConverter(LogicalType flinkType, Field arrowFie fieldConverters.add(createConverter(rowType.getTypeAt(i), arrowField.getChildren().get(i))); } return new StructConverter(flinkType.isNullable(), fieldConverters); + case MAP: + MapType mapType = (MapType) flinkType; + Field structField = arrowField.getChildren().get(0); + ArrowFieldConverter keyConverter = createConverter( + mapType.getKeyType(), structField.getChildren().get(0)); + ArrowFieldConverter valueConverter = createConverter( + mapType.getKeyType(), structField.getChildren().get(1)); + return new MapConverter(mapType.isNullable(), keyConverter, valueConverter); default: throw new UnsupportedOperationException("Unsupported type " + flinkType); }