Support writing extra metadata in scio-parquet #5411

Open · wants to merge 1 commit into base: main
File: WriterUtils.java
@@ -29,7 +29,10 @@

public class WriterUtils {
public static <T, SELF extends ParquetWriter.Builder<T, SELF>> ParquetWriter<T> build(
- ParquetWriter.Builder<T, SELF> builder, Configuration conf, CompressionCodecName compression)
+ ParquetWriter.Builder<T, SELF> builder,
+ Configuration conf,
+ CompressionCodecName compression,
+ Map<String, String> extraMetadata)
throws IOException {
// https://github.com/apache/parquet-mr/tree/master/parquet-hadoop#class-parquetoutputformat
long rowGroupSize =
@@ -53,6 +56,10 @@ public static <T, SELF extends ParquetWriter.Builder<T, SELF>> ParquetWriter<T>
builder = builder.withBloomFilterNDV(entry.getKey(), entry.getValue());
}

if (extraMetadata != null) {
builder = builder.withExtraMetaData(extraMetadata);
}

return builder
.withConf(conf)
.withCompressionCodec(compression)
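
For context, here is a minimal sketch (not part of this change) of what the new extraMetadata argument does at the Parquet level: ParquetWriter.Builder#withExtraMetaData writes the given key/value pairs into the file footer, where readers can retrieve them from the file metadata. The schema, record, and output path below are illustrative assumptions, not code from this PR.

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.{HadoopInputFile, HadoopOutputFile}
import scala.jdk.CollectionConverters._

object ExtraMetadataSketch {
  def main(args: Array[String]): Unit = {
    val conf   = new Configuration()
    val schema = SchemaBuilder.record("User").fields().requiredString("name").endRecord()
    val path   = new Path("/tmp/users.parquet") // hypothetical output location

    // Write a file whose footer carries two extra key/value entries,
    // the same builder call WriterUtils.build now makes for a non-null map.
    val writer = AvroParquetWriter
      .builder[GenericRecord](HadoopOutputFile.fromPath(path, conf))
      .withSchema(schema)
      .withExtraMetaData(Map("pipeline" -> "my-job", "schema-version" -> "7").asJava)
      .build()
    writer.write(new GenericRecordBuilder(schema).set("name", "alice").build())
    writer.close()

    // Read the key/value pairs back from the footer.
    val reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))
    val kv     = reader.getFileMetaData.getKeyValueMetaData.asScala
    reader.close()
    println(kv("pipeline"))       // my-job
    println(kv("schema-version")) // 7
  }
}

Because WriterUtils.build only calls withExtraMetaData when the map is non-null, callers that pass null keep producing footers without extra entries, exactly as before.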
File: ParquetAvroFileBasedSink.java
@@ -20,6 +20,7 @@
import com.spotify.scio.parquet.BeamOutputFile;
import com.spotify.scio.parquet.WriterUtils;
import java.nio.channels.WritableByteChannel;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -40,22 +41,25 @@ public class ParquetAvroFileBasedSink<T> extends FileBasedSink<T, Void, T> {
private final String schemaString;
private final SerializableConfiguration conf;
private final CompressionCodecName compression;
private final Map<String, String> extraMetadata;

public ParquetAvroFileBasedSink(
ValueProvider<ResourceId> baseOutputFileName,
FileBasedSink.DynamicDestinations<T, Void, T> dynamicDestinations,
Schema schema,
Configuration conf,
- CompressionCodecName compression) {
+ CompressionCodecName compression,
+ Map<String, String> extraMetadata) {
super(baseOutputFileName, dynamicDestinations);
this.schemaString = schema.toString();
this.conf = new SerializableConfiguration(conf);
this.compression = compression;
this.extraMetadata = extraMetadata;
}

@Override
public FileBasedSink.WriteOperation<Void, T> createWriteOperation() {
- return new ParquetAvroWriteOperation<T>(this, schemaString, conf, compression);
+ return new ParquetAvroWriteOperation<T>(this, schemaString, conf, compression, extraMetadata);
}

// =======================================================================
@@ -67,22 +71,25 @@ static class ParquetAvroWriteOperation<T> extends WriteOperation<Void, T> {
private final String schemaString;
private final SerializableConfiguration conf;
private final CompressionCodecName compression;
private final Map<String, String> extraMetadata;

public ParquetAvroWriteOperation(
FileBasedSink<T, Void, T> sink,
String schemaString,
SerializableConfiguration conf,
- CompressionCodecName compression) {
+ CompressionCodecName compression,
+ Map<String, String> extraMetadata) {
super(sink);
this.schemaString = schemaString;
this.conf = conf;
this.compression = compression;
this.extraMetadata = extraMetadata;
}

@Override
public Writer<Void, T> createWriter() throws Exception {
return new ParquetAvroWriter<>(
- this, new Schema.Parser().parse(schemaString), conf, compression);
+ this, new Schema.Parser().parse(schemaString), conf, compression, extraMetadata);
}
}

@@ -95,17 +102,20 @@ static class ParquetAvroWriter<T> extends FileBasedSink.Writer<Void, T> {
private final Schema schema;
private final SerializableConfiguration conf;
private final CompressionCodecName compression;
private final Map<String, String> extraMetadata;
private ParquetWriter<T> writer;

public ParquetAvroWriter(
WriteOperation<Void, T> writeOperation,
Schema schema,
SerializableConfiguration conf,
- CompressionCodecName compression) {
+ CompressionCodecName compression,
+ Map<String, String> extraMetadata) {
super(writeOperation, MimeTypes.BINARY);
this.schema = schema;
this.conf = conf;
this.compression = compression;
this.extraMetadata = extraMetadata;
}

@Override
@@ -128,7 +138,7 @@ protected void prepareWrite(WritableByteChannel channel) throws Exception {
ReflectionUtils.newInstance(dataModelSupplier, configuration).get());
}

- writer = WriterUtils.build(builder, configuration, compression);
+ writer = WriterUtils.build(builder, configuration, compression, extraMetadata);
}

@Override
File: ParquetExampleFileBasedSink.java
@@ -21,6 +21,7 @@
import com.spotify.scio.parquet.BeamOutputFile;
import com.spotify.scio.parquet.WriterUtils;
import java.nio.channels.WritableByteChannel;
import java.util.Map;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
@@ -37,22 +38,25 @@ public class ParquetExampleFileBasedSink extends FileBasedSink<Example, Void, Ex
private final Schema schema;
private final SerializableConfiguration conf;
private final CompressionCodecName compression;
private final Map<String, String> extraMetadata;

public ParquetExampleFileBasedSink(
ValueProvider<ResourceId> baseOutputFileName,
FileBasedSink.DynamicDestinations<Example, Void, Example> dynamicDestinations,
Schema schema,
Configuration conf,
- CompressionCodecName compression) {
+ CompressionCodecName compression,
+ Map<String, String> extraMetadata) {
super(baseOutputFileName, dynamicDestinations);
this.schema = schema;
this.conf = new SerializableConfiguration(conf);
this.compression = compression;
this.extraMetadata = extraMetadata;
}

@Override
public FileBasedSink.WriteOperation<Void, Example> createWriteOperation() {
- return new ParquetExampleWriteOperation(this, schema, conf, compression);
+ return new ParquetExampleWriteOperation(this, schema, conf, compression, extraMetadata);
}

// =======================================================================
@@ -63,21 +67,24 @@ static class ParquetExampleWriteOperation extends FileBasedSink.WriteOperation<V
private final Schema schema;
private final SerializableConfiguration conf;
private final CompressionCodecName compression;
private final Map<String, String> extraMetadata;

ParquetExampleWriteOperation(
FileBasedSink<Example, Void, Example> sink,
Schema schema,
SerializableConfiguration conf,
- CompressionCodecName compression) {
+ CompressionCodecName compression,
+ Map<String, String> extraMetadata) {
super(sink);
this.schema = schema;
this.conf = conf;
this.compression = compression;
this.extraMetadata = extraMetadata;
}

@Override
public Writer<Void, Example> createWriter() throws Exception {
- return new ParquetExampleWriter(this, schema, conf, compression);
+ return new ParquetExampleWriter(this, schema, conf, compression, extraMetadata);
}
}

@@ -90,25 +97,28 @@ static class ParquetExampleWriter extends FileBasedSink.Writer<Void, Example> {
private final Schema schema;
private final SerializableConfiguration conf;
private final CompressionCodecName compression;
private final Map<String, String> extraMetadata;
private ParquetWriter<Example> writer;

public ParquetExampleWriter(
FileBasedSink.WriteOperation<Void, Example> writeOperation,
Schema schema,
SerializableConfiguration conf,
- CompressionCodecName compression) {
+ CompressionCodecName compression,
+ Map<String, String> extraMetadata) {
super(writeOperation, MimeTypes.BINARY);
this.schema = schema;
this.conf = conf;
this.compression = compression;
this.extraMetadata = extraMetadata;
}

@Override
protected void prepareWrite(WritableByteChannel channel) throws Exception {
BeamOutputFile outputFile = BeamOutputFile.of(channel);
TensorflowExampleParquetWriter.Builder builder =
TensorflowExampleParquetWriter.builder(outputFile).withSchema(schema);
- writer = WriterUtils.build(builder, conf.get(), compression);
+ writer = WriterUtils.build(builder, conf.get(), compression, extraMetadata);
}

@Override
File: ParquetTypeFileBasedSink.java
@@ -20,6 +20,7 @@
import com.spotify.scio.parquet.BeamOutputFile;
import com.spotify.scio.parquet.WriterUtils;
import java.nio.channels.WritableByteChannel;
import java.util.Map;
import magnolify.parquet.ParquetType;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.fs.ResourceId;
@@ -35,22 +36,25 @@ public class ParquetTypeFileBasedSink<T> extends FileBasedSink<T, Void, T> {
private final ParquetType<T> type;
private final SerializableConfiguration conf;
private final CompressionCodecName compression;
private final Map<String, String> extraMetadata;

public ParquetTypeFileBasedSink(
ValueProvider<ResourceId> baseOutputFileName,
FileBasedSink.DynamicDestinations<T, Void, T> dynamicDestinations,
ParquetType<T> type,
Configuration conf,
- CompressionCodecName compression) {
+ CompressionCodecName compression,
+ Map<String, String> extraMetadata) {
super(baseOutputFileName, dynamicDestinations);
this.type = type;
this.conf = new SerializableConfiguration(conf);
this.compression = compression;
this.extraMetadata = extraMetadata;
}

@Override
public FileBasedSink.WriteOperation<Void, T> createWriteOperation() {
- return new ParquetTypeWriteOperation<>(this, type, conf, compression);
+ return new ParquetTypeWriteOperation<>(this, type, conf, compression, extraMetadata);
}

// =======================================================================
@@ -61,21 +65,24 @@ static class ParquetTypeWriteOperation<T> extends WriteOperation<Void, T> {
private final ParquetType<T> type;
private final SerializableConfiguration conf;
private final CompressionCodecName compression;
private final Map<String, String> extraMetadata;

public ParquetTypeWriteOperation(
FileBasedSink<T, Void, T> sink,
ParquetType<T> type,
SerializableConfiguration conf,
- CompressionCodecName compression) {
+ CompressionCodecName compression,
+ Map<String, String> extraMetadata) {
super(sink);
this.type = type;
this.conf = conf;
this.compression = compression;
this.extraMetadata = extraMetadata;
}

@Override
public Writer<Void, T> createWriter() throws Exception {
- return new ParquetTypeWriter<>(this, type, conf, compression);
+ return new ParquetTypeWriter<>(this, type, conf, compression, extraMetadata);
}
}

@@ -88,24 +95,28 @@ static class ParquetTypeWriter<T> extends FileBasedSink.Writer<Void, T> {
private final ParquetType<T> type;
private final SerializableConfiguration conf;
private final CompressionCodecName compression;
private final Map<String, String> extraMetadata;
private ParquetWriter<T> writer;

public ParquetTypeWriter(
WriteOperation<Void, T> writeOperation,
ParquetType<T> type,
SerializableConfiguration conf,
- CompressionCodecName compression) {
+ CompressionCodecName compression,
+ Map<String, String> extraMetadata) {
super(writeOperation, MimeTypes.BINARY);
this.type = type;
this.conf = conf;
this.compression = compression;
this.extraMetadata = extraMetadata;
}

@SuppressWarnings("unchecked")
@Override
protected void prepareWrite(WritableByteChannel channel) throws Exception {
BeamOutputFile outputFile = BeamOutputFile.of(channel);
- writer = WriterUtils.build(type.writeBuilder(outputFile), conf.get(), compression);
+ writer =
+     WriterUtils.build(type.writeBuilder(outputFile), conf.get(), compression, extraMetadata);
}

@Override
File: ParquetAvroIO.scala
@@ -50,6 +50,7 @@ import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.ParquetInputFormat
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import scala.jdk.CollectionConverters._
import scala.reflect.{classTag, ClassTag}

final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[T] {
@@ -85,7 +86,8 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[T] {
shardNameTemplate: String,
isWindowed: Boolean,
tempDirectory: ResourceId,
- isLocalRunner: Boolean
+ isLocalRunner: Boolean,
+ metadata: Map[String, String]
) = {
require(tempDirectory != null, "tempDirectory must not be null")
val fp = FilenamePolicySupplier.resolve(
@@ -104,7 +106,8 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[T] {
dynamicDestinations,
schema,
job.getConfiguration,
- compression
+ compression,
+ metadata.asJava
)
val transform = WriteFiles.to(sink).withNumShards(numShards)
if (!isWindowed) transform else transform.withWindowedWrites()
@@ -128,7 +131,8 @@ final case class ParquetAvroIO[T: ClassTag: Coder](path: String) extends ScioIO[T] {
params.shardNameTemplate,
ScioUtil.isWindowed(data),
ScioUtil.tempDirOrDefault(params.tempDirectory, data.context),
- ScioUtil.isLocalRunner(data.context.options.getRunner)
+ ScioUtil.isLocalRunner(data.context.options.getRunner),
+ params.metadata
)
)
tap(ParquetAvroIO.ReadParam(params))
@@ -260,6 +264,7 @@ object ParquetAvroIO {
val DefaultPrefix: String = null
val DefaultShardNameTemplate: String = null
val DefaultTempDirectory: String = null
val DefaultMetadata: Map[String, String] = null
}

final case class WriteParam private (
@@ -271,6 +276,7 @@
filenamePolicySupplier: FilenamePolicySupplier = WriteParam.DefaultFilenamePolicySupplier,
prefix: String = WriteParam.DefaultPrefix,
shardNameTemplate: String = WriteParam.DefaultShardNameTemplate,
- tempDirectory: String = WriteParam.DefaultTempDirectory
+ tempDirectory: String = WriteParam.DefaultTempDirectory,
+ metadata: Map[String, String] = WriteParam.DefaultMetadata
)
}
File: SCollectionOps.scala
@@ -64,7 +64,8 @@ class SCollectionOps[T <: IndexedRecord](private val self: SCollection[T]) exten
shardNameTemplate: String = WriteParam.DefaultShardNameTemplate,
tempDirectory: String = WriteParam.DefaultTempDirectory,
filenamePolicySupplier: FilenamePolicySupplier = WriteParam.DefaultFilenamePolicySupplier,
- prefix: String = WriteParam.DefaultPrefix
+ prefix: String = WriteParam.DefaultPrefix,
+ metadata: Map[String, String] = WriteParam.DefaultMetadata
)(implicit ct: ClassTag[T], coder: Coder[T]): ClosedTap[T] = {
val param = WriteParam(
schema = schema,
@@ -75,7 +76,8 @@ class SCollectionOps[T <: IndexedRecord](private val self: SCollection[T]) exten
filenamePolicySupplier = filenamePolicySupplier,
prefix = prefix,
shardNameTemplate = shardNameTemplate,
- tempDirectory = tempDirectory
+ tempDirectory = tempDirectory,
+ metadata = metadata
)
self.write(ParquetAvroIO[T](path))(param)
}
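
To round out the picture, a hedged usage sketch of the Scala surface this diff adds: saveAsParquetAvroFile now takes an optional metadata parameter whose entries are written into the footer of every output shard. User is a hypothetical Avro SpecificRecord (fields name: string, age: int) standing in for any generated record class; the output argument and key names are likewise illustrative.

import com.spotify.scio.ContextAndArgs
import com.spotify.scio.avro._          // Avro coders for SpecificRecord types
import com.spotify.scio.parquet.avro._  // saveAsParquetAvroFile syntax

object WriteWithFooterMetadata {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    sc.parallelize(1 to 100)
      .map(i => User.newBuilder().setName(s"user-$i").setAge(20 + i % 50).build())
      .saveAsParquetAvroFile(
        args("output"),
        metadata = Map(
          "writer.job"     -> "user-export",
          "schema.version" -> "3"
        )
      )

    sc.run().waitUntilFinish()
  }
}

Omitting metadata keeps the previous behavior: the default is WriteParam.DefaultMetadata (null), and WriterUtils.build skips withExtraMetaData for a null map.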