diff --git a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java index 4f31a3f8a8..561dcb899c 100644 --- a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java +++ b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java @@ -24,28 +24,137 @@ import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +/** + * Factory for creating (and potentially caching) {@link BytesInputCompressor} and {@link BytesInputDecompressor} + * instances to compress/decompress page data. + *
+ * The factory instance shall be released after use. The compressor/decompressor instances shall not be used after + * release. + * + * @see #release() + */ public interface CompressionCodecFactory { + /** + * Returns a {@link BytesInputCompressor} instance for the specified codec name to be used for compressing page data. + *
+ * The compressor is not thread-safe, so one instance for each working thread is required. + * + * @param codecName the codec name for which the compressor instance is to be returned + * @return the compressor instance for the specified codec name + * @see BytesInputCompressor#release() + */ BytesInputCompressor getCompressor(CompressionCodecName codecName); + /** + * Returns a {@link BytesInputDecompressor} instance for the specified codec name to be used for decompressing page + * data. + * <p>
+ * The decompressor is not thread-safe, so one instance for each working thread is required. + * + * @param codecName the codec name for which the decompressor instance is to be returned + * @return the decompressor instance for the specified codec name + * @see BytesInputDecompressor#release() + */ BytesInputDecompressor getDecompressor(CompressionCodecName codecName); + /** + * Releases this factory instance. + * <p>
+ * Each compressor/decompressor instance shall be released before invoking this. Neither the compressor/decompressor + * instances retrieved from this factory nor this factory instance itself shall be used after release. + * + * @see BytesInputCompressor#release() + * @see BytesInputDecompressor#release() + */ void release(); + /** + * Compressor instance of a specific codec to be used for compressing page data. + * <p>
+ * This compressor shall be released after use. This compressor shall not be used after release. + * + * @see #release() + */ interface BytesInputCompressor { + + /** + * Compresses the specified {@link BytesInput} data and returns it as {@link BytesInput}. + *
+ * Depending on the implementation {@code bytes} might be completely consumed. The returned {@link BytesInput} + * instance needs to be consumed before using this compressor again. This is because the implementation might use + * its internal buffer to directly provide the returned {@link BytesInput} instance. + * + * @param bytes the page data to be compressed + * @return a {@link BytesInput} containing the compressed data. Needs to be consumed before using this compressor + * again. + * @throws IOException if any I/O error occurs during the compression + */ BytesInput compress(BytesInput bytes) throws IOException; + /** + * Returns the codec name of this compressor. + * + * @return the codec name + */ CompressionCodecName getCodecName(); + /** + * Releases this compressor instance. + *
+ * After release, no subsequent calls shall be made on this instance, nor shall the {@link BytesInput} instance + * returned by {@link #compress(BytesInput)} be used. + */ void release(); } + /** + * Decompressor instance of a specific codec to be used for decompressing page data. + * <p>
+ * This decompressor shall be released after use. This decompressor shall not be used after release. + * + * @see #release() + */ interface BytesInputDecompressor { - BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException; - void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) + /** + * Decompresses the specified {@link BytesInput} data and returns it as {@link BytesInput}. + *
+ * The decompressed data must have the size specified. Depending on the implementation {@code bytes} might be + * completely consumed. The returned {@link BytesInput} instance needs to be consumed before using this decompressor + * again. This is because the implementation might use its internal buffer to directly provide the returned + * {@link BytesInput} instance. + * + * @param bytes the page data to be decompressed + * @param decompressedSize the exact size of the decompressed data + * @return a {@link BytesInput} containing the decompressed data. Needs to be consumed before using this + * decompressor again. + * @throws IOException if any I/O error occurs during the decompression + */ + BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException; + + /** + * Decompresses {@code compressedSize} bytes from {@code input}, starting at its current position. The decompressed + * bytes are to be written into {@code output} starting at its current position. The decompressed data must have + * the size specified. + * <p>
+ * {@code output} must have the available bytes of {@code decompressedSize}. According to the {@link ByteBuffer} + * contract the position of {@code input} will be increased by {@code compressedSize}, and the position of + * {@code output} will be increased by {@code decompressedSize}. (It means, one would have to flip the output buffer + * before reading the decompressed data from it.) + * + * @param input the input buffer where the data is to be decompressed from + * @param compressedSize the exact size of the compressed (input) data + * @param output the output buffer where the data is to be decompressed into + * @param decompressedSize the exact size of the decompressed (output) data + * @throws IOException if any I/O error occurs during the decompression + */ + void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) throws IOException; + /** + * Releases this decompressor instance. No subsequent calls on this instance nor the returned {@link BytesInput} + * instance returned by {@link #decompress(BytesInput, int)} shall be used after release. + */ void release(); } } diff --git a/parquet-hadoop/.pom.xml.swp b/parquet-hadoop/.pom.xml.swp new file mode 100644 index 0000000000..a7c77ae55a Binary files /dev/null and b/parquet-hadoop/.pom.xml.swp differ diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index 98c7002feb..f0775484c5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -56,16 +56,20 @@ public class CodecFactory implements CompressionCodecFactory { static final BytesDecompressor NO_OP_DECOMPRESSOR = new BytesDecompressor() { @Override - public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) { + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) { Preconditions.checkArgument( - compressedSize == uncompressedSize, - "Non-compressed data did not have matching compressed and uncompressed sizes."); - output.clear(); - output.put((ByteBuffer) input.duplicate().position(0).limit(compressedSize)); + compressedSize == decompressedSize, + "Non-compressed data did not have matching compressed and decompressed sizes."); + Preconditions.checkArgument( + input.remaining() >= compressedSize, "Not enough bytes available in the input buffer"); + int origLimit = input.limit(); + input.limit(input.position() + compressedSize); + output.put(input); + input.limit(origLimit); } @Override - public BytesInput decompress(BytesInput bytes, int uncompressedSize) { + public BytesInput decompress(BytesInput bytes, int decompressedSize) { return bytes; } @@ -150,7 +154,7 @@ class HeapBytesDecompressor extends BytesDecompressor { } @Override - public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { final BytesInput decompressed; if (decompressor != null) { decompressor.reset(); @@ -162,20 +166,27 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx // This change will load the decompressor stream into heap a little earlier, since the problem it solves // only happens in the ZSTD codec, so this modification is only made for ZSTD streams. 
if (codec instanceof ZstandardCodec) { - decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize)); + decompressed = BytesInput.copy(BytesInput.from(is, decompressedSize)); is.close(); } else { - decompressed = BytesInput.from(is, uncompressedSize); + decompressed = BytesInput.from(is, decompressedSize); } return decompressed; } @Override - public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) throws IOException { + Preconditions.checkArgument( + input.remaining() >= compressedSize, "Not enough bytes available in the input buffer"); + int origLimit = input.limit(); + int origPosition = input.position(); + input.limit(origPosition + compressedSize); ByteBuffer decompressed = - decompress(BytesInput.from(input), uncompressedSize).toByteBuffer(); + decompress(BytesInput.from(input), decompressedSize).toByteBuffer(); output.put(decompressed); + input.limit(origLimit); + input.position(origPosition + compressedSize); } public void release() { @@ -338,9 +349,9 @@ public abstract static class BytesCompressor implements CompressionCodecFactory. */ @Deprecated public abstract static class BytesDecompressor implements CompressionCodecFactory.BytesInputDecompressor { - public abstract BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException; + public abstract BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException; - public abstract void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) + public abstract void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) throws IOException; public abstract void release(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index cc20c15629..c7fc22b29f 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -168,12 +168,7 @@ public DataPage visit(DataPageV1 dataPageV1) { decompressedBuffer, dataPageV1.getUncompressedSize()); setDecompressMetrics(bytes, start); - - // HACKY: sometimes we need to do `flip` because the position of output bytebuffer is - // not reset. 
- if (decompressedBuffer.position() != 0) { - decompressedBuffer.flip(); - } + decompressedBuffer.flip(); decompressed = BytesInput.from(decompressedBuffer); } else { // use on-heap buffer if (null != blockDecryptor) { @@ -225,9 +220,6 @@ public DataPage visit(DataPageV2 dataPageV2) { } BytesInput pageBytes = dataPageV2.getData(); try { - BytesInput decompressed; - long compressedSize; - if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) { ByteBuffer byteBuffer = pageBytes.toByteBuffer(releaser); if (!byteBuffer.isDirect()) { @@ -236,7 +228,7 @@ public DataPage visit(DataPageV2 dataPageV2) { if (blockDecryptor != null) { byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD); } - compressedSize = byteBuffer.limit(); + long compressedSize = byteBuffer.limit(); if (dataPageV2.isCompressed()) { int uncompressedSize = Math.toIntExact(dataPageV2.getUncompressedSize() - dataPageV2.getDefinitionLevels().size() @@ -248,12 +240,7 @@ public DataPage visit(DataPageV2 dataPageV2) { decompressor.decompress( byteBuffer, (int) compressedSize, decompressedBuffer, uncompressedSize); setDecompressMetrics(pageBytes, start); - - // HACKY: sometimes we need to do `flip` because the position of output bytebuffer is - // not reset. - if (decompressedBuffer.position() != 0) { - decompressedBuffer.flip(); - } + decompressedBuffer.flip(); pageBytes = BytesInput.from(decompressedBuffer); } else { pageBytes = BytesInput.from(byteBuffer); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index f509dce55a..523e57dbf4 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -68,7 +68,8 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable { try { tempClass = Class.forName("org.apache.hadoop.io.compress.DirectDecompressionCodec"); tempCreateMethod = tempClass.getMethod("createDirectDecompressor"); - tempDecompressMethod = tempClass.getMethod("decompress", ByteBuffer.class, ByteBuffer.class); + Class tempClass2 = Class.forName("org.apache.hadoop.io.compress.DirectDecompressor"); + tempDecompressMethod = tempClass2.getMethod("decompress", ByteBuffer.class, ByteBuffer.class); } catch (ClassNotFoundException | NoSuchMethodException e) { // do nothing, the class will just be assigned null } @@ -150,27 +151,25 @@ private IndirectDecompressor(Decompressor decompressor) { } @Override - public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { decompressor.reset(); byte[] inputBytes = bytes.toByteArray(); decompressor.setInput(inputBytes, 0, inputBytes.length); - byte[] output = new byte[uncompressedSize]; - decompressor.decompress(output, 0, uncompressedSize); + byte[] output = new byte[decompressedSize]; + decompressor.decompress(output, 0, decompressedSize); return BytesInput.from(output); } @Override - public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) throws IOException { decompressor.reset(); byte[] inputBytes = new byte[compressedSize]; - input.position(0); input.get(inputBytes); decompressor.setInput(inputBytes, 0, inputBytes.length); - byte[] outputBytes = 
new byte[uncompressedSize]; - decompressor.decompress(outputBytes, 0, uncompressedSize); - output.clear(); + byte[] outputBytes = new byte[decompressedSize]; + decompressor.decompress(outputBytes, 0, decompressedSize); output.put(outputBytes); } @@ -193,14 +192,14 @@ private abstract class BaseDecompressor extends BytesDecompressor { } @Override - public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { try (ByteBufferReleaser releaser = inputAllocator.getReleaser()) { ByteBuffer input = bytes.toByteBuffer(releaser); - ByteBuffer output = outputAllocator.allocate(uncompressedSize); + ByteBuffer output = outputAllocator.allocate(decompressedSize); int size = decompress(input.slice(), output.slice()); - if (size != uncompressedSize) { + if (size != decompressedSize) { throw new DirectCodecPool.ParquetCompressionCodecException( - "Unexpected decompressed size: " + size + " != " + uncompressedSize); + "Unexpected decompressed size: " + size + " != " + decompressedSize); } output.limit(size); return BytesInput.from(output); @@ -210,26 +209,26 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx abstract int decompress(ByteBuffer input, ByteBuffer output) throws IOException; @Override - public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) throws IOException { + int origInputLimit = input.limit(); input.limit(input.position() + compressedSize); - output.limit(output.position() + uncompressedSize); + int origOutputLimit = output.limit(); + output.limit(output.position() + decompressedSize); int size = decompress(input.slice(), output.slice()); - if (size != uncompressedSize) { + if (size != decompressedSize) { throw new DirectCodecPool.ParquetCompressionCodecException( - "Unexpected decompressed size: " + size + " != " + uncompressedSize); + "Unexpected decompressed size: " + size + " != " + decompressedSize); } input.position(input.limit()); - output.position(output.position() + uncompressedSize); + input.limit(origInputLimit); + output.position(output.limit()); + output.limit(origOutputLimit); } @Override public void release() { - try { - AutoCloseables.uncheckedClose(outputAllocator, inputAllocator); - } finally { - closeDecompressor(); - } + AutoCloseables.uncheckedClose(outputAllocator, inputAllocator, this::closeDecompressor); } abstract void closeDecompressor(); @@ -264,11 +263,7 @@ public BytesInput compress(BytesInput bytes) throws IOException { @Override public void release() { - try { - AutoCloseables.uncheckedClose(outputAllocator, inputAllocator); - } finally { - closeCompressor(); - } + AutoCloseables.uncheckedClose(outputAllocator, inputAllocator, this::closeCompressor); } abstract void closeCompressor(); @@ -296,22 +291,22 @@ private FullDirectDecompressor(Object decompressor) { } @Override - public BytesInput decompress(BytesInput compressedBytes, int uncompressedSize) throws IOException { + public BytesInput decompress(BytesInput compressedBytes, int decompressedSize) throws IOException { // Similarly to non-direct decompressors, we reset before use, if possible (see HeapBytesDecompressor) if (decompressor instanceof Decompressor) { ((Decompressor) decompressor).reset(); } - return super.decompress(compressedBytes, uncompressedSize); + return super.decompress(compressedBytes, 
decompressedSize); } @Override - public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) throws IOException { // Similarly to non-direct decompressors, we reset before use, if possible (see HeapBytesDecompressor) if (decompressor instanceof Decompressor) { ((Decompressor) decompressor).reset(); } - super.decompress(input, compressedSize, output, uncompressedSize); + super.decompress(input, compressedSize, output, decompressedSize); } @Override @@ -322,7 +317,10 @@ int decompress(ByteBuffer input, ByteBuffer output) { } catch (IllegalAccessException | InvocationTargetException e) { throw new DirectCodecPool.ParquetCompressionCodecException(e); } - return output.position() - startPos; + int size = output.position() - startPos; + // Some decompressors flip the output buffer, some don't: + // Let's rely on the limit if the position did not change + return size == 0 ? output.limit() : size; } @Override @@ -338,22 +336,20 @@ void closeDecompressor() { public class NoopDecompressor extends BytesDecompressor { @Override - public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int decompressedSize) throws IOException { - Preconditions.checkArgument( - compressedSize == uncompressedSize, - "Non-compressed data did not have matching compressed and uncompressed sizes."); - output.clear(); - output.put((ByteBuffer) input.duplicate().position(0).limit(compressedSize)); + NO_OP_DECOMPRESSOR.decompress(input, compressedSize, output, decompressedSize); } @Override - public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { - return bytes; + public BytesInput decompress(BytesInput bytes, int decompressedSize) throws IOException { + return NO_OP_DECOMPRESSOR.decompress(bytes, decompressedSize); } @Override - public void release() {} + public void release() { + NO_OP_DECOMPRESSOR.release(); + } } public class SnappyDecompressor extends BaseDecompressor { @@ -416,6 +412,8 @@ private class ZstdCompressor extends BaseCompressor { context = new ZstdCompressCtx(); context.setLevel(configuration.getInt( ZstandardCodec.PARQUET_COMPRESS_ZSTD_LEVEL, ZstandardCodec.DEFAULT_PARQUET_COMPRESS_ZSTD_LEVEL)); + context.setWorkers(configuration.getInt( + ZstandardCodec.PARQUET_COMPRESS_ZSTD_WORKERS, ZstandardCodec.DEFAULTPARQUET_COMPRESS_ZSTD_WORKERS)); } @Override @@ -449,16 +447,18 @@ public NoopCompressor() {} @Override public BytesInput compress(BytesInput bytes) throws IOException { - return bytes; + return NO_OP_COMPRESSOR.compress(bytes); } @Override public CompressionCodecName getCodecName() { - return CompressionCodecName.UNCOMPRESSED; + return NO_OP_COMPRESSOR.getCodecName(); } @Override - public void release() {} + public void release() { + NO_OP_COMPRESSOR.release(); + } } static class DirectCodecPool { @@ -486,7 +486,8 @@ public class CodecPool { private CodecPool(final CompressionCodec codec) { try { - boolean supportDirectDecompressor = codec.getClass() == DIRECT_DECOMPRESSION_CODEC_CLASS; + boolean supportDirectDecompressor = DIRECT_DECOMPRESSION_CODEC_CLASS != null + && DIRECT_DECOMPRESSION_CODEC_CLASS.isAssignableFrom(codec.getClass()); compressorPool = new GenericObjectPool( new BasePoolableObjectFactory() { public Object makeObject() throws Exception { @@ -533,8 +534,7 @@ public Object makeObject() 
throws Exception { directDecompressorPool = new GenericObjectPool( new BasePoolableObjectFactory() { public Object makeObject() throws Exception { - return CREATE_DIRECT_DECOMPRESSOR_METHOD.invoke( - DIRECT_DECOMPRESSION_CODEC_CLASS); + return CREATE_DIRECT_DECOMPRESSOR_METHOD.invoke(codec); } }, Integer.MAX_VALUE); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index 22dc7e30f0..a76b843e70 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -916,6 +916,7 @@ public ParquetWriter build() throws IOException { mode, getWriteSupport(conf), codecName, + codecFactory, rowGroupSize, enableValidation, conf, @@ -928,6 +929,7 @@ public ParquetWriter build() throws IOException { mode, getWriteSupport(conf), codecName, + codecFactory, rowGroupSize, enableValidation, conf, diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java index fd8c1a81ea..6a25c5982b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawCodec.java @@ -28,6 +28,8 @@ import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.DirectDecompressionCodec; +import org.apache.hadoop.io.compress.DirectDecompressor; /** * Lz4 raw compression codec for Parquet. This codec type has been introduced @@ -39,7 +41,7 @@ * below for reference. 
* https://github.com/apache/parquet-format/blob/master/Compression.md */ -public class Lz4RawCodec implements Configurable, CompressionCodec { +public class Lz4RawCodec implements Configurable, CompressionCodec, DirectDecompressionCodec { private Configuration conf; @@ -68,6 +70,11 @@ public Decompressor createDecompressor() { return new Lz4RawDecompressor(); } + @Override + public DirectDecompressor createDirectDecompressor() { + return new Lz4RawDecompressor(); + } + @Override public CompressionInputStream createInputStream(InputStream stream) throws IOException { return createInputStream(stream, createDecompressor()); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java index 7477bda875..68839d2814 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/Lz4RawDecompressor.java @@ -21,8 +21,9 @@ import io.airlift.compress.lz4.Lz4Decompressor; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.hadoop.io.compress.DirectDecompressor; -public class Lz4RawDecompressor extends NonBlockedDecompressor { +public class Lz4RawDecompressor extends NonBlockedDecompressor implements DirectDecompressor { private Lz4Decompressor decompressor = new Lz4Decompressor(); @@ -41,4 +42,9 @@ protected int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws uncompressed.rewind(); return uncompressedSize; } + + @Override + public void decompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException { + uncompress(compressed, uncompressed); + } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java index 3a17795881..c78ee09ecc 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDirectCodecFactory.java @@ -19,14 +19,17 @@ import static org.apache.parquet.hadoop.metadata.CompressionCodecName.BROTLI; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4; +import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZ4_RAW; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.LZO; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Random; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.parquet.bytes.ByteBufferAllocator; import org.apache.parquet.bytes.ByteBufferReleaser; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.DirectByteBufferAllocator; @@ -55,13 +58,12 @@ private enum Decompression { private void test(int size, CompressionCodecName codec, boolean useOnHeapCompression, Decompression decomp) { try (TrackingByteBufferAllocator allocator = TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator()); ByteBufferReleaser releaser = new ByteBufferReleaser(allocator)) { - final CodecFactory codecFactory = + final CodecFactory directCodecFactory = CodecFactory.createDirectCodecFactory(new Configuration(), allocator, pageSize); + final CodecFactory heapCodecFactory = new CodecFactory(new Configuration(), pageSize); ByteBuffer rawBuf = allocator.allocate(size); 
releaser.releaseLater(rawBuf); final byte[] rawArr = new byte[size]; - ByteBuffer outBuf = allocator.allocate(size * 2); - releaser.releaseLater(outBuf); final Random r = new Random(); final byte[] random = new byte[1024]; int pos = 0; @@ -73,56 +75,51 @@ private void test(int size, CompressionCodecName codec, boolean useOnHeapCompres } rawBuf.flip(); - final BytesInputCompressor c = codecFactory.getCompressor(codec); - final BytesInputDecompressor d = codecFactory.getDecompressor(codec); + final BytesInputCompressor directCompressor = directCodecFactory.getCompressor(codec); + final BytesInputDecompressor directDecompressor = directCodecFactory.getDecompressor(codec); + final BytesInputCompressor heapCompressor = heapCodecFactory.getCompressor(codec); + final BytesInputDecompressor heapDecompressor = heapCodecFactory.getDecompressor(codec); - final BytesInput compressed; + if (codec == LZ4_RAW) { + // Hadoop codecs support direct decompressors only if the related native libraries are available. + // This is not the case for our CI so let's rely on LZ4_RAW where the implementation is our own. + Assert.assertTrue( + String.format("The hadoop codec %s should support direct decompression", codec), + directDecompressor instanceof DirectCodecFactory.FullDirectDecompressor); + } + + final BytesInput directCompressed; if (useOnHeapCompression) { - compressed = c.compress(BytesInput.from(rawArr)); + directCompressed = directCompressor.compress(BytesInput.from(rawArr)); } else { - compressed = c.compress(BytesInput.from(rawBuf)); + directCompressed = directCompressor.compress(BytesInput.from(rawBuf)); } - switch (decomp) { - case OFF_HEAP: { - final ByteBuffer buf = compressed.toByteBuffer(); - final ByteBuffer b = allocator.allocate(buf.capacity()); - try { - b.put(buf); - b.flip(); - d.decompress(b, (int) compressed.size(), outBuf, size); - for (int i = 0; i < size; i++) { - Assert.assertTrue("Data didn't match at " + i, outBuf.get(i) == rawBuf.get(i)); - } - } finally { - allocator.release(b); - } - break; - } + BytesInput heapCompressed = heapCompressor.compress(BytesInput.from(rawArr)); - case OFF_HEAP_BYTES_INPUT: { - final ByteBuffer buf = compressed.toByteBuffer(); - final ByteBuffer b = allocator.allocate(buf.limit()); - try { - b.put(buf); - b.flip(); - final BytesInput input = d.decompress(BytesInput.from(b), size); - Assert.assertArrayEquals( - String.format("While testing codec %s", codec), input.toByteArray(), rawArr); - } finally { - allocator.release(b); - } - break; - } - case ON_HEAP: { - final byte[] buf = compressed.toByteArray(); - final BytesInput input = d.decompress(BytesInput.from(buf), size); - Assert.assertArrayEquals(input.toByteArray(), rawArr); - break; - } - } - c.release(); - d.release(); + // Validate direct => direct + validateDecompress( + size, + codec, + decomp, + directCompressed.copy(releaser), + allocator, + directDecompressor, + rawBuf, + rawArr); + + // Validate heap => direct + validateDecompress(size, codec, decomp, heapCompressed, allocator, directDecompressor, rawBuf, rawArr); + + // Validate direct => heap + validateDecompress(size, codec, decomp, directCompressed, allocator, heapDecompressor, rawBuf, rawArr); + + directCompressor.release(); + directDecompressor.release(); + directCodecFactory.release(); + heapCompressor.release(); + heapDecompressor.release(); + heapCodecFactory.release(); } catch (Exception e) { final String msg = String.format( "Failure while testing Codec: %s, OnHeapCompressionInput: %s, Decompression Mode: %s, Data Size: %d", @@ 
-132,6 +129,69 @@ private void test(int size, CompressionCodecName codec, boolean useOnHeapCompres } } + private static void validateDecompress( + int size, + CompressionCodecName codec, + Decompression decomp, + BytesInput compressed, + ByteBufferAllocator allocator, + BytesInputDecompressor d, + ByteBuffer rawBuf, + byte[] rawArr) + throws IOException { + switch (decomp) { + case OFF_HEAP: { + final ByteBuffer buf = compressed.toByteBuffer(); + final ByteBuffer b = allocator.allocate(buf.capacity() + 20); + final ByteBuffer outBuf = allocator.allocate(size + 20); + final int shift = 10; + try { + b.position(shift); + b.put(buf); + b.position(shift); + outBuf.position(shift); + d.decompress(b, (int) compressed.size(), outBuf, size); + Assert.assertEquals( + "Input buffer position mismatch for codec " + codec, + compressed.size() + shift, + b.position()); + Assert.assertEquals( + "Output buffer position mismatch for codec " + codec, size + shift, outBuf.position()); + for (int i = 0; i < size; i++) { + Assert.assertTrue( + String.format("Data didn't match at %d, while testing codec %s", i, codec), + outBuf.get(shift + i) == rawBuf.get(i)); + } + } finally { + allocator.release(b); + allocator.release(outBuf); + } + break; + } + + case OFF_HEAP_BYTES_INPUT: { + final ByteBuffer buf = compressed.toByteBuffer(); + final ByteBuffer b = allocator.allocate(buf.limit()); + try { + b.put(buf); + b.flip(); + final BytesInput input = d.decompress(BytesInput.from(b), size); + Assert.assertArrayEquals( + String.format("While testing codec %s", codec), input.toByteArray(), rawArr); + } finally { + allocator.release(b); + } + break; + } + case ON_HEAP: { + final byte[] buf = compressed.toByteArray(); + final BytesInput input = d.decompress(BytesInput.from(buf), size); + Assert.assertArrayEquals(String.format("While testing codec %s", codec), input.toByteArray(), rawArr); + break; + } + } + } + @Test public void createDirectFactoryWithHeapAllocatorFails() { String errorMsg =
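
Usage note (not part of the patch): the sketch below illustrates the ByteBuffer-based decompress() contract documented above, namely that the input position advances by compressedSize, the output position advances by decompressedSize, and the caller flips the output buffer before reading. It is only an assumption-laden example: the page size and sample data are arbitrary, the UNCOMPRESSED codec is chosen so no native libraries are needed, and constructing the factory via new CodecFactory(Configuration, pageSize) mirrors the test code in this diff rather than a prescribed entry point.

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class DecompressContractExample {
  public static void main(String[] args) throws Exception {
    // Page size is an arbitrary value for this example.
    CompressionCodecFactory factory = new CodecFactory(new Configuration(), 64 * 1024);
    BytesInputDecompressor d = factory.getDecompressor(CompressionCodecName.UNCOMPRESSED);

    // UNCOMPRESSED keeps the bytes as-is, so compressedSize == decompressedSize here.
    ByteBuffer input = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
    ByteBuffer output = ByteBuffer.allocate(4);

    // Per the documented contract, input.position() advances by compressedSize and
    // output.position() advances by decompressedSize; the buffers are not cleared internally.
    d.decompress(input, 4, output, 4);

    // The caller flips the output buffer before reading the decompressed data.
    output.flip();
    while (output.hasRemaining()) {
      System.out.print(output.get() + " ");
    }

    d.release();
    factory.release();
  }
}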