Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support to field filtering of complex union type in ORC #150

Merged
merged 4 commits into from
Oct 13, 2023

Conversation

yiqiangin
Copy link
Contributor

@yiqiangin yiqiangin commented Oct 6, 2023

Problem
Currently the query on the whole column of complex union type works in case that the data are stored in ORC file, (e.g. the query like select c1 from tbl where c1 is a type of complex union, however, field filtering on the column of complex union in the query does not work (e.g the query like select c1.field0 from tbl where field0 is one type in the complex union). The root cause of the problem is as follows:
Two types of schema are used in Iceberg to read the data from ORC file per the columns defined in the user query: one is table schema from table metastore and used by the user query which is defined in Iceberg format, namely Iceberg schema; the other is file schema which is defined in ORC file and data are stored complying with this schema, namely ORC schema. As Iceberg does not support union type natively, complex union is defined as a struct type in Iceberg schema. The current code assumes that the types inside a union from ORC schema should match the type fields inside a union struct from Iceberg schema when ORC union reader is created.
In case of field filtering of union type, the current code only prune the schema of union in Iceberg schema with the projected fields, while the union of ORC schema still contains all the types. It results in the mismatch between ORC schema and Iceberg schema for the union in this case.
However, as all the contents of each data type in a union in Orc file should be read by Orc readers correctly no matter this data type is projected or not based on the decoding procedure of Orc file, all the types in a union from Orc schema are needed to create the corresponding type readers in OrcUnionReader even in case of column projection. Therefore the union in Orc schema cannot be pruned like what is done to the union struct in Iceberg schema.

Solution
Assuming there are N types in a union, there are N+1 fields including "tag" field in the struct corresponding to the union in Iceberg schema. The user can project any K fields (K>=1 and K<=N+1 and including the tag field) of the union in a query. The case of without column projection equals to full fields projection namely K=N+1. Therefore the solution does not differentiate the cases of with and without column projections.
In addition, the order of the types in a union in Iceberg schema can be identified from its field name like "field0".."fieldK". K is the index which can be used to match the order of the types in the union of Orc schema.

In the code of create the readers of all types in the union of Orc schema (namely OrcSchemaWithTypeVisitor.visitUnion), checking the fields of the struct corresponding union in Iceberg schema to create a map between the order index and the field type in Iceberg schema. When iterating through all the types in Orc schema, using the order index to check if the corresponding type exists in the map, if yes which means the field is projected, creating the option of creating the reader with the type in Iceberg schema, otherwise, creating the option with a pseudo Iceberg type converted from Orc schema.

In the code of OrcUnionReader, Iceberg schema needs to be passed into it. The fields of the returned row should be constructed based on the fields in Iceberg schema not the types in Orc schema. If tag field is projected, one more field is added in the beginning of the row and updated with the index of the field in Orc file.

Test
All the test cases in TestSparkOrcUnions.java with a new test case testComplexUnionWithColumnProjection

Manual test with Spark3 with all the following queries on a table with a union:

val df = spark.sql("select c1.field0 from u_yiqding.orc_union_table_test")

val df = spark.sql("select c1.field0,c1.field1 from u_yiqding.orc_union_table_test")

val df = spark.sql("select c1.tag,c1.field0 from u_yiqding.orc_union_table_test")

val df = spark.sql("select c1.tag,c1.field0,c1.field1 from u_yiqding.orc_union_table_test")

val df = spark.sql("select c1.field1,c1.field0,c1.tag from u_yiqding.orc_union_table_test")

val df = spark.sql("select c1.field1,c1.field0 from u_yiqding.orc_union_table_test")

The schema of the table u_yiqding.orc_union_table_test is as follows:

# col_name            	data_type           	comment

c1                  	uniontype<int,string>

@aastha25
Copy link

@yiqiangin thanks for the PR.
Could you add some more details in the description and examples in the description

(1) what is currently supported prior to this PR, for example - I think only spark.sql("select c1 from u_yiqding.orc_union_table_test")? does this mean reads only work if the ORC file has values for all the types inside the uniontype? Also, please add the schema of u_yiqding.orc_union_table_test in the description.

(2) "types inside a union from Orc schema should match the type fields inside a union struct from Iceberg schema when Orc union reader is created."
the latter is constructed from the ORC data files and the former is constructed using ORC file schema / catalog schema?

@yiqiangin
Copy link
Contributor Author

(1) what is currently supported prior to this PR, for example - I think only spark.sql("select c1 from u_yiqding.orc_union_table_test")? does this mean reads only work if the ORC file has values for all the types inside the uniontype? Also, please add the schema of u_yiqding.orc_union_table_test in the description.

Yes, currently it only works querying the whole column of complex union like `spark.sql("select c1 from u_yiqding.orc_union_table_test"). It does not require ORC file has values for all the types in the union. For each row, the column of complex union could only have value for one type. The returned result of the query has null value for the type which does not have valid value in the row. I have added this explaination in the description.

(2) "types inside a union from Orc schema should match the type fields inside a union struct from Iceberg schema when Orc union reader is created." the latter is constructed from the ORC data files and the former is constructed using ORC file schema / catalog schema?

Two types of schema are used in Iceberg to read the data from ORC file per the columns defined in the user query: one is table schema from table metastore and used by the user query which is defined in Iceberg format, namely Iceberg schema; the other is file schema which is defined in ORC file and data are stored complying with this schema, namely ORC schema. As Iceberg does not support union type natively, complex union is defined as a struct type in Iceberg schema.
This explanation has been added in the description as well.

Copy link

@aastha25 aastha25 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on the PR @yiqiangin . I have left minor doc comments.

@yiqiangin yiqiangin merged commit 563e7d9 into linkedin:li-0.11.x Oct 13, 2023
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants