PARQUET-1381: Support merging of rowgroups during file rewrite #1121
base: master
Conversation
Taking forward a PR that had remained inactive. Original PR: #775
I simply did an initial review.
@@ -72,6 +72,18 @@ public class RewriteCommand extends BaseCommand {
        required = false)
    String codec;

    @Parameter(
        names = {"-m", "--merge-rowgroups"},
        description = "<true/false>",
Could you please add a brief description?
Done
    @Parameter(
        names = {"-s", "--max-rowgroup-size"},
        description = "<max size of the merged rowgroups>",
It would be good to say in the description that it is used together with --merge-rowgroups=true.
Done
    builder.enableRowGroupMerge();
    builder.maxRowGroupSize(maxRowGroupSize);
What about using a single function, like builder.mergeRowGroups(maxRowGroupSize)?
I have made changes as per the comment. I'm fine either way.
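The reviewer's suggestion can be sketched as a builder whose single method both enables the merge and sets the size, so the two options cannot drift apart. This is a minimal illustrative sketch, not the actual parquet-java RewriteOptions API; all names here are assumptions.

```java
// Illustrative sketch of the suggested consolidated builder method.
// MergeOptions and its fields are hypothetical stand-ins, not real parquet-java classes.
public class MergeOptions {
    private final boolean mergeRowGroups;
    private final long maxRowGroupSize;

    private MergeOptions(boolean merge, long maxSize) {
        this.mergeRowGroups = merge;
        this.maxRowGroupSize = maxSize;
    }

    public boolean isMergeEnabled() { return mergeRowGroups; }
    public long getMaxRowGroupSize() { return maxRowGroupSize; }

    public static class Builder {
        private boolean mergeRowGroups = false;
        private long maxRowGroupSize = 128L * 1024 * 1024; // assumed default of 128 MB

        // Single entry point: enabling the merge and setting the target size
        // together, instead of two separate builder calls.
        public Builder mergeRowGroups(long maxRowGroupSize) {
            this.mergeRowGroups = true;
            this.maxRowGroupSize = maxRowGroupSize;
            return this;
        }

        public MergeOptions build() {
            return new MergeOptions(mergeRowGroups, maxRowGroupSize);
        }
    }
}
```

With this shape, a caller who never invokes mergeRowGroups gets merging disabled by construction, which is the invariant the single-function suggestion buys.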
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.*;
Please do not use star imports.
Corrected
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;

public class RowGroupMerger {
- public class RowGroupMerger {
+ class RowGroupMerger {

It would be good not to make it public for now.
Probably you need to relocate it into the rewrite package.
Done
      initNextReader();
    }
    while (reader != null);
    new RowGroupMerger(schema, newCodecName, v2EncodingHint).merge(readers, maxRowGroupSize, writer);
I didn't review it in depth. Does it handle encryption or masking properties internally?
Yes. Underneath, it uses the same instance of ParquetFileWriter which handles these operations.
This is a nice feature @MaheshGPai. I've been wondering about similar features too; thanks for your work.
By the way, do you have any performance numbers comparing this with rewriting by query engines such as Spark/Hive?
    List<ParquetFileReader> readers = new ArrayList<>();
    do {
      readers.add(reader);
      initNextReader();
Looks like v2EncodingHint only checks the first parquet file. Should all the files be checked?
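The check the reviewer asks for could be sketched like this: derive the hint from every input file rather than only the first one. This is an illustrative sketch; the boolean-per-file input is a stand-in for whatever per-file page-version metadata the rewriter actually inspects.

```java
import java.util.List;

// Sketch: decide the v2 encoding hint across ALL input files.
// "fileUsesV2Pages" is a hypothetical stand-in for inspecting each file's metadata.
public class EncodingHint {
    public static boolean v2EncodingHint(List<Boolean> fileUsesV2Pages) {
        // Only hint v2 encodings when every input file uses v2 pages;
        // a single v1 file forces the conservative v1 path.
        return !fileUsesV2Pages.isEmpty()
            && fileUsesV2Pages.stream().allMatch(Boolean::booleanValue);
    }
}
```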
    DictionaryPage dictPage = columnReader.readDictionaryPage();
    Dictionary decodedDictionary = null;
    if (dictPage != null) {
      decodedDictionary = dictPage.getEncoding().initDictionary(column.getColumnDesc(), dictPage);
    }
If I understand the page-encoding process correctly: Parquet tries to use dictionary encoding by default. If the dictionary grows too big, whether in size or in the number of distinct values, the encoding falls back to plain encoding. The check and fallback logic happen when the first page is emitted.
So when we are merging multiple column chunks from different row groups, if the first column chunk is dictionary encoded and the others are not (because they fell back to plain encoding), we should deliberately disable dictionary encoding for that column to avoid introducing overhead.
The current logic doesn't handle that; it will use dictionary encoding whenever the column chunk in the first row group to be merged uses dictionary encoding.
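The rule the reviewer describes can be sketched as: keep dictionary encoding for the merged column only if every source chunk is dictionary-encoded. This is a hypothetical helper, not code from the PR; the boolean-per-chunk input stands in for inspecting each ColumnChunkMetaData's encodings.

```java
import java.util.List;

// Sketch of the reviewer's suggested rule for merged column chunks.
// "chunkIsDictionaryEncoded" is an illustrative stand-in for real chunk metadata.
public class DictionaryDecision {
    public static boolean useDictionaryForMergedColumn(List<Boolean> chunkIsDictionaryEncoded) {
        // If any source chunk already fell back to plain encoding, the merged
        // dictionary would almost certainly overflow too, so disable it up front
        // instead of paying for a doomed dictionary-build attempt.
        return !chunkIsDictionaryEncoded.isEmpty()
            && chunkIsDictionaryEncoded.stream().allMatch(b -> b);
    }
}
```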
    if (mergedBlock == null && estimator.estimate(blockMeta) > maxRowGroupSize) {
      // save it directly without re-encoding it
      saveBlockTo(ReadOnlyMergedBlock.of(blockMeta, group, schema, compressor), writer);
I checked the related code; it seems that startColumn and endColumn don't maintain the bloom filter.
It might be hard to maintain bloom filters when merging multiple row groups, but it should be possible and easy to maintain the bloom filter for a single row group. See ParquetWriter#L337 for the related code.
I agree, so it might be good to integrate this with ParquetRewriter if one row group does not need to be merged.
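The split being discussed can be sketched as a per-block planning decision: blocks copied verbatim keep their bloom filters and column indexes intact, while re-encoded merged blocks would need the filters rebuilt or dropped. All names below are illustrative, not the PR's actual API.

```java
// Sketch of the copy-vs-merge decision implied by the code above.
// BlockPlan and its names are hypothetical stand-ins.
public class BlockPlan {
    public enum Action { COPY_AS_IS, MERGE_REENCODE }

    public static Action planBlock(long estimatedBlockSize, long maxRowGroupSize) {
        // A block already above the target size is copied directly, which is
        // the path where existing bloom filters could be preserved untouched.
        return estimatedBlockSize > maxRowGroupSize ? Action.COPY_AS_IS : Action.MERGE_REENCODE;
    }
}
```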
Thanks for the quick update!
I know this PR comes from another PR that was created long before ParquetRewriter was implemented. However, my main concern is that the current implementation of RowGroupMerger diverges from ParquetRewriter, which makes it difficult to maintain in the future. For example, RowGroupMerger does not seem to support column masking (nullifying column values) when RewriterOptions requests it. And it has a duplicate implementation (i.e. ReadOnlyMergedBlock) for the case where a row group does not need to be merged, which ParquetRewriter already supports. Could you consider consolidating these implementations? Otherwise it will not be easy to add more features to the rewriter.
    @Override
    public DataPage visit(DataPageV1 pageV1) {

      return new DataPageV1(compress(pageV1.getBytes(), compressor), pageV1.getValueCount(),
Why does DataPageV1 need to be compressed again here while DataPageV2 does not (line 384 below)?
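One likely explanation, hedged against the Parquet format spec rather than this PR's code: a v1 data page body is compressed as a single buffer (levels included), while a v2 page keeps its repetition and definition levels uncompressed and compresses only the values section. A minimal sketch of that structural difference, with an identity "compressor" as a stand-in:

```java
// Sketch of the v1-vs-v2 page body layout difference.
// compress() is an identity stand-in for a real codec; all names are illustrative.
public class PageCompression {
    static byte[] compress(byte[] in) {
        return java.util.Arrays.copyOf(in, in.length); // stand-in, no real compression
    }

    // v1: the whole page body, levels and values together, goes through the codec.
    public static byte[] compressV1Body(byte[] levelsAndData) {
        return compress(levelsAndData);
    }

    // v2: rep/def levels stay uncompressed; only the values section is compressed.
    public static byte[] compressV2Body(byte[] repLevels, byte[] defLevels, byte[] data) {
        byte[] compressedData = compress(data);
        byte[] out = new byte[repLevels.length + defLevels.length + compressedData.length];
        System.arraycopy(repLevels, 0, out, 0, repLevels.length);
        System.arraycopy(defLevels, 0, out, repLevels.length, defLevels.length);
        System.arraycopy(compressedData, 0, out, repLevels.length + defLevels.length, compressedData.length);
        return out;
    }
}
```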
      newValuesWriter.reset();

      long firstRowIndex = pageV1.getFirstRowIndex().orElse(-1L);
We cannot simply copy firstRowIndex if pages are not from the 1st row group in this MutableMergedBlock.
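The fix the reviewer implies could look like the following sketch: when a page comes from a later source row group, its firstRowIndex must be shifted by the rows already contributed by preceding groups in the merged block. The helper and its names are hypothetical, not the PR's code.

```java
// Sketch of adjusting a page's firstRowIndex within a merged block.
// Names are illustrative stand-ins for the PR's actual state.
public class RowIndexShift {
    public static long adjustedFirstRowIndex(long pageFirstRowIndex, long rowsAlreadyInMergedBlock) {
        if (pageFirstRowIndex < 0) {
            return -1L; // an unknown index stays unknown
        }
        // Shift by the rows the preceding source row groups contributed.
        return rowsAlreadyInMergedBlock + pageFirstRowIndex;
    }
}
```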
This is a great initiative. Do you still plan to address the feedback @MaheshGPai?
@shangxinli I do plan to work on it, but I have not had time to get to it.
Hi @MaheshGPai, thanks for the contribution. If you don't have time to work on this, I can continue with it.
@ConeyLiu Please feel free to continue. I'll not be able to look at this for another week or so.
OK, I will dig into it.
Make sure you have checked all steps below.
- Jira
- Tests
- Commits
- Documentation