Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support field filtering of complex union type in ORC #150

Merged
merged 4 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public TypeDescription type() {
public static final String ICEBERG_LONG_TYPE_ATTRIBUTE = "iceberg.long-type";
static final String ICEBERG_FIELD_LENGTH = "iceberg.length";

public static final String ICEBERG_UNION_TAG_FIELD_NAME = "tag";
private static final ImmutableMultimap<Type.TypeID, TypeDescription.Category> TYPE_MAPPING =
ImmutableMultimap.<Type.TypeID, TypeDescription.Category>builder()
.put(Type.TypeID.BOOLEAN, TypeDescription.Category.BOOLEAN)
Expand Down Expand Up @@ -381,9 +382,9 @@ private static TypeDescription buildOrcProjectForStructType(Integer fieldId, Typ

private static TypeDescription getOrcSchemaForUnionType(Type type, boolean isRequired, Map<Integer, OrcField> mapping,
OrcField orcField) {
TypeDescription orcType;

if (orcField.type.getChildren().size() == 1) { // single type union
orcType = TypeDescription.createUnion();
TypeDescription orcType = TypeDescription.createUnion();

TypeDescription childOrcStructType = TypeDescription.createStruct();
for (Types.NestedField nestedField : type.asStructType().fields()) {
Expand All @@ -399,13 +400,33 @@ private static TypeDescription getOrcSchemaForUnionType(Type type, boolean isReq
}

orcType.addUnionChild(childOrcStructType);
return orcType;
} else { // complex union
orcType = TypeDescription.createUnion();
List<Types.NestedField> nestedFields = type.asStructType().fields();
for (Types.NestedField nestedField : nestedFields.subList(1, nestedFields.size())) {
TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
isRequired && nestedField.isRequired(), mapping);
orcType.addUnionChild(childType);
return getOrcSchemaForComplexUnionType(type, isRequired, mapping, orcField);
}
}

private static TypeDescription getOrcSchemaForComplexUnionType(Type type, boolean isRequired,
Map<Integer, OrcField> mapping,
OrcField orcField) {
TypeDescription orcType = TypeDescription.createUnion();
List<Types.NestedField> nestedFields = type.asStructType().fields();
yiqiangin marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < orcField.type.getChildren().size(); ++i) {
TypeDescription childOrcType = orcField.type.getChildren().get(i);
boolean typeProjectedInIcebergSchema = false;
for (Types.NestedField nestedField : nestedFields) {
if (!nestedField.name().equals(ICEBERG_UNION_TAG_FIELD_NAME) &&
Integer.parseInt(nestedField.name().substring(5)) == i) {
yiqiangin marked this conversation as resolved.
Show resolved Hide resolved
// child type is projected in Iceberg schema
TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(),
isRequired && nestedField.isRequired(), mapping);
orcType.addUnionChild(childType);
typeProjectedInIcebergSchema = true;
break;
}
}
if (!typeProjectedInIcebergSchema) {
orcType.addUnionChild(childOrcType);
}
}
return orcType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
package org.apache.iceberg.orc;

import java.util.List;
import java.util.Optional;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.TypeDescription;


public abstract class OrcSchemaWithTypeVisitor<T> {

public static <T> T visit(
org.apache.iceberg.Schema iSchema, TypeDescription schema, OrcSchemaWithTypeVisitor<T> visitor) {
return visit(iSchema.asStruct(), schema, visitor);
Expand Down Expand Up @@ -78,14 +79,73 @@ protected T visitUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisito
if (types.size() == 1) { // single type union
options.add(visit(type, types.get(0), visitor));
} else { // complex union
for (int i = 0; i < types.size(); i += 1) {
options.add(visit(type.asStructType().fields().get(i + 1).type(), types.get(i), visitor));
}
visitComplexUnion(type, union, visitor, options);
}

return visitor.union(type, union, options);
}

  /*
   * A complex union with multiple types in the ORC schema is converted into a struct with multiple
   * fields in the Iceberg schema, and an extra "tag" field is added to that struct during the
   * conversion.
   * Example:
   *   ORC schema:     {"name":"unionCol","type":["int","string"]}
   *   Iceberg schema: struct<0: tag: required int, 1: field0: optional int, 2: field1: optional string>
   * The fields in the Iceberg struct are stored in the same order as the corresponding types in
   * the ORC union. Apart from the tag field, the struct fields normally match the ORC union types
   * one-to-one. Under field projection, however, the Iceberg struct contains only the projected
   * fields, i.e. a subset of the ORC union's types. This method visits the complex union handling
   * both cases.
   */
  private <T> void visitComplexUnion(Type type, TypeDescription union, OrcSchemaWithTypeVisitor<T> visitor,
                                     List<T> options) {
    int typeIndex = 0;
    int fieldIndexInStruct = 0;
    while (typeIndex < union.getChildren().size()) {
      TypeDescription schema = union.getChildren().get(typeIndex);
      boolean relatedFieldInStructFound = false;
      Types.StructType struct = type.asStructType();
      // Skip the synthetic "tag" field if projected; it has no counterpart in the ORC union.
      if (fieldIndexInStruct < struct.fields().size() &&
          ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME
              .equals(struct.fields().get(fieldIndexInStruct).name())) {
        fieldIndexInStruct++;
      }

      if (fieldIndexInStruct < struct.fields().size()) {
        String structFieldName = type.asStructType().fields().get(fieldIndexInStruct).name();
        // Projected branch fields are named "fieldN"; substring(5) strips the "field" prefix,
        // leaving N, the branch index within the ORC union.
        int indexFromStructFieldName = Integer.parseInt(structFieldName.substring(5));
        if (typeIndex == indexFromStructFieldName) {
          relatedFieldInStructFound = true;
          T option = visit(type.asStructType().fields().get(fieldIndexInStruct).type(), schema, visitor);
          options.add(option);
          fieldIndexInStruct++;
        }
      }
      if (!relatedFieldInStructFound) {
        // The ORC branch at typeIndex was not projected into the Iceberg schema;
        // still emit an option for it so readers line up with the union's branches.
        visitNotProjectedTypeInComplexUnion(schema, visitor, options, typeIndex);
      }
      typeIndex++;
    }
  }

// If a field is not projected, a corresponding field in the struct of Iceberg schema cannot be found
// for current type of union in Orc schema, a reader for current type still needs to be created and
// used to make the reading of Orc file successfully. In this case, a pseudo Iceberg type is converted from
// the Orc schema and is used to create the option for the reader of the current type which still can
// read the corresponding content in Orc file successfully.
private static <T> void visitNotProjectedTypeInComplexUnion(TypeDescription schema,
OrcSchemaWithTypeVisitor<T> visitor,
List<T> options,
int typeIndex) {
OrcToIcebergVisitor schemaConverter = new OrcToIcebergVisitor();
schemaConverter.beforeField("field" + typeIndex, schema);
schema.setAttribute(org.apache.iceberg.orc.ORCSchemaUtil.ICEBERG_ID_ATTRIBUTE, "-1");
yiqiangin marked this conversation as resolved.
Show resolved Hide resolved
Optional<Types.NestedField> icebergSchema = OrcToIcebergVisitor.visit(schema, schemaConverter);
schemaConverter.afterField("field" + typeIndex, schema);
options.add(visit(icebergSchema.get().type(), schema, visitor));
}

  // Default visitor callback for ORC struct/record types; subclasses override to build a result.
  public T record(Types.StructType iStruct, TypeDescription record, List<String> names, List<T> fields) {
    return null;
  }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public OrcValueReader<?> record(

  @Override
  public OrcValueReader<?> union(Type expected, TypeDescription union, List<OrcValueReader<?>> options) {
    // Pass the expected (projected) Iceberg type so the reader can map union branches
    // to the projected struct fields.
    return SparkOrcValueReaders.union(options, expected);
  }

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
package org.apache.iceberg.spark.data;

import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.orc.OrcValueReader;
import org.apache.iceberg.orc.OrcValueReaders;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
Expand Down Expand Up @@ -71,8 +74,8 @@ static OrcValueReader<?> struct(
return new StructReader(readers, struct, idToConstant);
}

static OrcValueReader<?> union(List<OrcValueReader<?>> readers) {
return new UnionReader(readers);
static OrcValueReader<?> union(List<OrcValueReader<?>> readers, Type expected) {
return new UnionReader(readers, expected);
}

static OrcValueReader<?> array(OrcValueReader<?> elementReader) {
Expand Down Expand Up @@ -166,12 +169,39 @@ protected void set(InternalRow struct, int pos, Object value) {

static class UnionReader implements OrcValueReader<Object> {
private final OrcValueReader[] readers;
private final Type expected;
yiqiangin marked this conversation as resolved.
Show resolved Hide resolved
private int[] projectedFieldIdsToIdxInReturnedRow;
private boolean isTagFieldProjected;
private int numOfFieldsInReturnedRow;

private UnionReader(List<OrcValueReader<?>> readers) {
private UnionReader(List<OrcValueReader<?>> readers, Type expected) {
this.readers = new OrcValueReader[readers.size()];
for (int i = 0; i < this.readers.length; i += 1) {
this.readers[i] = readers.get(i);
}
this.expected = expected;

if (this.readers.length > 1) {
// Creating an integer array to track the mapping between the index of fields to be projected
// and the index of the value for the field stored in the returned row,
// if the value for a field equals to -1, it means the value of this field should not be stored
yiqiangin marked this conversation as resolved.
Show resolved Hide resolved
// in the returned row
this.projectedFieldIdsToIdxInReturnedRow = new int[readers.size()];
Arrays.fill(this.projectedFieldIdsToIdxInReturnedRow, -1);
this.numOfFieldsInReturnedRow = 0;
this.isTagFieldProjected = false;

for (Types.NestedField expectedStructField : expected.asStructType().fields()) {
String fieldName = expectedStructField.name();
if (fieldName.equals(ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME)) {
this.isTagFieldProjected = true;
this.numOfFieldsInReturnedRow++;
continue;
}
int projectedFieldIndex = Integer.valueOf(fieldName.substring(5));
this.projectedFieldIdsToIdxInReturnedRow[projectedFieldIndex] = this.numOfFieldsInReturnedRow++;
}
}
}

@Override
Expand All @@ -183,12 +213,17 @@ public Object nonNullRead(ColumnVector vector, int row) {
if (readers.length == 1) {
return value;
} else {
InternalRow struct = new GenericInternalRow(readers.length + 1);
for (int i = 0; i < readers.length; i += 1) {
struct.setNullAt(i + 1);
InternalRow struct = new GenericInternalRow(numOfFieldsInReturnedRow);
for (int i = 0; i < struct.numFields(); i += 1) {
struct.setNullAt(i);
}
if (this.isTagFieldProjected) {
struct.update(0, fieldIndex);
}

if (this.projectedFieldIdsToIdxInReturnedRow[fieldIndex] != -1) {
struct.update(this.projectedFieldIdsToIdxInReturnedRow[fieldIndex], value);
}
struct.update(0, fieldIndex);
struct.update(fieldIndex + 1, value);

return struct;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.IntStream;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.orc.ORCSchemaUtil;
import org.apache.iceberg.orc.OrcBatchReader;
import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor;
import org.apache.iceberg.orc.OrcValueReader;
Expand Down Expand Up @@ -434,10 +435,19 @@ public ColumnVector getChild(int ordinal) {
private static class UnionConverter implements Converter {
private final Type type;
private final List<Converter> optionConverters;
private boolean isTagFieldProjected;

private UnionConverter(Type type, List<Converter> optionConverters) {
this.type = type;
this.optionConverters = optionConverters;
if (optionConverters.size() > 1) {
for (Types.NestedField field : type.asStructType().fields()) {
if (field.name().equals(ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME)) {
this.isTagFieldProjected = true;
break;
}
}
}
}

@Override
Expand All @@ -449,13 +459,23 @@ public ColumnVector convert(org.apache.orc.storage.ql.exec.vector.ColumnVector v
List<Types.NestedField> fields = type.asStructType().fields();
List<ColumnVector> fieldVectors = Lists.newArrayListWithExpectedSize(fields.size());

LongColumnVector longColumnVector = new LongColumnVector();
longColumnVector.vector = Arrays.stream(unionColumnVector.tags).asLongStream().toArray();
// Adding ColumnVector for tag field into fieldVectors when the tag field is projected in Iceberg schema
if (isTagFieldProjected) {
LongColumnVector longColumnVector = new LongColumnVector();
longColumnVector.vector = Arrays.stream(unionColumnVector.tags).asLongStream().toArray();

fieldVectors.add(new PrimitiveOrcColumnVector(Types.IntegerType.get(), batchSize, longColumnVector,
OrcValueReaders.ints(), batchOffsetInFile));
for (int i = 0; i < fields.size() - 1; i += 1) {
fieldVectors.add(optionConverters.get(i).convert(unionColumnVector.fields[i], batchSize, batchOffsetInFile));
fieldVectors.add(new PrimitiveOrcColumnVector(Types.IntegerType.get(), batchSize, longColumnVector,
OrcValueReaders.ints(), batchOffsetInFile));
}

// Adding ColumnVector for each field projected in Iceberg schema into fieldVectors
for (int i = 0; i < fields.size(); ++i) {
Types.NestedField field = fields.get(i);
if (!field.name().equals(ORCSchemaUtil.ICEBERG_UNION_TAG_FIELD_NAME)) {
int typeIdx = Integer.parseInt(field.name().substring(5));
fieldVectors.add(optionConverters.get(typeIdx)
.convert(unionColumnVector.fields[typeIdx], batchSize, batchOffsetInFile));
}
}

return new BaseOrcColumnVector(type.asStructType(), batchSize, vector) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,82 @@ public void testComplexUnion() throws IOException {
}
}

  // Verifies that projecting a single branch (field0) out of a two-branch ORC union
  // (uniontype<int,string>) works for both the row-by-row and the vectorized Spark readers.
  @Test
  public void testComplexUnionWithColumnProjection() throws IOException {
    TypeDescription orcSchema =
        TypeDescription.fromString("struct<unionCol:uniontype<int,string>>");

    // Projected Iceberg schema keeps only the int branch; the tag field is not projected.
    Schema expectedSchema = new Schema(
        Types.NestedField.optional(0, "unionCol", Types.StructType.of(
            Types.NestedField.optional(1, "field0", Types.IntegerType.get())))
    );

    // Row 0 carries the int branch (value 0); row 1 carries the string branch,
    // so its projected int field reads back as null.
    final InternalRow expectedFirstRow = new GenericInternalRow(1);
    final InternalRow field1 = new GenericInternalRow(1);
    field1.update(0, 0);
    expectedFirstRow.update(0, field1);

    final InternalRow expectedSecondRow = new GenericInternalRow(1);
    final InternalRow field2 = new GenericInternalRow(1);
    field2.update(0, null);
    expectedSecondRow.update(0, field2);

    Configuration conf = new Configuration();

    File orcFile = temp.newFile();
    Path orcFilePath = new Path(orcFile.getPath());

    Writer writer = OrcFile.createWriter(orcFilePath,
        OrcFile.writerOptions(conf)
            .setSchema(orcSchema).overwrite(true));

    VectorizedRowBatch batch = orcSchema.createRowBatch();
    LongColumnVector longColumnVector = new LongColumnVector(NUM_OF_ROWS);
    BytesColumnVector bytesColumnVector = new BytesColumnVector(NUM_OF_ROWS);
    UnionColumnVector complexUnion = new UnionColumnVector(NUM_OF_ROWS, longColumnVector, bytesColumnVector);

    complexUnion.init();

    // Alternate branches per row: even rows take the int branch (tag 0),
    // odd rows the string branch (tag 1).
    for (int i = 0; i < NUM_OF_ROWS; i += 1) {
      complexUnion.tags[i] = i % 2;
      longColumnVector.vector[i] = i;
      String stringValue = "foo-" + i;
      bytesColumnVector.setVal(i, stringValue.getBytes(StandardCharsets.UTF_8));
    }

    batch.size = NUM_OF_ROWS;
    batch.cols[0] = complexUnion;

    writer.addRowBatch(batch);
    batch.reset();
    writer.close();

    // Test non-vectorized reader
    List<InternalRow> actualRows = Lists.newArrayList();
    try (CloseableIterable<InternalRow> reader = ORC.read(Files.localInput(orcFile))
        .project(expectedSchema)
        .createReaderFunc(readOrcSchema -> new SparkOrcReader(expectedSchema, readOrcSchema))
        .build()) {
      reader.forEach(actualRows::add);

      Assert.assertEquals(actualRows.size(), NUM_OF_ROWS);
      assertEquals(expectedSchema, expectedFirstRow, actualRows.get(0));
      assertEquals(expectedSchema, expectedSecondRow, actualRows.get(1));
    }

    // Test vectorized reader
    try (CloseableIterable<ColumnarBatch> reader = ORC.read(Files.localInput(orcFile))
        .project(expectedSchema)
        .createBatchedReaderFunc(readOrcSchema ->
            VectorizedSparkOrcReaders.buildReader(expectedSchema, readOrcSchema, ImmutableMap.of()))
        .build()) {
      final Iterator<InternalRow> actualRowsIt = batchesToRows(reader.iterator());

      assertEquals(expectedSchema, expectedFirstRow, actualRowsIt.next());
      assertEquals(expectedSchema, expectedSecondRow, actualRowsIt.next());
    }
  }

@Test
public void testDeeplyNestedUnion() throws IOException {
TypeDescription orcSchema =
Expand Down
Loading