Skip to content

Commit

Permalink
PARQUET-2440: Avoid getting Hadoop codec for internal compressor/decompressor
Browse files Browse the repository at this point in the history
  • Loading branch information
gszadovszky committed Feb 29, 2024
1 parent 0eec215 commit 633d311
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;
Expand All @@ -52,6 +54,40 @@ public class CodecFactory implements CompressionCodecFactory {
protected final ParquetConfiguration configuration;
protected final int pageSize;

// Shared pass-through decompressor used when no Hadoop codec applies
// (i.e. the data is stored UNCOMPRESSED): input bytes are handed back or
// copied verbatim instead of going through a codec stream.
static final BytesDecompressor NO_OP_DECOMPRESSOR = new BytesDecompressor() {
  @Override
  public BytesInput decompress(BytesInput bytes, int uncompressedSize) {
    // Nothing to inflate; return the input untouched.
    return bytes;
  }

  @Override
  public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) {
    Preconditions.checkArgument(
        compressedSize == uncompressedSize,
        "Non-compressed data did not have matching compressed and uncompressed sizes.");
    // Copy the first compressedSize bytes into the output buffer. Work on a
    // duplicate so the caller's buffer position/limit are left untouched.
    final ByteBuffer source = input.duplicate();
    source.position(0).limit(compressedSize);
    output.clear();
    output.put(source);
  }

  @Override
  public void release() {
    // Stateless: no pooled resources to return.
  }
};

// Shared pass-through compressor used when the codec is UNCOMPRESSED.
static final BytesCompressor NO_OP_COMPRESSOR = new BytesCompressor() {
  @Override
  public CompressionCodecName getCodecName() {
    return CompressionCodecName.UNCOMPRESSED;
  }

  @Override
  public BytesInput compress(BytesInput bytes) {
    // Nothing to do: the data is stored as-is.
    return bytes;
  }

  @Override
  public void release() {
    // Stateless: nothing to release.
  }
};

/**
* Create a new codec factory.
*
Expand Down Expand Up @@ -108,37 +144,28 @@ class HeapBytesDecompressor extends BytesDecompressor {
private final CompressionCodec codec;
private final Decompressor decompressor;

/**
 * Creates a decompressor backed by the given Hadoop codec.
 * The codec must not be null; the UNCOMPRESSED (no-codec) case is handled
 * by a separate no-op decompressor at the factory level.
 *
 * @param codec the Hadoop codec to decompress with; must not be null
 */
HeapBytesDecompressor(CompressionCodec codec) {
  this.codec = Objects.requireNonNull(codec);
  // Borrow a pooled decompressor; CodecPool may hand back null for codecs
  // without a pooled implementation (the stream path tolerates that).
  decompressor = CodecPool.getDecompressor(codec);
}

@Override
public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
final BytesInput decompressed;
if (codec != null) {
if (decompressor != null) {
decompressor.reset();
}
InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

// We need to explicitly close the ZstdDecompressorStream here to release the resources it holds to
// avoid
// off-heap memory fragmentation issue, see https://issues.apache.org/jira/browse/PARQUET-2160.
// This change will load the decompressor stream into heap a little earlier, since the problem it solves
// only happens in the ZSTD codec, so this modification is only made for ZSTD streams.
if (codec instanceof ZstandardCodec) {
decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
is.close();
} else {
decompressed = BytesInput.from(is, uncompressedSize);
}
if (decompressor != null) {
decompressor.reset();
}
InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);

// We need to explicitly close the ZstdDecompressorStream here to release the resources it holds to
// avoid off-heap memory fragmentation issue, see https://issues.apache.org/jira/browse/PARQUET-2160.
// This change will load the decompressor stream into heap a little earlier, since the problem it solves
// only happens in the ZSTD codec, so this modification is only made for ZSTD streams.
if (codec instanceof ZstandardCodec) {
decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
is.close();
} else {
decompressed = bytes;
decompressed = BytesInput.from(is, uncompressedSize);
}
return decompressed;
}
Expand Down Expand Up @@ -168,36 +195,25 @@ class HeapBytesCompressor extends BytesCompressor {
private final ByteArrayOutputStream compressedOutBuffer;
private final CompressionCodecName codecName;

/**
 * Creates a compressor backed by the given Hadoop codec.
 * The codec must not be null; the UNCOMPRESSED (no-codec) case is handled
 * by a separate no-op compressor at the factory level.
 *
 * @param codecName the Parquet name of the codec
 * @param codec the Hadoop codec to compress with; must not be null
 */
HeapBytesCompressor(CompressionCodecName codecName, CompressionCodec codec) {
  this.codecName = codecName;
  this.codec = Objects.requireNonNull(codec);
  this.compressor = CodecPool.getCompressor(codec);
  this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
}

@Override
public BytesInput compress(BytesInput bytes) throws IOException {
final BytesInput compressedBytes;
if (codec == null) {
compressedBytes = bytes;
} else {
compressedOutBuffer.reset();
if (compressor != null) {
// null compressor for non-native gzip
compressor.reset();
}
try (CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor)) {
bytes.writeAllTo(cos);
cos.finish();
}
compressedBytes = BytesInput.from(compressedOutBuffer);
compressedOutBuffer.reset();
if (compressor != null) {
// null compressor for non-native gzip
compressor.reset();
}
try (CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor)) {
bytes.writeAllTo(cos);
cos.finish();
}
return compressedBytes;
return BytesInput.from(compressedOutBuffer);
}

