Skip to content

Commit

Permalink
[Refactor] Rename StarRocksRecordSerializationSchema to RecordSeriali…
Browse files Browse the repository at this point in the history
…zationSchema

Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Nov 28, 2023
1 parent c88c133 commit 92be62e
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
import com.starrocks.connector.flink.table.sink.v2.StarRocksRecordSerializationSchema;
import com.starrocks.connector.flink.table.sink.v2.RecordSerializationSchema;
import com.starrocks.connector.flink.table.sink.v2.RowDataSerializationSchema;
import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
Expand Down Expand Up @@ -152,12 +152,12 @@ public static StarRocksSink<RowData> createSink(
}

public static <T> StarRocksSink<T> createSink(
StarRocksSinkOptions sinkOptions, StarRocksRecordSerializationSchema<T> recordSerializer) {
StarRocksSinkOptions sinkOptions, RecordSerializationSchema<T> serializationSchema) {
detectStarRocksFeature(sinkOptions);
SinkVersion sinkVersion = getSinkVersion(sinkOptions);
if (sinkVersion == SinkVersion.V2) {
StreamLoadProperties streamLoadProperties = sinkOptions.getProperties(null);
return new StarRocksSink<>(sinkOptions, recordSerializer, streamLoadProperties);
return new StarRocksSink<>(sinkOptions, serializationSchema, streamLoadProperties);
}
throw new UnsupportedOperationException("New sink api don't support sink type " + sinkVersion.name());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*
* @param <T> the type of input record being serialized
*/
public interface StarRocksRecordSerializationSchema<T> extends Serializable {
public interface RecordSerializationSchema<T> extends Serializable {

/** Open the serializer. */
void open();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.starrocks.connector.flink.tools.JsonWrapper;

/** Serializer for the {@link RowData} record. */
public class RowDataSerializationSchema implements StarRocksRecordSerializationSchema<RowData> {
public class RowDataSerializationSchema implements RecordSerializationSchema<RowData> {

private static final long serialVersionUID = 1L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ public class StarRocksSink<InputT>
private static final Logger LOG = LoggerFactory.getLogger(StarRocksSink.class);

private final StarRocksSinkOptions sinkOptions;
private final StarRocksRecordSerializationSchema<InputT> serializationSchema;
private final RecordSerializationSchema<InputT> serializationSchema;
private final StreamLoadProperties streamLoadProperties;

public StarRocksSink(
StarRocksSinkOptions sinkOptions,
StarRocksRecordSerializationSchema<InputT> serializationSchema,
RecordSerializationSchema<InputT> serializationSchema,
StreamLoadProperties streamLoadProperties) {
this.sinkOptions = sinkOptions;
this.serializationSchema = serializationSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ public class StarRocksWriter<InputT>
private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriter.class);

private final StarRocksSinkOptions sinkOptions;
private final StarRocksRecordSerializationSchema<InputT> serializationSchema;
private final RecordSerializationSchema<InputT> serializationSchema;
private final StarRocksStreamLoadListener streamLoadListener;
private final LabelGeneratorFactory labelGeneratorFactory;
private final StreamLoadManagerV2 sinkManager;
private long totalReceivedRows = 0;

public StarRocksWriter(
StarRocksSinkOptions sinkOptions,
StarRocksRecordSerializationSchema<InputT> serializationSchema,
RecordSerializationSchema<InputT> serializationSchema,
StreamLoadProperties streamLoadProperties,
Sink.InitContext initContext,
Collection<StarRocksWriterState> recoveredState) throws Exception {
Expand Down

0 comments on commit 92be62e

Please sign in to comment.