PARQUET-2444: Define/enforce contract for codecs
* Added javadoc comments to specify the contracts in
  CompressionCodecFactory
* Updated the related unit test to validate the contract on the
  different implementations
* Made LZ4RawCodec implement direct decompression so this scenario is
  also covered in the unit test

Fixed issues discovered by the new tests:
* Properly set the ByteBuffer positions and limits in the related
  implementations according to the defined contract
* Fixed an issue in DirectCodecFactory where FullDirectDecompressor
  never worked
* Fixed the ParquetWriter builder to pass through the codec factory
gszadovszky committed Mar 6, 2024
1 parent a93207f commit 22c6e80
Showing 9 changed files with 310 additions and 128 deletions.
@@ -24,28 +24,137 @@
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/**
* Factory for creating (and potentially caching) {@link BytesInputCompressor} and {@link BytesInputDecompressor}
* instances to compress/decompress page data.
* <p>
* The factory instance shall be released after use. The compressor/decompressor instances shall not be used after
* release.
*
* @see #release()
*/
public interface CompressionCodecFactory {

/**
* Returns a {@link BytesInputCompressor} instance for the specified codec name to be used for compressing page data.
* <p>
* The compressor is not thread-safe, so one instance for each working thread is required.
*
* @param codecName the codec name for which the compressor instance is to be returned
* @return the compressor instance for the specified codec name
* @see BytesInputCompressor#release()
*/
BytesInputCompressor getCompressor(CompressionCodecName codecName);

/**
* Returns a {@link BytesInputDecompressor} instance for the specified codec name to be used for decompressing page
* data.
* <p>
* The decompressor is not thread-safe, so one instance for each working thread is required.
*
* @param codecName the codec name for which the decompressor instance is to be returned
* @return the decompressor instance for the specified codec name
* @see BytesInputDecompressor#release()
*/
BytesInputDecompressor getDecompressor(CompressionCodecName codecName);

/**
* Releases this factory instance.
* <p>
* Each compressor/decompressor instance shall be released before invoking this. Neither the compressor/decompressor
* instances retrieved from this factory nor this factory instance itself shall be used after release.
*
* @see BytesInputCompressor#release()
* @see BytesInputDecompressor#release()
*/
void release();

/**
* Compressor instance of a specific codec to be used for compressing page data.
* <p>
* This compressor shall be released after use. This compressor shall not be used after release.
*
* @see #release()
*/
interface BytesInputCompressor {

/**
* Compresses the specified {@link BytesInput} data and returns it as {@link BytesInput}.
* <p>
* Depending on the implementation {@code bytes} might be completely consumed. The returned {@link BytesInput}
* instance needs to be consumed before using this compressor again. This is because the implementation might use
* its internal buffer to directly provide the returned {@link BytesInput} instance.
*
* @param bytes the page data to be compressed
* @return a {@link BytesInput} containing the compressed data. Needs to be consumed before using this compressor
* again.
* @throws IOException if any I/O error occurs during the compression
*/
BytesInput compress(BytesInput bytes) throws IOException;

/**
* Returns the codec name of this compressor.
*
* @return the codec name
*/
CompressionCodecName getCodecName();

/**
* Releases this compressor instance.
* <p>
* Neither this instance nor the {@link BytesInput} instance returned by {@link #compress(BytesInput)} shall be used
* after release.
*/
void release();
}

/**
* Decompressor instance of a specific codec to be used for decompressing page data.
* <p>
* This decompressor shall be released after use. This decompressor shall not be used after release.
*
* @see #release()
*/
interface BytesInputDecompressor {
BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException;

void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
/**
* Decompresses the specified {@link BytesInput} data and returns it as {@link BytesInput}.
* <p>
* The decompressed data must have the size specified. Depending on the implementation {@code bytes} might be
* completely consumed. The returned {@link BytesInput} instance needs to be consumed before using this decompressor
* again. This is because the implementation might use its internal buffer to directly provide the returned
* {@link BytesInput} instance.
*
* @param bytes the page data to be decompressed
* @param decompressedSize the exact size of the decompressed data
* @return a {@link BytesInput} containing the decompressed data. Needs to be consumed before using this
* decompressor again.
* @throws IOException if any I/O error occurs during the decompression
*/
BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException;

/**
* Decompresses {@code compressedSize} bytes from {@code input}, starting at its current position. The decompressed
* bytes are to be written into {@code output}, starting at its current position. The decompressed data must have
* the size specified.
* <p>
* {@code output} must have at least {@code decompressedSize} bytes available. According to the {@link ByteBuffer}
* contract, the position of {@code input} will be increased by {@code compressedSize}, and the position of
* {@code output} will be increased by {@code decompressedSize}. (This means the output buffer has to be flipped
* before the decompressed data can be read from it.)
*
* @param input the input buffer where the data is to be decompressed from
* @param compressedSize the exact size of the compressed (input) data
* @param output the output buffer where the data is to be decompressed into
* @param decompressedSize the exact size of the decompressed (output) data
* @throws IOException if any I/O error occurs during the decompression
*/
void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize)
throws IOException;

/**
* Releases this decompressor instance. Neither this instance nor the {@link BytesInput} instance returned by
* {@link #decompress(BytesInput, int)} shall be used after release.
*/
void release();
}
}
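
For reference, a minimal sketch of how the documented contract is meant to be used end to end. It assumes the Hadoop-backed CodecFactory (with its Configuration/page-size constructor) as the CompressionCodecFactory implementation; the class name, codec choice, and byte values are illustrative only, not part of this commit:

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class CodecContractExample {
  public static void main(String[] args) throws Exception {
    // Assumption: CodecFactory(Configuration, pageSize) is used as the CompressionCodecFactory implementation.
    CompressionCodecFactory factory = new CodecFactory(new Configuration(), 64 * 1024);
    BytesInputCompressor compressor = factory.getCompressor(CompressionCodecName.SNAPPY);
    BytesInputDecompressor decompressor = factory.getDecompressor(CompressionCodecName.SNAPPY);
    try {
      byte[] page = "some page data".getBytes();

      // The returned BytesInput may be backed by the compressor's internal buffer,
      // so it is consumed (materialized into a byte[]) before the compressor is reused.
      byte[] compressed = compressor.compress(BytesInput.from(page)).toByteArray();

      // The exact decompressed size must be passed; the result is consumed the same way.
      byte[] roundTrip =
          decompressor.decompress(BytesInput.from(compressed), page.length).toByteArray();
      if (!Arrays.equals(page, roundTrip)) {
        throw new IllegalStateException("round-trip mismatch");
      }
    } finally {
      // Per the contract: release the compressor/decompressor instances first, then the factory.
      compressor.release();
      decompressor.release();
      factory.release();
    }
  }
}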
Binary file added parquet-hadoop/.pom.xml.swp
Binary file not shown.
@@ -56,16 +56,20 @@ public class CodecFactory implements CompressionCodecFactory

static final BytesDecompressor NO_OP_DECOMPRESSOR = new BytesDecompressor() {
@Override
public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) {
public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) {
Preconditions.checkArgument(
compressedSize == uncompressedSize,
"Non-compressed data did not have matching compressed and uncompressed sizes.");
output.clear();
output.put((ByteBuffer) input.duplicate().position(0).limit(compressedSize));
compressedSize == decompressedSize,
"Non-compressed data did not have matching compressed and decompressed sizes.");
Preconditions.checkArgument(
input.remaining() >= compressedSize, "Not enough bytes available in the input buffer");
int origLimit = input.limit();
input.limit(input.position() + compressedSize);
output.put(input);
input.limit(origLimit);
}

@Override
public BytesInput decompress(BytesInput bytes, int uncompressedSize) {
public BytesInput decompress(BytesInput bytes, int decompressedSize) {
return bytes;
}

@@ -150,7 +154,7 @@ class HeapBytesDecompressor extends BytesDecompressor {
}

@Override
public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException {
final BytesInput decompressed;
if (decompressor != null) {
decompressor.reset();
@@ -162,20 +166,27 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx
// This change will load the decompressor stream into heap a little earlier, since the problem it solves
// only happens in the ZSTD codec, so this modification is only made for ZSTD streams.
if (codec instanceof ZstandardCodec) {
decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
decompressed = BytesInput.copy(BytesInput.from(is, decompressedSize));
is.close();
} else {
decompressed = BytesInput.from(is, uncompressedSize);
decompressed = BytesInput.from(is, decompressedSize);
}
return decompressed;
}

@Override
public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize)
throws IOException {
Preconditions.checkArgument(
input.remaining() >= compressedSize, "Not enough bytes available in the input buffer");
int origLimit = input.limit();
int origPosition = input.position();
input.limit(origPosition + compressedSize);
ByteBuffer decompressed =
decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
decompress(BytesInput.from(input), decompressedSize).toByteBuffer();
output.put(decompressed);
input.limit(origLimit);
input.position(origPosition + compressedSize);
}

public void release() {
@@ -338,9 +349,9 @@ public abstract static class BytesCompressor implements CompressionCodecFactory.
*/
@Deprecated
public abstract static class BytesDecompressor implements CompressionCodecFactory.BytesInputDecompressor {
public abstract BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException;
public abstract BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException;

public abstract void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
public abstract void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize)
throws IOException;

public abstract void release();
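
The updated unit test mentioned in the commit message is not shown in this excerpt; a minimal sketch of the kind of check it performs for the ByteBuffer-based decompress contract might look as follows. The helper class, method name, and buffer handling are hypothetical, not the actual test code:

import java.nio.ByteBuffer;
import java.util.Arrays;

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;

// Hypothetical helper illustrating the ByteBuffer contract; works against any implementation pair.
public class ByteBufferContractCheck {
  public static void check(BytesInputCompressor compressor, BytesInputDecompressor decompressor, byte[] page)
      throws Exception {
    byte[] compressed = compressor.compress(BytesInput.from(page)).toByteArray();

    // Input positioned at the start of the compressed bytes, output with enough remaining space.
    // Direct buffers are used so that direct decompressor implementations are also exercised.
    ByteBuffer input = ByteBuffer.allocateDirect(compressed.length);
    input.put(compressed);
    input.flip();
    ByteBuffer output = ByteBuffer.allocateDirect(page.length);

    decompressor.decompress(input, compressed.length, output, page.length);

    // Contract: input position advanced by compressedSize, output position advanced by decompressedSize.
    if (input.position() != compressed.length || output.position() != page.length) {
      throw new AssertionError("ByteBuffer position contract violated");
    }

    // The caller has to flip the output buffer before reading the decompressed data.
    output.flip();
    byte[] roundTrip = new byte[page.length];
    output.get(roundTrip);
    if (!Arrays.equals(page, roundTrip)) {
      throw new AssertionError("round-trip mismatch");
    }
  }
}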
@@ -168,12 +168,7 @@ public DataPage visit(DataPageV1 dataPageV1) {
decompressedBuffer,
dataPageV1.getUncompressedSize());
setDecompressMetrics(bytes, start);

// HACKY: sometimes we need to do `flip` because the position of output bytebuffer is
// not reset.
if (decompressedBuffer.position() != 0) {
decompressedBuffer.flip();
}
decompressedBuffer.flip();
decompressed = BytesInput.from(decompressedBuffer);
} else { // use on-heap buffer
if (null != blockDecryptor) {
@@ -225,9 +220,6 @@ public DataPage visit(DataPageV2 dataPageV2) {
}
BytesInput pageBytes = dataPageV2.getData();
try {
BytesInput decompressed;
long compressedSize;

if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
ByteBuffer byteBuffer = pageBytes.toByteBuffer(releaser);
if (!byteBuffer.isDirect()) {
@@ -236,7 +228,7 @@ public DataPage visit(DataPageV2 dataPageV2) {
if (blockDecryptor != null) {
byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
}
compressedSize = byteBuffer.limit();
long compressedSize = byteBuffer.limit();
if (dataPageV2.isCompressed()) {
int uncompressedSize = Math.toIntExact(dataPageV2.getUncompressedSize()
- dataPageV2.getDefinitionLevels().size()
@@ -248,12 +240,7 @@ public DataPage visit(DataPageV2 dataPageV2) {
decompressor.decompress(
byteBuffer, (int) compressedSize, decompressedBuffer, uncompressedSize);
setDecompressMetrics(pageBytes, start);

// HACKY: sometimes we need to do `flip` because the position of output bytebuffer is
// not reset.
if (decompressedBuffer.position() != 0) {
decompressedBuffer.flip();
}
decompressedBuffer.flip();
pageBytes = BytesInput.from(decompressedBuffer);
} else {
pageBytes = BytesInput.from(byteBuffer);