[Feature] Adapt to sink v2 api FLIP-191
Signed-off-by: PengFei Li <[email protected]>
banmoy committed Oct 31, 2023
1 parent f55a5b8 commit a416061
Showing 13 changed files with 632 additions and 27 deletions.
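In brief: the legacy SinkFunction-based implementation is kept, and an opt-in implementation of the FLIP-191 unified sink API (StatefulSink plus TwoPhaseCommittingSink) is added under a new table.sink.v2 package, selected by a new sink.use.unified-sink-api table option. The metrics listener is also decoupled from RuntimeContext so that both paths can share it. Nine of the thirteen changed files are rendered below; the StarRocksWriter and writer-state classes referenced by StarRocksSink are among those not shown.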
StarRocksStreamLoadListener.java
@@ -26,6 +26,7 @@
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
 
 public class StarRocksStreamLoadListener implements StreamLoadListener {
@@ -46,22 +47,22 @@ public class StarRocksStreamLoadListener implements StreamLoadListener {
     private transient Histogram writeDataTimeMs;
     private transient Histogram loadTimeMs;
 
-    public StarRocksStreamLoadListener(RuntimeContext context, StarRocksSinkOptions sinkOptions) {
-        totalFlushBytes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_BYTES);
-        totalFlushRows = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_ROWS);
-        totalFlushTime = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME);
-        totalFlushTimeWithoutRetries = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES);
-        totalFlushSucceededTimes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES);
-        totalFlushFailedTimes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES);
-        flushTimeNs = context.getMetricGroup().histogram(HISTOGRAM_FLUSH_TIME, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        offerTimeNs = context.getMetricGroup().histogram(HISTOGRAM_OFFER_TIME_NS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+    public StarRocksStreamLoadListener(MetricGroup metricGroup, StarRocksSinkOptions sinkOptions) {
+        totalFlushBytes = metricGroup.counter(COUNTER_TOTAL_FLUSH_BYTES);
+        totalFlushRows = metricGroup.counter(COUNTER_TOTAL_FLUSH_ROWS);
+        totalFlushTime = metricGroup.counter(COUNTER_TOTAL_FLUSH_COST_TIME);
+        totalFlushTimeWithoutRetries = metricGroup.counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES);
+        totalFlushSucceededTimes = metricGroup.counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES);
+        totalFlushFailedTimes = metricGroup.counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES);
+        flushTimeNs = metricGroup.histogram(HISTOGRAM_FLUSH_TIME, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        offerTimeNs = metricGroup.histogram(HISTOGRAM_OFFER_TIME_NS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
 
-        totalFilteredRows = context.getMetricGroup().counter(COUNTER_NUMBER_FILTERED_ROWS);
-        commitAndPublishTimeMs = context.getMetricGroup().histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        streamLoadPlanTimeMs = context.getMetricGroup().histogram(HISTOGRAM_STREAM_LOAD_PLAN_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        readDataTimeMs = context.getMetricGroup().histogram(HISTOGRAM_READ_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        writeDataTimeMs = context.getMetricGroup().histogram(HISTOGRAM_WRITE_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        loadTimeMs = context.getMetricGroup().histogram(HISTOGRAM_LOAD_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        totalFilteredRows = metricGroup.counter(COUNTER_NUMBER_FILTERED_ROWS);
+        commitAndPublishTimeMs = metricGroup.histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        streamLoadPlanTimeMs = metricGroup.histogram(HISTOGRAM_STREAM_LOAD_PLAN_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        readDataTimeMs = metricGroup.histogram(HISTOGRAM_READ_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        writeDataTimeMs = metricGroup.histogram(HISTOGRAM_WRITE_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        loadTimeMs = metricGroup.histogram(HISTOGRAM_LOAD_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
     }
 
     @Override
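The switch from RuntimeContext to MetricGroup matters because a FLIP-191 writer has no RuntimeContext: it receives its metric group from Sink.InitContext. A minimal sketch of the two call sites (the legacy one mirrors the open() method shown further down; the V2 one assumes the writer is handed Flink's InitContext, whose metricGroup() accessor is part of the sink2 API):

// Legacy SinkFunction path, inside a RichFunction:
StarRocksStreamLoadListener listener =
        new StarRocksStreamLoadListener(getRuntimeContext().getMetricGroup(), sinkOptions);

// Unified sink path (sketch), inside Sink#createWriter(InitContext context):
// InitContext#metricGroup() returns a SinkWriterMetricGroup, which is a MetricGroup.
StarRocksStreamLoadListener listener2 =
        new StarRocksStreamLoadListener(context.metricGroup(), sinkOptions);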
StarRocksTableRowTransformer.java
@@ -78,8 +78,9 @@ public void setTableSchema(TableSchema ts) {
 
     @Override
     public void setRuntimeContext(RuntimeContext runtimeCtx) {
-        final TypeSerializer<RowData> typeSerializer = rowDataTypeInfo.createSerializer(runtimeCtx.getExecutionConfig());
-        valueTransform = runtimeCtx.getExecutionConfig().isObjectReuseEnabled() ? typeSerializer::copy : Function.identity();
+        // No need to copy the value even if object reuse is enabled,
+        // because the raw RowData value will not be buffered
+        this.valueTransform = Function.identity();
     }
 
     @Override
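For context on why identity is now safe: with object reuse enabled, Flink hands the same mutable RowData instance to every invocation, so a deep copy is only required if the raw reference is buffered past the call. A small illustration of the hazard the old code guarded against (illustration only, not connector code):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// With object reuse, every buffered element would alias the latest row
// unless valueTransform copies it. The transformer now converts the row
// before anything is buffered, which is why Function.identity() suffices.
class ObjectReuseDemo<T> {
    private final List<T> buffer = new ArrayList<>();

    void process(T reusedRow, Function<T, T> valueTransform) {
        buffer.add(valueTransform.apply(reusedRow)); // copy needed only here
    }
}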
StarRocksDynamicSinkFunctionV2.java
@@ -215,7 +215,7 @@ public void open(Configuration parameters) throws Exception {
         if (serializer != null) {
             this.serializer.open(new StarRocksISerializer.SerializerContext(getOrCreateJsonWrapper()));
         }
-        this.streamLoadListener = new StarRocksStreamLoadListener(getRuntimeContext(), sinkOptions);
+        this.streamLoadListener = new StarRocksStreamLoadListener(getRuntimeContext().getMetricGroup(), sinkOptions);
         sinkManager.setStreamLoadListener(streamLoadListener);
 
         LabelGeneratorFactory labelGeneratorFactory;
StarRocksDynamicTableSink.java
@@ -15,11 +15,13 @@
 package com.starrocks.connector.flink.table.sink;
 
 import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
+import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.data.RowData;
 
 public class StarRocksDynamicTableSink implements DynamicTableSink {
@@ -41,12 +43,19 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
     @SuppressWarnings("unchecked")
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         final TypeInformation<RowData> rowDataTypeInfo = context.createTypeInformation(flinkSchema.toRowDataType());
-        StarRocksDynamicSinkFunctionBase<RowData> starrocksSinkFunction = SinkFunctionFactory.createSinkFunction(
-                sinkOptions,
-                flinkSchema,
-                new StarRocksTableRowTransformer(rowDataTypeInfo)
-            );
-        return SinkFunctionProvider.of(starrocksSinkFunction, sinkOptions.getSinkParallelism());
+        if (sinkOptions.isUseUnifiedSinkApi()) {
+            StarRocksSink<RowData> starRocksSink = new StarRocksSink<>(
+                    sinkOptions, flinkSchema, new StarRocksTableRowTransformer(rowDataTypeInfo));
+            return SinkV2Provider.of(starRocksSink, sinkOptions.getSinkParallelism());
+        } else {
+            StarRocksDynamicSinkFunctionBase<RowData> starrocksSinkFunction =
+                    SinkFunctionFactory.createSinkFunction(
+                            sinkOptions,
+                            flinkSchema,
+                            new StarRocksTableRowTransformer(rowDataTypeInfo)
+                    );
+            return SinkFunctionProvider.of(starrocksSinkFunction, sinkOptions.getSinkParallelism());
+        }
     }
 
     @Override
StarRocksSinkOptions.java
@@ -139,6 +139,13 @@ public enum StreamLoadFormat {
             "The number of transactions to check if they are lingering. -1 indicates that check until finding the first " +
                     "transaction that is not lingering.");
 
+    public static final ConfigOption<Boolean> SINK_USE_UNIFIED_SINK_API = ConfigOptions.key("sink.use.unified-sink-api")
+            .booleanType().defaultValue(false).withDescription("Whether to use the sink implementation based on the " +
+                    "unified sink API described in Flink FLIP-191. Enabling the flag makes no functional difference for " +
+                    "users; it exists only to adapt to frameworks that support the new sink API exclusively, and Flink " +
+                    "will remove the old sink API in the upcoming 2.0 release. Note that the two implementations are not " +
+                    "state-compatible: after changing the flag, a job cannot be recovered from a previous checkpoint.");
+
     public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
 
     // Sink semantic
@@ -351,6 +358,10 @@ public int getAbortCheckNumTxns() {
         return tableOptions.get(SINK_ABORT_CHECK_NUM_TXNS);
     }
 
+    public boolean isUseUnifiedSinkApi() {
+        return tableOptions.get(SINK_USE_UNIFIED_SINK_API);
+    }
+
     private void validateStreamLoadUrl() {
         tableOptions.getOptional(LOAD_URL).ifPresent(urlList -> {
             for (String host : urlList) {
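A hedged usage sketch for the new flag: since it is a table option, it goes in the WITH clause of the sink table. The connection options (jdbc-url, load-url, database-name, table-name, username, password) follow the connector's existing option names; the addresses and table names below are placeholders.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UnifiedSinkOptionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE sr_sink (id INT, name STRING) WITH ("
                        + " 'connector' = 'starrocks',"
                        + " 'jdbc-url' = 'jdbc:mysql://fe-host:9030',"
                        + " 'load-url' = 'fe-host:8030',"
                        + " 'database-name' = 'test_db',"
                        + " 'table-name' = 'test_table',"
                        + " 'username' = 'root',"
                        + " 'password' = '',"
                        // Routes getSinkRuntimeProvider to SinkV2Provider (see
                        // StarRocksDynamicTableSink above). Defaults to false, and state
                        // is not compatible across a change of the flag.
                        + " 'sink.use.unified-sink-api' = 'true'"
                        + ")");
    }
}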
StarRocksCommittable.java (new file)
@@ -0,0 +1,32 @@
/*
 * Copyright 2021-present StarRocks, Inc. All rights reserved.
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.starrocks.connector.flink.table.sink.v2;

import com.starrocks.data.load.stream.StreamLoadSnapshot;

public class StarRocksCommittable {

    private final StreamLoadSnapshot labelSnapshot;

    public StarRocksCommittable(StreamLoadSnapshot labelSnapshot) {
        this.labelSnapshot = labelSnapshot;
    }
}
StarRocksCommittableSerializer.java (new file)
@@ -0,0 +1,50 @@
/*
 * Copyright 2021-present StarRocks, Inc. All rights reserved.
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.starrocks.connector.flink.table.sink.v2;

import com.starrocks.connector.flink.tools.JsonWrapper;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;

public class StarRocksCommittableSerializer implements SimpleVersionedSerializer<StarRocksCommittable> {

    private final JsonWrapper jsonWrapper;

    public StarRocksCommittableSerializer() {
        this.jsonWrapper = new JsonWrapper();
    }

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(StarRocksCommittable committable) throws IOException {
        return jsonWrapper.toJSONBytes(committable);
    }

    @Override
    public StarRocksCommittable deserialize(int version, byte[] serialized) throws IOException {
        return jsonWrapper.parseObject(serialized, StarRocksCommittable.class);
    }
}
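For orientation, this is how Flink exercises the serializer: committables produced at a checkpoint are written into checkpoint state and read back on recovery. A round-trip sketch ('snapshot' stands for a StreamLoadSnapshot produced by the writer; the JSON round trip assumes the committable's fields are visible to JsonWrapper):

StarRocksCommittableSerializer serializer = new StarRocksCommittableSerializer();
byte[] bytes = serializer.serialize(new StarRocksCommittable(snapshot));
// On recovery, Flink passes back the version the bytes were written with:
StarRocksCommittable restored = serializer.deserialize(serializer.getVersion(), bytes);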
StarRocksCommitter.java (new file)
@@ -0,0 +1,43 @@
/*
 * Copyright 2021-present StarRocks, Inc. All rights reserved.
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.starrocks.connector.flink.table.sink.v2;

import org.apache.flink.api.connector.sink2.Committer;

import java.io.IOException;
import java.util.Collection;

public class StarRocksCommitter implements Committer<StarRocksCommittable> {

    public StarRocksCommitter() {
    }

    @Override
    public void commit(Collection<CommitRequest<StarRocksCommittable>> committables)
            throws IOException, InterruptedException {

    }

    @Override
    public void close() throws Exception {

    }
}
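Note that commit() is left empty in this commit: the committer receives the committables collected at a checkpoint and does nothing with them yet. For orientation, a hedged sketch of what a filled-in body typically looks like with Flink's CommitRequest API (commitSnapshot is a hypothetical helper; getCommittable() and signalFailedWithKnownReason(...) are part of the sink2 Committer contract):

@Override
public void commit(Collection<CommitRequest<StarRocksCommittable>> committables)
        throws IOException, InterruptedException {
    for (CommitRequest<StarRocksCommittable> request : committables) {
        try {
            // Hypothetical helper: finish the transactions captured in the
            // committable's StreamLoadSnapshot.
            commitSnapshot(request.getCommittable());
        } catch (Exception e) {
            // Hand the failure to Flink, which fails the job or retries the request.
            request.signalFailedWithKnownReason(e);
        }
    }
}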
StarRocksSink.java (new file)
@@ -0,0 +1,109 @@
/*
 * Copyright 2021-present StarRocks, Inc. All rights reserved.
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.starrocks.connector.flink.table.sink.v2;

import com.starrocks.connector.flink.manager.StarRocksSinkTable;
import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.table.api.TableSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

public class StarRocksSink<InputT>
        implements StatefulSink<InputT, StarRocksWriterState>, TwoPhaseCommittingSink<InputT, StarRocksCommittable> {

    private static final Logger LOG = LoggerFactory.getLogger(StarRocksSink.class);

    private final StarRocksSinkOptions sinkOptions;
    private final StarRocksISerializer serializer;
    private final StarRocksIRowTransformer<InputT> rowTransformer;
    private final StreamLoadProperties streamLoadProperties;

    public StarRocksSink(StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<InputT> rowTransformer) {
        this.sinkOptions = sinkOptions;
        this.rowTransformer = rowTransformer;
        StarRocksSinkTable sinkTable = StarRocksSinkTable.builder()
                .sinkOptions(sinkOptions)
                .build();
        sinkTable.validateTableStructure(sinkOptions, schema);
        // StarRocksJsonSerializer depends on SinkOptions#supportUpsertDelete, which is decided in
        // StarRocksSinkTable#validateTableStructure, so create the serializer after validating the table structure
        this.serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames());
        rowTransformer.setStarRocksColumns(sinkTable.getFieldMapping());
        rowTransformer.setTableSchema(schema);
        this.streamLoadProperties = sinkOptions.getProperties(sinkTable);
    }

    public StarRocksSink(StarRocksSinkOptions sinkOptions) {
        this.sinkOptions = sinkOptions;
        this.serializer = null;
        this.rowTransformer = null;
        this.streamLoadProperties = sinkOptions.getProperties(null);
    }

    @Override
    public StarRocksWriter<InputT> createWriter(InitContext context) throws IOException {
        return restoreWriter(context, Collections.emptyList());
    }

    @Override
    public StarRocksWriter<InputT> restoreWriter(InitContext context, Collection<StarRocksWriterState> recoveredState)
            throws IOException {
        try {
            return new StarRocksWriter<>(
                    sinkOptions,
                    serializer,
                    rowTransformer,
                    streamLoadProperties,
                    context,
                    Collections.emptyList());
        } catch (Exception e) {
            throw new RuntimeException("Failed to create writer.", e);
        }
    }

    @Override
    public SimpleVersionedSerializer<StarRocksWriterState> getWriterStateSerializer() {
        return new StarRocksWriterStateSerializer();
    }

    @Override
    public Committer<StarRocksCommittable> createCommitter() throws IOException {
        return new StarRocksCommitter();
    }

    @Override
    public SimpleVersionedSerializer<StarRocksCommittable> getCommittableSerializer() {
        return new StarRocksCommittableSerializer();
    }
}
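An end-to-end wiring sketch for the DataStream API: DataStream#sinkTo(Sink) accepts the new sink directly, since StatefulSink extends Sink. The builder calls mirror the connector's existing StarRocksSinkOptions.builder() usage with placeholder values; the single-argument constructor (no serializer or transformer) is assumed here to take raw, already-formatted string records, as in the legacy sink function's raw path.

import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StarRocksSinkV2Example {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StarRocksSinkOptions options = StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", "jdbc:mysql://fe-host:9030")
                .withProperty("load-url", "fe-host:8030")
                .withProperty("database-name", "test_db")
                .withProperty("table-name", "test_table")
                .withProperty("username", "root")
                .withProperty("password", "")
                .build();
        env.fromElements("1,apple", "2,banana")   // CSV rows matching the target table
           .sinkTo(new StarRocksSink<>(options)); // FLIP-191 path: sinkTo, not addSink
        env.execute("starrocks-sink-v2-example");
    }
}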