Support merging of rowgroups during file rewrite
MaheshGPai authored and mpairamanat committed Jul 15, 2023
1 parent 1bdb3c6 commit 1f511a6
Showing 7 changed files with 1,097 additions and 28 deletions.
parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java
@@ -72,6 +72,18 @@ public class RewriteCommand extends BaseCommand {
required = false)
String codec;

@Parameter(
names = {"-m", "--merge-rowgroups"},
description = "<true/false>",
required = false)
boolean mergeRowGroups;

@Parameter(
names = {"-s", "--max-rowgroup-size"},
description = "<max size of the merged rowgroups>",
required = false)
long maxRowGroupSize;

public RewriteCommand(Logger console) {
super(console);
}
@@ -108,6 +120,14 @@ private RewriteOptions buildOptionsOrFail() {
builder.transform(codecName);
}

if (mergeRowGroups) {
Preconditions.checkArgument(maxRowGroupSize > 0,
"If rowgroup merging is enabled, the max rowgroup size must be specified");
Preconditions.checkArgument(null != codec,
"If rowgroup merging is enabled, a new compression codec must be specified");
builder.enableRowGroupMerge();
builder.maxRowGroupSize(maxRowGroupSize);
}
return builder.build();
}
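For context, here is a minimal sketch of setting the same options programmatically. The RewriteOptions.Builder constructor, transform(), and ParquetRewriter come from the existing parquet-hadoop rewrite API; enableRowGroupMerge() and maxRowGroupSize() are the methods this change adds, and the paths and target size are placeholders.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class MergeRewriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder paths; in the CLI these come from the input/output arguments.
    RewriteOptions.Builder builder = new RewriteOptions.Builder(
        conf, new Path("input.parquet"), new Path("output.parquet"));
    builder.transform(CompressionCodecName.ZSTD); // merging requires an explicit codec
    builder.enableRowGroupMerge();                // added by this change
    builder.maxRowGroupSize(128L * 1024 * 1024);  // e.g. a 128 MiB merged-rowgroup target
    RewriteOptions options = builder.build();

    // ParquetRewriter is the existing driver that consumes RewriteOptions.
    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
      rewriter.processBlocks();
    }
  }
}
```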

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -75,6 +75,7 @@
import org.apache.parquet.hadoop.metadata.GlobalMetaData;
import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.CompressionConverter;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
@@ -1034,32 +1035,7 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
boolean dropColumns) throws IOException {
startBlock(rowGroup.getRowCount());

Map<String, ColumnChunkMetaData> columnsToCopy =
new HashMap<String, ColumnChunkMetaData>();
for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
columnsToCopy.put(chunk.getPath().toDotString(), chunk);
}

List<ColumnChunkMetaData> columnsInOrder =
new ArrayList<ColumnChunkMetaData>();

for (ColumnDescriptor descriptor : schema.getColumns()) {
String path = ColumnPath.get(descriptor.getPath()).toDotString();
ColumnChunkMetaData chunk = columnsToCopy.remove(path);
if (chunk != null) {
columnsInOrder.add(chunk);
} else {
throw new IllegalArgumentException(String.format(
"Missing column '%s', cannot copy row group: %s", path, rowGroup));
}
}

// complain if some columns would be dropped and that's not okay
if (!dropColumns && !columnsToCopy.isEmpty()) {
throw new IllegalArgumentException(String.format(
"Columns cannot be copied (missing from target schema): %s",
String.join(", ", columnsToCopy.keySet())));
}
List<ColumnChunkMetaData> columnsInOrder = getColumnsInOrder(rowGroup, dropColumns);

// copy the data for all chunks
long start = -1;
@@ -1158,6 +1134,44 @@ public void appendColumnChunk(ColumnDescriptor descriptor, SeekableInputStream f
currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
}

private List<ColumnChunkMetaData> getColumnsInOrder(BlockMetaData rowGroup, boolean dropColumns) {
return getColumnsInOrder(rowGroup, schema, dropColumns);
}

/**
* @param rowGroup row group containing columns
* @param schema the schema to use for column ordering
* @param dropColumns whether we should drop columns that are not defined in the provided schema
* @return the row group's column chunks, reordered to match the schema's column order
*/
public static List<ColumnChunkMetaData> getColumnsInOrder(BlockMetaData rowGroup,
MessageType schema, boolean dropColumns) {
Map<String, ColumnChunkMetaData> columnsToCopy = new HashMap<>();
for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
columnsToCopy.put(chunk.getPath().toDotString(), chunk);
}

List<ColumnChunkMetaData> columnsInOrder = new ArrayList<>();

for (ColumnDescriptor descriptor : schema.getColumns()) {
String path = ColumnPath.get(descriptor.getPath()).toDotString();
ColumnChunkMetaData chunk = columnsToCopy.remove(path);
if (chunk != null) {
columnsInOrder.add(chunk);
} else {
throw new IllegalArgumentException(String.format(
"Missing column '%s', cannot copy row group: %s", path, rowGroup));
}
}

// complain if some columns would be dropped and that's not okay
if (!dropColumns && !columnsToCopy.isEmpty()) {
throw new IllegalArgumentException(String.format(
"Columns cannot be copied (missing from target schema): %s",
String.join(", ", columnsToCopy.keySet())));
}
return columnsInOrder;
}
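A short, hedged illustration of the extracted helper's contract (the wrapper class and method names are illustrative only): chunks come back in the target schema's column order, a schema column with no matching chunk always fails, and dropColumns controls whether extra chunks in the row group may be skipped.

```java
import java.util.List;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.schema.MessageType;

class ColumnOrderExample {
  // Reorder a row group's chunks to match a (possibly pruned) target schema.
  // With dropColumns=false, chunks absent from the schema raise IllegalArgumentException.
  static List<ColumnChunkMetaData> inSchemaOrder(BlockMetaData rowGroup, MessageType prunedSchema) {
    return ParquetFileWriter.getColumnsInOrder(rowGroup, prunedSchema, /* dropColumns */ true);
  }
}
```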

// Buffers for the copy function.
private static final ThreadLocal<byte[]> COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[8192]);
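Putting the pieces together, here is a self-contained sketch (not part of this commit) of the copy path these buffers serve: appending every row group of an existing file to a new writer through the public appendRowGroup above. It uses standard parquet-hadoop reader/writer APIs; the input and output paths are placeholders taken from the command line.

```java
import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.MessageType;

public class AppendRowGroupsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path in = new Path(args[0]);   // placeholder input file
    Path out = new Path(args[1]);  // placeholder output file

    // Read the source footer to get its schema and row group metadata.
    ParquetMetadata footer;
    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(in, conf))) {
      footer = reader.getFooter();
    }
    MessageType schema = footer.getFileMetaData().getSchema();

    ParquetFileWriter writer = new ParquetFileWriter(
        HadoopOutputFile.fromPath(out, conf), schema, ParquetFileWriter.Mode.CREATE,
        ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
    writer.start();
    try (SeekableInputStream from = HadoopInputFile.fromPath(in, conf).newStream()) {
      for (BlockMetaData rowGroup : footer.getBlocks()) {
        writer.appendRowGroup(from, rowGroup, false); // copy chunks without dropping columns
      }
    }
    writer.end(Collections.emptyMap());
  }
}
```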