@Override
Expand Down Expand Up @@ -233,11 +249,13 @@ public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
}

/**
 * Creates a compressor for the given codec name. When no Hadoop codec
 * resolves (e.g. UNCOMPRESSED), the shared no-op compressor is returned
 * instead of instantiating a heap compressor.
 */
protected BytesCompressor createCompressor(CompressionCodecName codecName) {
  CompressionCodec codec = getCodec(codecName);
  return codec == null ? NO_OP_COMPRESSOR : new HeapBytesCompressor(codecName, codec);
}

/**
 * Creates a decompressor for the given codec name. When no Hadoop codec
 * resolves (e.g. UNCOMPRESSED), the shared no-op decompressor is returned
 * instead of instantiating a heap decompressor.
 */
protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
  CompressionCodec codec = getCodec(codecName);
  return codec == null ? NO_OP_DECOMPRESSOR : new HeapBytesDecompressor(codec);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,35 +96,37 @@ class DirectCodecFactory extends CodecFactory implements AutoCloseable {

@Override
protected BytesCompressor createCompressor(final CompressionCodecName codecName) {

CompressionCodec codec = getCodec(codecName);
if (codec == null) {
return new NoopCompressor();
} else if (codecName == CompressionCodecName.SNAPPY) {
// avoid using the default Snappy codec since it allocates direct buffers at awkward spots.
return new SnappyCompressor();
} else if (codecName == CompressionCodecName.ZSTD) {
return new ZstdCompressor();
} else {
// todo: create class similar to the SnappyCompressor for zlib and exclude it as
// snappy is above since it also generates allocateDirect calls.
return new HeapBytesCompressor(codecName);
switch (codecName) {
case SNAPPY:
// avoid using the default Snappy codec since it allocates direct buffers at awkward spots.
return new SnappyCompressor();
case ZSTD:
return new ZstdCompressor();
// todo: create class similar to the SnappyCompressor for zlib and exclude it as
// snappy is above since it also generates allocateDirect calls.
default:
return super.createCompressor(codecName);
}
}

@Override
protected BytesDecompressor createDecompressor(final CompressionCodecName codecName) {
CompressionCodec codec = getCodec(codecName);
if (codec == null) {
return new NoopDecompressor();
} else if (codecName == CompressionCodecName.SNAPPY) {
return new SnappyDecompressor();
} else if (codecName == CompressionCodecName.ZSTD) {
return new ZstdDecompressor();
} else if (DirectCodecPool.INSTANCE.codec(codec).supportsDirectDecompression()) {
return new FullDirectDecompressor(codecName);
} else {
return new IndirectDecompressor(codec);
switch (codecName) {
case SNAPPY:
return new SnappyDecompressor();
case ZSTD:
return new ZstdDecompressor();
default:
CompressionCodec codec = getCodec(codecName);
if (codec == null) {
return NO_OP_DECOMPRESSOR;
}
DirectCodecPool.CodecPool pool = DirectCodecPool.INSTANCE.codec(codec);
if (pool.supportsDirectDecompression()) {
return new FullDirectDecompressor(pool.borrowDirectDecompressor());
} else {
return new IndirectDecompressor(pool.borrowDecompressor());
}
}
}

Expand All @@ -140,7 +142,11 @@ public class IndirectDecompressor extends BytesDecompressor {
private final Decompressor decompressor;

/**
 * Borrows a pooled (heap-buffer) decompressor for the given codec.
 *
 * @param codec the Hadoop codec whose pooled decompressor should be used
 */
public IndirectDecompressor(CompressionCodec codec) {
  this(DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor());
}

// Internal constructor used when the pooled decompressor was already borrowed
// by the caller (avoids a second pool lookup).
private IndirectDecompressor(Decompressor decompressor) {
  this.decompressor = decompressor;
}

@Override
Expand Down Expand Up @@ -280,8 +286,13 @@ public class FullDirectDecompressor extends BaseDecompressor {
private final Object decompressor;

/**
 * Borrows a pooled direct decompressor for the given codec name.
 *
 * @param codecName the Parquet codec name; must resolve to a non-null Hadoop codec
 */
public FullDirectDecompressor(CompressionCodecName codecName) {
  this(DirectCodecPool.INSTANCE
      .codec(Objects.requireNonNull(getCodec(codecName)))
      .borrowDirectDecompressor());
}

// Internal constructor used when the direct decompressor was already borrowed
// by the caller (avoids a second pool/codec lookup).
private FullDirectDecompressor(Object decompressor) {
  this.decompressor = decompressor;
}

@Override
Expand Down Expand Up @@ -320,6 +331,10 @@ void closeDecompressor() {
}
}

/**
* @deprecated Use {@link CodecFactory#NO_OP_DECOMPRESSOR} instead
*/
@Deprecated
public class NoopDecompressor extends BytesDecompressor {

@Override
Expand Down Expand Up @@ -424,6 +439,10 @@ void closeCompressor() {
}
}

/**
* @deprecated Use {@link CodecFactory#NO_OP_COMPRESSOR} instead
*/
@Deprecated
public static class NoopCompressor extends BytesCompressor {

public NoopCompressor() {}
Expand Down

0 comments on commit 633d311

Please sign in to comment.