Commit 94ced62

[Feature] Adapt to sink v2 api FLIP-191

Signed-off-by: PengFei Li <[email protected]>
banmoy committed Nov 1, 2023
1 parent f55a5b8 commit 94ced62

Showing 15 changed files with 721 additions and 29 deletions.
@@ -23,9 +23,9 @@
 import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
 import com.starrocks.data.load.stream.StreamLoadResponse;
 import com.starrocks.data.load.stream.v2.StreamLoadListener;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
 
 public class StarRocksStreamLoadListener implements StreamLoadListener {
@@ -46,22 +46,22 @@ public class StarRocksStreamLoadListener implements StreamLoadListener {
     private transient Histogram writeDataTimeMs;
     private transient Histogram loadTimeMs;
 
-    public StarRocksStreamLoadListener(RuntimeContext context, StarRocksSinkOptions sinkOptions) {
-        totalFlushBytes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_BYTES);
-        totalFlushRows = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_ROWS);
-        totalFlushTime = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME);
-        totalFlushTimeWithoutRetries = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES);
-        totalFlushSucceededTimes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES);
-        totalFlushFailedTimes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES);
-        flushTimeNs = context.getMetricGroup().histogram(HISTOGRAM_FLUSH_TIME, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        offerTimeNs = context.getMetricGroup().histogram(HISTOGRAM_OFFER_TIME_NS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+    public StarRocksStreamLoadListener(MetricGroup metricGroup, StarRocksSinkOptions sinkOptions) {
+        totalFlushBytes = metricGroup.counter(COUNTER_TOTAL_FLUSH_BYTES);
+        totalFlushRows = metricGroup.counter(COUNTER_TOTAL_FLUSH_ROWS);
+        totalFlushTime = metricGroup.counter(COUNTER_TOTAL_FLUSH_COST_TIME);
+        totalFlushTimeWithoutRetries = metricGroup.counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES);
+        totalFlushSucceededTimes = metricGroup.counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES);
+        totalFlushFailedTimes = metricGroup.counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES);
+        flushTimeNs = metricGroup.histogram(HISTOGRAM_FLUSH_TIME, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        offerTimeNs = metricGroup.histogram(HISTOGRAM_OFFER_TIME_NS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
 
-        totalFilteredRows = context.getMetricGroup().counter(COUNTER_NUMBER_FILTERED_ROWS);
-        commitAndPublishTimeMs = context.getMetricGroup().histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        streamLoadPlanTimeMs = context.getMetricGroup().histogram(HISTOGRAM_STREAM_LOAD_PLAN_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        readDataTimeMs = context.getMetricGroup().histogram(HISTOGRAM_READ_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        writeDataTimeMs = context.getMetricGroup().histogram(HISTOGRAM_WRITE_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        loadTimeMs = context.getMetricGroup().histogram(HISTOGRAM_LOAD_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        totalFilteredRows = metricGroup.counter(COUNTER_NUMBER_FILTERED_ROWS);
+        commitAndPublishTimeMs = metricGroup.histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        streamLoadPlanTimeMs = metricGroup.histogram(HISTOGRAM_STREAM_LOAD_PLAN_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        readDataTimeMs = metricGroup.histogram(HISTOGRAM_READ_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        writeDataTimeMs = metricGroup.histogram(HISTOGRAM_WRITE_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        loadTimeMs = metricGroup.histogram(HISTOGRAM_LOAD_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
     }
 
     @Override
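Narrowing the constructor parameter from RuntimeContext to MetricGroup is what lets the same listener serve both the legacy SinkFunction and the new Sink V2 writer. A minimal sketch of the V2 wiring, assuming the writer is created from Flink's org.apache.flink.api.connector.sink2.Sink.InitContext (which exposes metricGroup() directly); the helper class is hypothetical, and connector imports are omitted because the listener's package is not shown in this diff:

import org.apache.flink.api.connector.sink2.Sink;

class ListenerWiring {
    // Hypothetical helper: the Sink V2 InitContext exposes metricGroup() directly,
    // whereas the legacy SinkFunction path reaches it via getRuntimeContext().getMetricGroup().
    static StarRocksStreamLoadListener forV2Writer(Sink.InitContext context,
                                                   StarRocksSinkOptions sinkOptions) {
        return new StarRocksStreamLoadListener(context.metricGroup(), sinkOptions);
    }
}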
@@ -20,7 +20,6 @@
 import com.starrocks.connector.flink.tools.JsonWrapper;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -78,8 +77,9 @@ public void setTableSchema(TableSchema ts) {
 
     @Override
     public void setRuntimeContext(RuntimeContext runtimeCtx) {
-        final TypeSerializer<RowData> typeSerializer = rowDataTypeInfo.createSerializer(runtimeCtx.getExecutionConfig());
-        valueTransform = runtimeCtx.getExecutionConfig().isObjectReuseEnabled() ? typeSerializer::copy : Function.identity();
+        // No need to copy the value even if object reuse is enabled,
+        // because the raw RowData value will not be buffered
+        this.valueTransform = Function.identity();
     }
 
     @Override
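For context, the removed lines implemented Flink's standard defensive-copy pattern: when object reuse is enabled, upstream operators may mutate the same RowData instance between invocations, so any record that is held onto must be deep-copied first. The same pattern, shown standalone (class and method names are illustrative; the calls mirror the removed code):

import java.util.function.Function;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RowData;

class DefensiveCopyPattern {
    // Copy each record only if the runtime may reuse object instances. In the new
    // sink path the record is converted immediately and never buffered, which is
    // why Function.identity() is now safe in both modes.
    static Function<RowData, RowData> valueTransform(
            TypeInformation<RowData> typeInfo, ExecutionConfig config) {
        if (config.isObjectReuseEnabled()) {
            TypeSerializer<RowData> serializer = typeInfo.createSerializer(config);
            return serializer::copy;
        }
        return Function.identity();
    }
}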
@@ -19,6 +19,7 @@
 package com.starrocks.connector.flink.table.sink;
 
 import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
 import org.apache.flink.table.api.TableSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,4 +115,23 @@ public static <T> StarRocksDynamicSinkFunctionBase<T> createSinkFunction(StarRoc
             throw new UnsupportedOperationException("Unsupported sink type " + sinkVersion.name());
         }
     }
+
+    public static <T> StarRocksSink<T> createSink(
+            StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<T> rowTransformer) {
+        detectStarRocksFeature(sinkOptions);
+        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
+        if (sinkVersion == SinkVersion.V2) {
+            return new StarRocksSink<>(sinkOptions, schema, rowTransformer);
+        }
+        throw new UnsupportedOperationException("The new sink API does not support sink type " + sinkVersion.name());
+    }
+
+    public static <T> StarRocksSink<T> createSink(StarRocksSinkOptions sinkOptions) {
+        detectStarRocksFeature(sinkOptions);
+        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
+        if (sinkVersion == SinkVersion.V2) {
+            return new StarRocksSink<>(sinkOptions);
+        }
+        throw new UnsupportedOperationException("The new sink API does not support sink type " + sinkVersion.name());
+    }
 }
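For orientation, a hedged sketch of how a DataStream job could consume these new factory methods. The wiring and class below are illustrative, and StarRocksSink is assumed to implement Flink's org.apache.flink.api.connector.sink2.Sink so that DataStream#sinkTo accepts it:

import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;

class NewSinkApiUsage {
    // Attach the V2 sink with sinkTo(...); the legacy path would instead pass
    // SinkFunctionFactory.createSinkFunction(...) to stream.addSink(...).
    static void attach(DataStream<RowData> stream,
                       StarRocksSinkOptions options,
                       TableSchema schema,
                       StarRocksIRowTransformer<RowData> rowTransformer) {
        StarRocksSink<RowData> sink = SinkFunctionFactory.createSink(options, schema, rowTransformer);
        stream.sinkTo(sink);
    }
}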
@@ -215,7 +215,7 @@ public void open(Configuration parameters) throws Exception {
         if (serializer != null) {
             this.serializer.open(new StarRocksISerializer.SerializerContext(getOrCreateJsonWrapper()));
         }
-        this.streamLoadListener = new StarRocksStreamLoadListener(getRuntimeContext(), sinkOptions);
+        this.streamLoadListener = new StarRocksStreamLoadListener(getRuntimeContext().getMetricGroup(), sinkOptions);
         sinkManager.setStreamLoadListener(streamLoadListener);
 
         LabelGeneratorFactory labelGeneratorFactory;
@@ -15,11 +15,13 @@
 package com.starrocks.connector.flink.table.sink;
 
 import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
+import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.data.RowData;
 
 public class StarRocksDynamicTableSink implements DynamicTableSink {
@@ -41,12 +43,23 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
     @SuppressWarnings("unchecked")
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         final TypeInformation<RowData> rowDataTypeInfo = context.createTypeInformation(flinkSchema.toRowDataType());
-        StarRocksDynamicSinkFunctionBase<RowData> starrocksSinkFunction = SinkFunctionFactory.createSinkFunction(
-                sinkOptions,
-                flinkSchema,
-                new StarRocksTableRowTransformer(rowDataTypeInfo)
-        );
-        return SinkFunctionProvider.of(starrocksSinkFunction, sinkOptions.getSinkParallelism());
+        if (sinkOptions.isUseUnifiedSinkApi()) {
+            StarRocksSink<RowData> starRocksSink =
+                    SinkFunctionFactory.createSink(
+                            sinkOptions,
+                            flinkSchema,
+                            new StarRocksTableRowTransformer(rowDataTypeInfo)
+                    );
+            return SinkV2Provider.of(starRocksSink, sinkOptions.getSinkParallelism());
+        } else {
+            StarRocksDynamicSinkFunctionBase<RowData> starrocksSinkFunction =
+                    SinkFunctionFactory.createSinkFunction(
+                            sinkOptions,
+                            flinkSchema,
+                            new StarRocksTableRowTransformer(rowDataTypeInfo)
+                    );
+            return SinkFunctionProvider.of(starrocksSinkFunction, sinkOptions.getSinkParallelism());
+        }
     }
 
     @Override
@@ -85,6 +85,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         optionalOptions.add(StarRocksSinkOptions.SINK_ENABLE_EXACTLY_ONCE_LABEL_GEN);
         optionalOptions.add(StarRocksSinkOptions.SINK_ABORT_LINGERING_TXNS);
         optionalOptions.add(StarRocksSinkOptions.SINK_ABORT_CHECK_NUM_TXNS);
+        optionalOptions.add(StarRocksSinkOptions.SINK_USE_NEW_SINK_API);
         return optionalOptions;
     }
 }
@@ -139,6 +139,13 @@ public enum StreamLoadFormat {
             "The number of transactions to check if they are lingering. -1 indicates that the check continues until " +
             "the first transaction that is not lingering is found.");
 
+    public static final ConfigOption<Boolean> SINK_USE_NEW_SINK_API = ConfigOptions.key("sink.use.new-sink-api")
+            .booleanType().defaultValue(false).withDescription("Whether to use the implementation based on the unified " +
+            "sink API described in Flink FLIP-191. Enabling it makes no functional difference for users; it exists only " +
+            "to support frameworks that accept the new sink API exclusively, and Flink will remove the old sink API in " +
+            "the upcoming 2.0 release. Note that the two implementations are not state-compatible: after changing this " +
+            "flag, a job cannot be recovered from checkpoints or savepoints taken with the previous setting.");
+
     public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
 
     // Sink semantic
@@ -351,6 +358,10 @@ public int getAbortCheckNumTxns() {
         return tableOptions.get(SINK_ABORT_CHECK_NUM_TXNS);
     }
 
+    public boolean isUseUnifiedSinkApi() {
+        return tableOptions.get(SINK_USE_NEW_SINK_API);
+    }
+
     private void validateStreamLoadUrl() {
         tableOptions.getOptional(LOAD_URL).ifPresent(urlList -> {
             for (String host : urlList) {
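The option is easiest to see end to end from SQL. A hedged sketch using the connector's documented table options (connection values are placeholders, and the table definition is illustrative); with the flag set, getSinkRuntimeProvider takes the SinkV2Provider branch added earlier in this commit:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

class NewSinkApiDdlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // The only addition relevant to this commit is 'sink.use.new-sink-api' = 'true'.
        tEnv.executeSql(
            "CREATE TABLE sink_table (" +
            "  id BIGINT," +
            "  name STRING" +
            ") WITH (" +
            "  'connector' = 'starrocks'," +
            "  'jdbc-url' = 'jdbc:mysql://fe_host:9030'," +
            "  'load-url' = 'fe_host:8030'," +
            "  'database-name' = 'test_db'," +
            "  'table-name' = 'test_table'," +
            "  'username' = 'root'," +
            "  'password' = ''," +
            "  'sink.use.new-sink-api' = 'true'" +
            ")");
    }
}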
@@ -0,0 +1,36 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.table.sink.v2;

import com.starrocks.data.load.stream.StreamLoadSnapshot;

public class StarRocksCommittable {

    private final StreamLoadSnapshot labelSnapshot;

    public StarRocksCommittable(StreamLoadSnapshot labelSnapshot) {
        this.labelSnapshot = labelSnapshot;
    }

    public StreamLoadSnapshot getLabelSnapshot() {
        return labelSnapshot;
    }
}
@@ -0,0 +1,50 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.table.sink.v2;

import com.starrocks.connector.flink.tools.JsonWrapper;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import java.io.IOException;

public class StarRocksCommittableSerializer implements SimpleVersionedSerializer<StarRocksCommittable> {

    private final JsonWrapper jsonWrapper;

    public StarRocksCommittableSerializer() {
        this.jsonWrapper = new JsonWrapper();
    }

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(StarRocksCommittable committable) throws IOException {
        return jsonWrapper.toJSONBytes(committable);
    }

    @Override
    public StarRocksCommittable deserialize(int version, byte[] serialized) throws IOException {
        return jsonWrapper.parseObject(serialized, StarRocksCommittable.class);
    }
}
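A small sketch of the round trip Flink performs when committables cross a checkpoint boundary; the helper class is illustrative, and construction of the wrapped StreamLoadSnapshot is elided since its API is not shown in this diff:

import java.io.IOException;
import com.starrocks.connector.flink.table.sink.v2.StarRocksCommittable;
import com.starrocks.connector.flink.table.sink.v2.StarRocksCommittableSerializer;

class CommittableRoundTrip {
    static StarRocksCommittable roundTrip(StarRocksCommittable committable) throws IOException {
        StarRocksCommittableSerializer serializer = new StarRocksCommittableSerializer();
        byte[] bytes = serializer.serialize(committable);              // written into checkpoint state
        return serializer.deserialize(serializer.getVersion(), bytes); // restored on recovery
    }
}

Branching on the version argument in deserialize is what would allow a later format change while still reading state written by version 1.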
@@ -0,0 +1,96 @@
/*
* Copyright 2021-present StarRocks, Inc. All rights reserved.
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.starrocks.connector.flink.table.sink.v2;

import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
import com.starrocks.connector.flink.tools.EnvUtils;
import com.starrocks.data.load.stream.properties.StreamLoadProperties;
import com.starrocks.data.load.stream.v2.StreamLoadManagerV2;
import org.apache.flink.api.connector.sink2.Committer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;

public class StarRocksCommitter implements Committer<StarRocksCommittable> {

    private static final Logger LOG = LoggerFactory.getLogger(StarRocksCommitter.class);

    private final int maxRetries;
    private final StreamLoadManagerV2 sinkManager;

    public StarRocksCommitter(
            StarRocksSinkOptions sinkOptions,
            StreamLoadProperties streamLoadProperties) {
        this.maxRetries = sinkOptions.getSinkMaxRetries();
        this.sinkManager = new StreamLoadManagerV2(streamLoadProperties,
                sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
        try {
            // TODO no need to start flush thread in sinkManager
            sinkManager.init();
        } catch (Exception e) {
            LOG.error("Failed to init sink manager.", e);
            try {
                sinkManager.close();
            } catch (Exception ie) {
                LOG.error("Failed to close sink manager after init failure.", ie);
            }
            throw new RuntimeException("Failed to init sink manager", e);
        }
        LOG.info("Create StarRocksCommitter, maxRetries: {}. {}", maxRetries, EnvUtils.getGitInformation());
    }

    @Override
    public void commit(Collection<CommitRequest<StarRocksCommittable>> committables)
            throws IOException, InterruptedException {
        for (CommitRequest<StarRocksCommittable> commitRequest : committables) {
            StarRocksCommittable committable = commitRequest.getCommittable();
            RuntimeException firstException = null;
            boolean success = false;
            for (int i = 0; i <= maxRetries; i++) {
                try {
                    success = sinkManager.commit(committable.getLabelSnapshot());
                    if (success) {
                        break;
                    }
                    throw new RuntimeException("Please see the taskmanager log for the failure reason");
                } catch (Exception e) {
                    LOG.error("Failed to commit on attempt {}, max retries: {}", i, maxRetries, e);
                    // Remember only the first failure so the root cause is what gets rethrown
                    if (firstException == null) {
                        firstException = new RuntimeException("Failed to commit", e);
                    }
                }
            }
            if (!success && firstException != null) {
                throw firstException;
            }
        }
    }

    @Override
    public void close() throws Exception {
        LOG.info("Close StarRocksCommitter.");
        if (sinkManager != null) {
            sinkManager.close();
        }
    }
}
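To place the committer in the FLIP-191 lifecycle: a two-phase committing sink hands Flink a committer, and Flink invokes commit(...) with the committables collected at each checkpoint. A fragment sketching the hook this class is presumably returned from, assuming StarRocksSink implements Flink's TwoPhaseCommittingSink (the field names are illustrative):

// Sketch (assumption): inside a sink implementing
// org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink<InputT, StarRocksCommittable>,
// with sinkOptions and streamLoadProperties held as fields of the sink.
@Override
public Committer<StarRocksCommittable> createCommitter() throws IOException {
    // One committer instance per job; it starts its own StreamLoadManagerV2 for commits.
    return new StarRocksCommitter(sinkOptions, streamLoadProperties);
}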