diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java b/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java index e5a39851..7c2191d1 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java @@ -25,8 +25,8 @@ import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer; import com.starrocks.connector.flink.row.sink.StarRocksISerializer; import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory; -import com.starrocks.connector.flink.table.sink.v2.RecordSerializer; -import com.starrocks.connector.flink.table.sink.v2.RowDataSerializer; +import com.starrocks.connector.flink.table.sink.v2.StarRocksRecordSerializationSchema; +import com.starrocks.connector.flink.table.sink.v2.RowDataSerializationSchema; import com.starrocks.connector.flink.table.sink.v2.StarRocksSink; import com.starrocks.data.load.stream.properties.StreamLoadProperties; import org.slf4j.Logger; @@ -138,7 +138,7 @@ public static StarRocksSink createSink( StarRocksISerializer serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames()); rowTransformer.setStarRocksColumns(sinkTable.getFieldMapping()); rowTransformer.setTableSchema(schema); - RowDataSerializer recordSerializer = new RowDataSerializer( + RowDataSerializationSchema serializationSchema = new RowDataSerializationSchema( sinkOptions.getDatabaseName(), sinkOptions.getTableName(), sinkOptions.supportUpsertDelete(), @@ -146,13 +146,13 @@ public static StarRocksSink createSink( serializer, rowTransformer); StreamLoadProperties streamLoadProperties = sinkOptions.getProperties(sinkTable); - return new StarRocksSink<>(sinkOptions, recordSerializer, streamLoadProperties); + return new StarRocksSink<>(sinkOptions, serializationSchema, streamLoadProperties); } throw new UnsupportedOperationException("New sink api don't support sink type " + sinkVersion.name()); } public static StarRocksSink createSink( - StarRocksSinkOptions sinkOptions, RecordSerializer recordSerializer) { + StarRocksSinkOptions sinkOptions, StarRocksRecordSerializationSchema recordSerializer) { detectStarRocksFeature(sinkOptions); SinkVersion sinkVersion = getSinkVersion(sinkOptions); if (sinkVersion == SinkVersion.V2) { diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/RowDataSerializer.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/RowDataSerializationSchema.java similarity index 96% rename from src/main/java/com/starrocks/connector/flink/table/sink/v2/RowDataSerializer.java rename to src/main/java/com/starrocks/connector/flink/table/sink/v2/RowDataSerializationSchema.java index 7d9a1a65..cd029d84 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/v2/RowDataSerializer.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/RowDataSerializationSchema.java @@ -30,7 +30,7 @@ import com.starrocks.connector.flink.tools.JsonWrapper; /** Serializer for the {@link RowData} record. */ -public class RowDataSerializer implements RecordSerializer { +public class RowDataSerializationSchema implements StarRocksRecordSerializationSchema { private static final long serialVersionUID = 1L; @@ -42,7 +42,7 @@ public class RowDataSerializer implements RecordSerializer { private final StarRocksIRowTransformer rowTransformer; private transient DefaultStarRocksRowData reusableRowData; - public RowDataSerializer( + public RowDataSerializationSchema( String databaseName, String tableName, boolean supportUpsertDelete, diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/RecordSerializer.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksRecordSerializationSchema.java similarity index 86% rename from src/main/java/com/starrocks/connector/flink/table/sink/v2/RecordSerializer.java rename to src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksRecordSerializationSchema.java index ff14932c..adbf3b73 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/v2/RecordSerializer.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksRecordSerializationSchema.java @@ -25,11 +25,12 @@ import java.io.Serializable; /** - * Interface for the input record serialization. + * A serialization schema which defines how to convert a value + * of type {@code T} to {@link StarRocksRowData}. * * @param the type of input record being serialized */ -public interface RecordSerializer extends Serializable { +public interface StarRocksRecordSerializationSchema extends Serializable { /** Open the serializer. */ void open(); diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java index a5095e11..50ed33e3 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java @@ -42,15 +42,15 @@ public class StarRocksSink private static final Logger LOG = LoggerFactory.getLogger(StarRocksSink.class); private final StarRocksSinkOptions sinkOptions; - private final RecordSerializer recordSerializer; + private final StarRocksRecordSerializationSchema serializationSchema; private final StreamLoadProperties streamLoadProperties; public StarRocksSink( StarRocksSinkOptions sinkOptions, - RecordSerializer recordSerializer, + StarRocksRecordSerializationSchema serializationSchema, StreamLoadProperties streamLoadProperties) { this.sinkOptions = sinkOptions; - this.recordSerializer = recordSerializer; + this.serializationSchema = serializationSchema; this.streamLoadProperties = streamLoadProperties; } @@ -65,7 +65,7 @@ public StarRocksWriter restoreWriter(InitContext context, Collection( sinkOptions, - recordSerializer, + serializationSchema, streamLoadProperties, context, Collections.emptyList()); diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java index 8a2dd269..81124f7d 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java @@ -53,7 +53,7 @@ public class StarRocksWriter private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriter.class); private final StarRocksSinkOptions sinkOptions; - private final RecordSerializer recordSerializer; + private final StarRocksRecordSerializationSchema serializationSchema; private final StarRocksStreamLoadListener streamLoadListener; private final LabelGeneratorFactory labelGeneratorFactory; private final StreamLoadManagerV2 sinkManager; @@ -61,13 +61,13 @@ public class StarRocksWriter public StarRocksWriter( StarRocksSinkOptions sinkOptions, - RecordSerializer recordSerializer, + StarRocksRecordSerializationSchema serializationSchema, StreamLoadProperties streamLoadProperties, Sink.InitContext initContext, Collection recoveredState) throws Exception { this.sinkOptions = sinkOptions; - this.recordSerializer = recordSerializer; - this.recordSerializer.open(); + this.serializationSchema = serializationSchema; + this.serializationSchema.open(); this.streamLoadListener = new StarRocksStreamLoadListener(initContext.metricGroup(), sinkOptions); long restoredCheckpointId = initContext.getRestoredCheckpointId() .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1); @@ -134,7 +134,7 @@ public StarRocksWriter( @Override public void write(InputT element, Context context) throws IOException, InterruptedException { - StarRocksRowData rowData = recordSerializer.serialize(element); + StarRocksRowData rowData = serializationSchema.serialize(element); sinkManager.write(rowData.getUniqueKey(), rowData.getDatabase(), rowData.getTable(), rowData.getRow()); totalReceivedRows += 1; if (totalReceivedRows % 100 == 1) { @@ -178,7 +178,7 @@ public List snapshotState(long checkpointId) throws IOExce @Override public void close() throws Exception { LOG.info("Close StarRocksWriter"); - recordSerializer.close(); + serializationSchema.close(); if (sinkManager != null) { try { StreamLoadSnapshot snapshot = sinkManager.snapshot();