diff --git a/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadListener.java b/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadListener.java
index c88233f3..dad64948 100644
--- a/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadListener.java
+++ b/src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadListener.java
@@ -23,9 +23,9 @@
 import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
 import com.starrocks.data.load.stream.StreamLoadResponse;
 import com.starrocks.data.load.stream.v2.StreamLoadListener;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
 
 public class StarRocksStreamLoadListener implements StreamLoadListener {
@@ -46,22 +46,22 @@ public class StarRocksStreamLoadListener implements StreamLoadListener {
     private transient Histogram writeDataTimeMs;
     private transient Histogram loadTimeMs;
 
-    public StarRocksStreamLoadListener(RuntimeContext context, StarRocksSinkOptions sinkOptions) {
-        totalFlushBytes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_BYTES);
-        totalFlushRows = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_ROWS);
-        totalFlushTime = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME);
-        totalFlushTimeWithoutRetries = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES);
-        totalFlushSucceededTimes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES);
-        totalFlushFailedTimes = context.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES);
-        flushTimeNs = context.getMetricGroup().histogram(HISTOGRAM_FLUSH_TIME, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        offerTimeNs = context.getMetricGroup().histogram(HISTOGRAM_OFFER_TIME_NS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+    public StarRocksStreamLoadListener(MetricGroup metricGroup, StarRocksSinkOptions sinkOptions) {
+        totalFlushBytes = metricGroup.counter(COUNTER_TOTAL_FLUSH_BYTES);
+        totalFlushRows = metricGroup.counter(COUNTER_TOTAL_FLUSH_ROWS);
+        totalFlushTime = metricGroup.counter(COUNTER_TOTAL_FLUSH_COST_TIME);
+        totalFlushTimeWithoutRetries = metricGroup.counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES);
+        totalFlushSucceededTimes = metricGroup.counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES);
+        totalFlushFailedTimes = metricGroup.counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES);
+        flushTimeNs = metricGroup.histogram(HISTOGRAM_FLUSH_TIME, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        offerTimeNs = metricGroup.histogram(HISTOGRAM_OFFER_TIME_NS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
 
-        totalFilteredRows = context.getMetricGroup().counter(COUNTER_NUMBER_FILTERED_ROWS);
-        commitAndPublishTimeMs = context.getMetricGroup().histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        streamLoadPlanTimeMs = context.getMetricGroup().histogram(HISTOGRAM_STREAM_LOAD_PLAN_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        readDataTimeMs = context.getMetricGroup().histogram(HISTOGRAM_READ_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        writeDataTimeMs = context.getMetricGroup().histogram(HISTOGRAM_WRITE_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
-        loadTimeMs = context.getMetricGroup().histogram(HISTOGRAM_LOAD_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        totalFilteredRows = metricGroup.counter(COUNTER_NUMBER_FILTERED_ROWS);
+        commitAndPublishTimeMs = metricGroup.histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        streamLoadPlanTimeMs = metricGroup.histogram(HISTOGRAM_STREAM_LOAD_PLAN_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        readDataTimeMs = metricGroup.histogram(HISTOGRAM_READ_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        writeDataTimeMs = metricGroup.histogram(HISTOGRAM_WRITE_DATA_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
+        loadTimeMs = metricGroup.histogram(HISTOGRAM_LOAD_TIME_MS, new DescriptiveStatisticsHistogram(sinkOptions.getSinkHistogramWindowSize()));
     }
 
     @Override
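Taking a MetricGroup instead of a RuntimeContext decouples the listener from the RichFunction runtime, so both execution paths can construct it from whatever metric group they already own. Both lines below appear verbatim in the changes that follow:

    // V1 SinkFunction path (StarRocksDynamicSinkFunctionV2#open):
    new StarRocksStreamLoadListener(getRuntimeContext().getMetricGroup(), sinkOptions);

    // Sink V2 path (StarRocksWriter constructor):
    new StarRocksStreamLoadListener(initContext.metricGroup(), sinkOptions);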
diff --git a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java
index 32d36b83..aef0c02b 100644
--- a/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java
+++ b/src/main/java/com/starrocks/connector/flink/row/sink/StarRocksTableRowTransformer.java
@@ -20,7 +20,6 @@
 import com.starrocks.connector.flink.tools.JsonWrapper;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.ArrayData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -78,8 +77,9 @@ public void setTableSchema(TableSchema ts) {
 
     @Override
     public void setRuntimeContext(RuntimeContext runtimeCtx) {
-        final TypeSerializer<RowData> typeSerializer = rowDataTypeInfo.createSerializer(runtimeCtx.getExecutionConfig());
-        valueTransform = runtimeCtx.getExecutionConfig().isObjectReuseEnabled() ? typeSerializer::copy : Function.identity();
+        // No need to copy the value even if object reuse is enabled,
+        // because the raw RowData value will not be buffered
+        this.valueTransform = Function.identity();
     }
 
     @Override
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java b/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java
index 4409f977..bee4b2ad 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/SinkFunctionFactory.java
@@ -19,6 +19,7 @@
 package com.starrocks.connector.flink.table.sink;
 
 import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
 import org.apache.flink.table.api.TableSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,4 +115,23 @@ public static <T> StarRocksDynamicSinkFunctionBase<T> createSinkFunction(StarRoc
             throw new UnsupportedOperationException("Unsupported sink type " + sinkVersion.name());
         }
     }
+
+    public static <T> StarRocksSink<T> createSink(
+            StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<T> rowTransformer) {
+        detectStarRocksFeature(sinkOptions);
+        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
+        if (sinkVersion == SinkVersion.V2) {
+            return new StarRocksSink<>(sinkOptions, schema, rowTransformer);
+        }
+        throw new UnsupportedOperationException("New sink API doesn't support sink type " + sinkVersion.name());
+    }
+
+    public static <T> StarRocksSink<T> createSink(StarRocksSinkOptions sinkOptions) {
+        detectStarRocksFeature(sinkOptions);
+        SinkVersion sinkVersion = getSinkVersion(sinkOptions);
+        if (sinkVersion == SinkVersion.V2) {
+            return new StarRocksSink<>(sinkOptions);
+        }
+        throw new UnsupportedOperationException("New sink API doesn't support sink type " + sinkVersion.name());
+    }
 }
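For reference, a minimal sketch of attaching the factory's new-API sink to a DataStream job. This is not part of the patch; the connection values are placeholders, and StarRocksSinkOptions.builder() is the connector's existing options builder:

    import com.starrocks.connector.flink.table.sink.SinkFunctionFactory;
    import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
    import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class NewSinkApiExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder()
                    .withProperty("jdbc-url", "jdbc:mysql://127.0.0.1:9030")   // placeholder endpoints
                    .withProperty("load-url", "127.0.0.1:8030")
                    .withProperty("database-name", "test_db")
                    .withProperty("table-name", "test_tbl")
                    .withProperty("username", "root")
                    .withProperty("password", "")
                    .withProperty("sink.version", "V2")                        // the new API requires the V2 sink
                    .build();
            // createSink returns a Sink V2 implementation, so it is attached with
            // DataStream#sinkTo rather than DataStream#addSink.
            StarRocksSink<String> sink = SinkFunctionFactory.createSink(sinkOptions);
            env.fromElements("{\"c0\": 1, \"c1\": 0.5, \"c2\": \"row1\"}").sinkTo(sink);
            env.execute("starrocks-new-sink-api-demo");
        }
    }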
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java
index ecf75628..f208b4b1 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java
@@ -215,7 +215,7 @@ public void open(Configuration parameters) throws Exception {
         if (serializer != null) {
             this.serializer.open(new StarRocksISerializer.SerializerContext(getOrCreateJsonWrapper()));
         }
-        this.streamLoadListener = new StarRocksStreamLoadListener(getRuntimeContext(), sinkOptions);
+        this.streamLoadListener = new StarRocksStreamLoadListener(getRuntimeContext().getMetricGroup(), sinkOptions);
         sinkManager.setStreamLoadListener(streamLoadListener);
 
         LabelGeneratorFactory labelGeneratorFactory;
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java
index c5e38eff..41c80227 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSink.java
@@ -15,11 +15,13 @@
 package com.starrocks.connector.flink.table.sink;
 
 import com.starrocks.connector.flink.row.sink.StarRocksTableRowTransformer;
+import com.starrocks.connector.flink.table.sink.v2.StarRocksSink;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.connector.sink.SinkV2Provider;
 import org.apache.flink.table.data.RowData;
 
 public class StarRocksDynamicTableSink implements DynamicTableSink {
@@ -41,12 +43,23 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
     @SuppressWarnings("unchecked")
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
         final TypeInformation<RowData> rowDataTypeInfo = context.createTypeInformation(flinkSchema.toRowDataType());
-        StarRocksDynamicSinkFunctionBase<RowData> starrocksSinkFunction = SinkFunctionFactory.createSinkFunction(
-                sinkOptions,
-                flinkSchema,
-                new StarRocksTableRowTransformer(rowDataTypeInfo)
-        );
-        return SinkFunctionProvider.of(starrocksSinkFunction, sinkOptions.getSinkParallelism());
+        if (sinkOptions.isUseUnifiedSinkApi()) {
+            StarRocksSink<RowData> starRocksSink =
+                    SinkFunctionFactory.createSink(
+                            sinkOptions,
+                            flinkSchema,
+                            new StarRocksTableRowTransformer(rowDataTypeInfo)
+                    );
+            return SinkV2Provider.of(starRocksSink, sinkOptions.getSinkParallelism());
+        } else {
+            StarRocksDynamicSinkFunctionBase<RowData> starrocksSinkFunction =
+                    SinkFunctionFactory.createSinkFunction(
+                            sinkOptions,
+                            flinkSchema,
+                            new StarRocksTableRowTransformer(rowDataTypeInfo)
+                    );
+            return SinkFunctionProvider.of(starrocksSinkFunction, sinkOptions.getSinkParallelism());
+        }
     }
 
     @Override
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java
index 39f858d1..21a2b7c6 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java
@@ -85,6 +85,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         optionalOptions.add(StarRocksSinkOptions.SINK_ENABLE_EXACTLY_ONCE_LABEL_GEN);
         optionalOptions.add(StarRocksSinkOptions.SINK_ABORT_LINGERING_TXNS);
         optionalOptions.add(StarRocksSinkOptions.SINK_ABORT_CHECK_NUM_TXNS);
+        optionalOptions.add(StarRocksSinkOptions.SINK_USE_NEW_SINK_API);
         return optionalOptions;
     }
 }
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java
index e03daf85..78fe1786 100644
--- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java
@@ -139,6 +139,13 @@ public enum StreamLoadFormat {
             "The number of transactions to check if they are lingering. -1 indicates that check until finding the first " +
                     "transaction that is not lingering.");
 
+    public static final ConfigOption<Boolean> SINK_USE_NEW_SINK_API = ConfigOptions.key("sink.use.new-sink-api")
+            .booleanType().defaultValue(false).withDescription("Whether to use the implementation based on the unified " +
+                    "sink API described in Flink FLIP-191. Enabling the flag makes no functional difference for users; " +
+                    "it only adapts the connector to frameworks that support the new sink API exclusively, and Flink " +
+                    "will remove the old sink API in the upcoming 2.0 release. Note that the two implementations are not " +
+                    "checkpoint-compatible: a job cannot be recovered from state written with the other value of this flag.");
+
     public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;
 
     // Sink semantic
@@ -351,6 +358,10 @@ public int getAbortCheckNumTxns() {
         return tableOptions.get(SINK_ABORT_CHECK_NUM_TXNS);
     }
 
+    public boolean isUseUnifiedSinkApi() {
+        return tableOptions.get(SINK_USE_NEW_SINK_API);
+    }
+
     private void validateStreamLoadUrl() {
         tableOptions.getOptional(LOAD_URL).ifPresent(urlList -> {
             for (String host : urlList) {
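A Table API sketch (again not part of the patch, with placeholder connection values) showing where the new option sits among the existing connector options:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class UseNewSinkApiOptionExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.executeSql(
                    "CREATE TABLE sink_table (c0 INT, c1 STRING) WITH (" +
                            "'connector' = 'starrocks'," +
                            "'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030'," +
                            "'load-url' = '127.0.0.1:8030'," +
                            "'database-name' = 'test_db'," +
                            "'table-name' = 'sink_table'," +
                            "'username' = 'root'," +
                            "'password' = ''," +
                            "'sink.version' = 'V2'," +
                            "'sink.use.new-sink-api' = 'true'" +
                            ")");
        }
    }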
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittable.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittable.java
new file mode 100644
index 00000000..25294257
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittable.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.table.sink.v2;
+
+import com.starrocks.data.load.stream.StreamLoadSnapshot;
+
+public class StarRocksCommittable {
+
+    private final StreamLoadSnapshot labelSnapshot;
+
+    public StarRocksCommittable(StreamLoadSnapshot labelSnapshot) {
+        this.labelSnapshot = labelSnapshot;
+    }
+
+    public StreamLoadSnapshot getLabelSnapshot() {
+        return labelSnapshot;
+    }
+}
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittableSerializer.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittableSerializer.java
new file mode 100644
index 00000000..bfb69090
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommittableSerializer.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.table.sink.v2;
+
+import com.starrocks.connector.flink.tools.JsonWrapper;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+public class StarRocksCommittableSerializer implements SimpleVersionedSerializer<StarRocksCommittable> {
+
+    private final JsonWrapper jsonWrapper;
+
+    public StarRocksCommittableSerializer() {
+        this.jsonWrapper = new JsonWrapper();
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(StarRocksCommittable committable) throws IOException {
+        return jsonWrapper.toJSONBytes(committable);
+    }
+
+    @Override
+    public StarRocksCommittable deserialize(int version, byte[] serialized) throws IOException {
+        return jsonWrapper.parseObject(serialized, StarRocksCommittable.class);
+    }
+}
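Because the committable is a thin, JSON-serializable wrapper around a StreamLoadSnapshot, a serializer round-trip is all Flink needs to ship it from the writer to the committer. A hedged sketch of that round-trip (helper method, not part of the patch):

    static StarRocksCommittable roundTrip(StarRocksCommittable committable) throws IOException {
        StarRocksCommittableSerializer serializer = new StarRocksCommittableSerializer();
        // Serialize as the writer side would, then restore as the committer side would.
        byte[] bytes = serializer.serialize(committable);
        return serializer.deserialize(serializer.getVersion(), bytes);
    }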
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommitter.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommitter.java
new file mode 100644
index 00000000..4a004268
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksCommitter.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.table.sink.v2;
+
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
+import com.starrocks.connector.flink.tools.EnvUtils;
+import com.starrocks.data.load.stream.properties.StreamLoadProperties;
+import com.starrocks.data.load.stream.v2.StreamLoadManagerV2;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+public class StarRocksCommitter implements Committer<StarRocksCommittable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StarRocksCommitter.class);
+
+    private final int maxRetries;
+    private final StreamLoadManagerV2 sinkManager;
+
+    public StarRocksCommitter(
+            StarRocksSinkOptions sinkOptions,
+            StreamLoadProperties streamLoadProperties) {
+        this.maxRetries = sinkOptions.getSinkMaxRetries();
+        this.sinkManager = new StreamLoadManagerV2(streamLoadProperties,
+                sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
+        try {
+            // TODO no need to start flush thread in sinkManager
+            sinkManager.init();
+        } catch (Exception e) {
+            LOG.error("Failed to init sink manager.", e);
+            try {
+                sinkManager.close();
+            } catch (Exception ie) {
+                LOG.error("Failed to close sink manager after init failure.", ie);
+            }
+            throw new RuntimeException("Failed to init sink manager", e);
+        }
+        LOG.info("Create StarRocksCommitter, maxRetries: {}. {}", maxRetries, EnvUtils.getGitInformation());
+    }
+
+    @Override
+    public void commit(Collection<CommitRequest<StarRocksCommittable>> committables)
+            throws IOException, InterruptedException {
+        for (CommitRequest<StarRocksCommittable> commitRequest : committables) {
+            StarRocksCommittable committable = commitRequest.getCommittable();
+            boolean committed = false;
+            RuntimeException firstException = null;
+            for (int i = 0; i <= maxRetries; i++) {
+                try {
+                    committed = sinkManager.commit(committable.getLabelSnapshot());
+                    if (committed) {
+                        break;
+                    }
+                    throw new RuntimeException("Please see the taskmanager log for the failure reason");
+                } catch (Exception e) {
+                    LOG.error("Failed to commit on attempt {}, max retries: {}", i, maxRetries, e);
+                    if (firstException == null) { // remember the first failure
+                        firstException = new RuntimeException("Failed to commit", e);
+                    }
+                }
+            }
+            if (!committed && firstException != null) {
+                throw firstException;
+            }
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.info("Close StarRocksCommitter.");
+        if (sinkManager != null) {
+            sinkManager.close();
+        }
+    }
+}
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java
new file mode 100644
index 00000000..56177c32
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksSink.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.table.sink.v2;
+
+import com.starrocks.connector.flink.manager.StarRocksSinkTable;
+import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
+import com.starrocks.connector.flink.row.sink.StarRocksSerializerFactory;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.data.load.stream.properties.StreamLoadProperties;
+import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.table.api.TableSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+public class StarRocksSink<InputT>
+        implements StatefulSink<InputT, StarRocksWriterState>, TwoPhaseCommittingSink<InputT, StarRocksCommittable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StarRocksSink.class);
+
+    private final StarRocksSinkOptions sinkOptions;
+    private final StarRocksISerializer serializer;
+    private final StarRocksIRowTransformer<InputT> rowTransformer;
+    private final StreamLoadProperties streamLoadProperties;
+
+    public StarRocksSink(StarRocksSinkOptions sinkOptions, TableSchema schema, StarRocksIRowTransformer<InputT> rowTransformer) {
+        this.sinkOptions = sinkOptions;
+        this.rowTransformer = rowTransformer;
+        StarRocksSinkTable sinkTable = StarRocksSinkTable.builder()
+                .sinkOptions(sinkOptions)
+                .build();
+        sinkTable.validateTableStructure(sinkOptions, schema);
+        // StarRocksJsonSerializer depends on SinkOptions#supportUpsertDelete which is decided in
+        // StarRocksSinkTable#validateTableStructure, so create serializer after validating table structure
+        this.serializer = StarRocksSerializerFactory.createSerializer(sinkOptions, schema.getFieldNames());
+        rowTransformer.setStarRocksColumns(sinkTable.getFieldMapping());
+        rowTransformer.setTableSchema(schema);
+        this.streamLoadProperties = sinkOptions.getProperties(sinkTable);
+    }
+
+    public StarRocksSink(StarRocksSinkOptions sinkOptions) {
+        this.sinkOptions = sinkOptions;
+        this.serializer = null;
+        this.rowTransformer = null;
+        this.streamLoadProperties = sinkOptions.getProperties(null);
+    }
+
+    @Override
+    public StarRocksWriter<InputT> createWriter(InitContext context) throws IOException {
+        return restoreWriter(context, Collections.emptyList());
+    }
+
+    @Override
+    public StarRocksWriter<InputT> restoreWriter(InitContext context, Collection<StarRocksWriterState> recoveredState)
+            throws IOException {
+        try {
+            return new StarRocksWriter<>(
+                    sinkOptions,
+                    serializer,
+                    rowTransformer,
+                    streamLoadProperties,
+                    context,
+                    recoveredState); // pass the recovered label snapshots through on restore
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create writer.", e);
+        }
+    }
+
+    @Override
+    public SimpleVersionedSerializer<StarRocksWriterState> getWriterStateSerializer() {
+        return new StarRocksWriterStateSerializer();
+    }
+
+    @Override
+    public Committer<StarRocksCommittable> createCommitter() throws IOException {
+        return new StarRocksCommitter(sinkOptions, streamLoadProperties);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<StarRocksCommittable> getCommittableSerializer() {
+        return new StarRocksCommittableSerializer();
+    }
+}
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java
new file mode 100644
index 00000000..cfbe1f7b
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriter.java
@@ -0,0 +1,254 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.table.sink.v2;
+
+import com.google.common.base.Strings;
+import com.starrocks.connector.flink.manager.StarRocksStreamLoadListener;
+import com.starrocks.connector.flink.row.sink.StarRocksIRowTransformer;
+import com.starrocks.connector.flink.row.sink.StarRocksISerializer;
+import com.starrocks.connector.flink.table.data.StarRocksRowData;
+import com.starrocks.connector.flink.table.sink.ExactlyOnceLabelGeneratorFactory;
+import com.starrocks.connector.flink.table.sink.ExactlyOnceLabelGeneratorSnapshot;
+import com.starrocks.connector.flink.table.sink.LingeringTransactionAborter;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkRowDataWithMeta;
+import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
+import com.starrocks.connector.flink.tools.EnvUtils;
+import com.starrocks.connector.flink.tools.JsonWrapper;
+import com.starrocks.data.load.stream.LabelGeneratorFactory;
+import com.starrocks.data.load.stream.StreamLoadSnapshot;
+import com.starrocks.data.load.stream.properties.StreamLoadProperties;
+import com.starrocks.data.load.stream.v2.StreamLoadManagerV2;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+public class StarRocksWriter<InputT>
+        implements StatefulSink.StatefulSinkWriter<InputT, StarRocksWriterState>,
+        TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, StarRocksCommittable> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriter.class);
+
+    private final StarRocksSinkOptions sinkOptions;
+    private final StarRocksISerializer serializer;
+    private final StarRocksIRowTransformer<InputT> rowTransformer;
+    private final JsonWrapper jsonWrapper;
+    private final StarRocksStreamLoadListener streamLoadListener;
+    private final LabelGeneratorFactory labelGeneratorFactory;
+    private final StreamLoadManagerV2 sinkManager;
+    private long totalReceivedRows = 0;
+
+    public StarRocksWriter(
+            StarRocksSinkOptions sinkOptions,
+            StarRocksISerializer serializer,
+            StarRocksIRowTransformer<InputT> rowTransformer,
+            StreamLoadProperties streamLoadProperties,
+            Sink.InitContext initContext,
+            Collection<StarRocksWriterState> recoveredState) throws Exception {
+        this.sinkOptions = sinkOptions;
+        this.serializer = serializer;
+        this.rowTransformer = rowTransformer;
+
+        this.jsonWrapper = new JsonWrapper();
+        if (this.serializer != null) {
+            this.serializer.open(new StarRocksISerializer.SerializerContext(jsonWrapper));
+        }
+        if (this.rowTransformer != null) {
+            this.rowTransformer.setRuntimeContext(null); // safe: the transformer no longer uses the runtime context
+            this.rowTransformer.setFastJsonWrapper(jsonWrapper);
+        }
+        this.streamLoadListener = new StarRocksStreamLoadListener(initContext.metricGroup(), sinkOptions);
+
+        long restoredCheckpointId = initContext.getRestoredCheckpointId()
+                .orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
+        List<ExactlyOnceLabelGeneratorSnapshot> restoredGeneratorSnapshots = new ArrayList<>();
+        for (StarRocksWriterState writerState : recoveredState) {
+            restoredGeneratorSnapshots.addAll(writerState.getLabelSnapshots());
+        }
+        String labelPrefix = sinkOptions.getLabelPrefix();
+        if (labelPrefix == null ||
+                sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE ||
+                !sinkOptions.isEnableExactlyOnceLabelGen()) {
+            this.labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory(
+                    labelPrefix == null ? "flink" : labelPrefix);
+        } else {
+            this.labelGeneratorFactory = new ExactlyOnceLabelGeneratorFactory(
+                    labelPrefix,
+                    initContext.getNumberOfParallelSubtasks(),
+                    initContext.getSubtaskId(),
+                    restoredCheckpointId);
+        }
+
+        this.sinkManager = new StreamLoadManagerV2(streamLoadProperties,
+                sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE);
+        sinkManager.setStreamLoadListener(streamLoadListener);
+        sinkManager.setLabelGeneratorFactory(labelGeneratorFactory);
+        try {
+            sinkManager.init();
+        } catch (Exception e) {
+            LOG.error("Failed to init sink manager.", e);
+            try {
+                sinkManager.close();
+            } catch (Exception ie) {
+                LOG.error("Failed to close sink manager after init failure.", ie);
+            }
+            throw new RuntimeException("Failed to init sink manager", e);
+        }
+
+        try {
+            if (sinkOptions.getSemantic() == StarRocksSinkSemantic.EXACTLY_ONCE
+                    && sinkOptions.isAbortLingeringTxns()) {
+                LingeringTransactionAborter aborter = new LingeringTransactionAborter(
+                        sinkOptions.getLabelPrefix(),
+                        restoredCheckpointId,
+                        initContext.getSubtaskId(),
+                        sinkOptions.getAbortCheckNumTxns(),
+                        sinkOptions.getDbTables(),
+                        restoredGeneratorSnapshots,
+                        sinkManager.getStreamLoader());
+
+                aborter.execute();
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to abort lingering transactions.", e);
+            try {
+                sinkManager.close();
+            } catch (Exception ie) {
+                LOG.error("Failed to close sink manager after aborting lingering transaction failure.", ie);
+            }
+            throw new RuntimeException("Failed to abort lingering transactions", e);
+        }
+
+        LOG.info("Create StarRocksWriter. {}", EnvUtils.getGitInformation());
+    }
+
{}", EnvUtils.getGitInformation()); + } + + @Override + public void write(InputT element, Context context) throws IOException, InterruptedException { + if (serializer == null) { + if (element instanceof StarRocksSinkRowDataWithMeta) { + StarRocksSinkRowDataWithMeta data = (StarRocksSinkRowDataWithMeta) element; + if (Strings.isNullOrEmpty(data.getDatabase()) + || Strings.isNullOrEmpty(data.getTable()) + || data.getDataRows() == null) { + LOG.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}", + data.getDatabase(), data.getTable(), Arrays.toString(data.getDataRows()))); + return; + } + sinkManager.write(null, data.getDatabase(), data.getTable(), data.getDataRows()); + return; + } else if (element instanceof StarRocksRowData) { + StarRocksRowData data = (StarRocksRowData) element; + if (Strings.isNullOrEmpty(data.getDatabase()) + || Strings.isNullOrEmpty(data.getTable()) + || data.getRow() == null) { + LOG.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}", + data.getDatabase(), data.getTable(), data.getRow())); + return; + } + sinkManager.write(data.getUniqueKey(), data.getDatabase(), data.getTable(), data.getRow()); + return; + } + // raw data sink + sinkManager.write(null, sinkOptions.getDatabaseName(), sinkOptions.getTableName(), element.toString()); + return; + } + + if (element instanceof RowData) { + if (RowKind.UPDATE_BEFORE.equals(((RowData) element).getRowKind()) && + (!sinkOptions.supportUpsertDelete() || sinkOptions.getIgnoreUpdateBefore())) { + return; + } + if (!sinkOptions.supportUpsertDelete() && RowKind.DELETE.equals(((RowData) element).getRowKind())) { + // let go the UPDATE_AFTER and INSERT rows for tables who have a group of `unique` or `duplicate` keys. 
+                return;
+            }
+        }
+        String serializedValue = serializer.serialize(rowTransformer.transform(element, sinkOptions.supportUpsertDelete()));
+        sinkManager.write(
+                null,
+                sinkOptions.getDatabaseName(),
+                sinkOptions.getTableName(),
+                serializedValue);
+
+        totalReceivedRows += 1;
+        if (totalReceivedRows % 100 == 1) {
+            LOG.debug("Received raw record: {}", element);
+            LOG.debug("Received serialized record: {}", serializedValue);
+        }
+    }
+
+    @Override
+    public void flush(boolean endOfInput) throws IOException, InterruptedException {
+        sinkManager.flush();
+    }
+
+    @Override
+    public Collection<StarRocksCommittable> prepareCommit() throws IOException, InterruptedException {
+        if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE) {
+            return Collections.emptyList();
+        }
+
+        StreamLoadSnapshot snapshot = sinkManager.snapshot();
+        if (sinkManager.prepare(snapshot)) {
+            return Collections.singleton(new StarRocksCommittable(snapshot));
+        } else {
+            sinkManager.abort(snapshot);
+            throw new RuntimeException("Failed to prepare the transaction snapshot");
+        }
+    }
+
+    @Override
+    public List<StarRocksWriterState> snapshotState(long checkpointId) throws IOException {
+        if (sinkOptions.getSemantic() != StarRocksSinkSemantic.EXACTLY_ONCE ||
+                !(labelGeneratorFactory instanceof ExactlyOnceLabelGeneratorFactory)) {
+            return Collections.emptyList();
+        }
+
+        List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots =
+                ((ExactlyOnceLabelGeneratorFactory) labelGeneratorFactory).snapshot(checkpointId);
+        return Collections.singletonList(new StarRocksWriterState(labelSnapshots));
+    }
+
+    @Override
+    public void close() throws Exception {
+        LOG.info("Close StarRocksWriter");
+        if (sinkManager != null) {
+            try {
+                StreamLoadSnapshot snapshot = sinkManager.snapshot();
+                sinkManager.abort(snapshot);
+            } finally {
+                sinkManager.close();
+            }
+        }
+    }
+}
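To summarize the writer's role in the two-phase commit: at a checkpoint, Flink's SinkWriterOperator flushes the writer, collects committables, and snapshots state, roughly as in this simplified sketch (not code from this patch):

    static List<StarRocksCommittable> onCheckpoint(StarRocksWriter<RowData> writer, long checkpointId)
            throws Exception {
        writer.flush(false);                             // push buffered rows to StreamLoadManagerV2
        List<StarRocksCommittable> committables =
                new ArrayList<>(writer.prepareCommit()); // exactly-once: transactions become PREPARED
        writer.snapshotState(checkpointId);              // persist label-generator snapshots
        return committables;                             // handed to StarRocksCommitter after the checkpoint completes
    }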
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterState.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterState.java
new file mode 100644
index 00000000..e07680bd
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterState.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.table.sink.v2;
+
+import com.starrocks.connector.flink.table.sink.ExactlyOnceLabelGeneratorSnapshot;
+
+import java.util.List;
+
+public class StarRocksWriterState {
+
+    private List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots;
+
+    public StarRocksWriterState(List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots) {
+        this.labelSnapshots = labelSnapshots;
+    }
+
+    public List<ExactlyOnceLabelGeneratorSnapshot> getLabelSnapshots() {
+        return labelSnapshots;
+    }
+}
diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterStateSerializer.java b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterStateSerializer.java
new file mode 100644
index 00000000..c7200463
--- /dev/null
+++ b/src/main/java/com/starrocks/connector/flink/table/sink/v2/StarRocksWriterStateSerializer.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2021-present StarRocks, Inc. All rights reserved.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.starrocks.connector.flink.table.sink.v2;
+
+import com.starrocks.connector.flink.tools.JsonWrapper;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+
+public class StarRocksWriterStateSerializer implements SimpleVersionedSerializer<StarRocksWriterState> {
+
+    private final JsonWrapper jsonWrapper;
+
+    public StarRocksWriterStateSerializer() {
+        this.jsonWrapper = new JsonWrapper();
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(StarRocksWriterState state) throws IOException {
+        return jsonWrapper.toJSONBytes(state);
+    }
+
+    @Override
+    public StarRocksWriterState deserialize(int version, byte[] serialized) throws IOException {
+        return jsonWrapper.parseObject(serialized, StarRocksWriterState.class);
+    }
+}
diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java
index 661c53e6..dc486fd0 100644
--- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java
+++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java
@@ -53,14 +53,21 @@
 @RunWith(Parameterized.class)
 public class StarRocksSinkITTest extends StarRocksITTestBase {
 
-    @Parameterized.Parameters(name = "sinkV2={0}")
-    public static List<Boolean> parameters() {
-        return Arrays.asList(false, true);
+    @Parameterized.Parameters(name = "sinkV2={0}, newSinkApi={1}")
+    public static List<Object[]> parameters() {
+        return Arrays.asList(
+                new Object[] {false, false},
+                new Object[] {true, false},
+                new Object[] {true, true}
+            );
     }
 
     @Parameterized.Parameter
     public boolean isSinkV2;
 
+    @Parameterized.Parameter(1)
+    public boolean newSinkApi;
+
     @Test
     public void testDupKeyWriteFullColumnsInOrder() throws Exception {
         String ddl = "c0 INT, c1 FLOAT, c2 STRING";
@@ -149,6 +156,7 @@ private void testDupKeyWriteBase(String flinkDDL, RowTypeInfo rowTypeInfo,
                 "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
                 "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
                 "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
+                "'sink.use.new-sink-api' = '" + (newSinkApi ? "true" : "false") + "'," +
                 "'database-name' = '" + DB_NAME + "'," +
                 "'table-name' = '" + sinkOptions.getTableName() + "'," +
                 "'username' = '" + sinkOptions.getUsername() + "'," +
@@ -271,6 +279,7 @@ private void testPkWriteForBase(String flinkDDL, RowTypeInfo rowTypeInfo,
                 "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
                 "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
                 "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
+                "'sink.use.new-sink-api' = '" + (newSinkApi ? "true" : "false") + "'," +
                 "'database-name' = '" + DB_NAME + "'," +
                 "'table-name' = '" + sinkOptions.getTableName() + "'," +
                 "'username' = '" + sinkOptions.getUsername() + "'," +
@@ -321,6 +330,7 @@ public void testPKUpsertAndDelete() throws Exception {
                 "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
                 "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
                 "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
+                "'sink.use.new-sink-api' = '" + (newSinkApi ? "true" : "false") + "'," +
                 "'database-name' = '" + DB_NAME + "'," +
                 "'table-name' = '" + sinkOptions.getTableName() + "'," +
                 "'username' = '" + sinkOptions.getUsername() + "'," +
@@ -374,6 +384,7 @@ public void testPKPartialUpdateDelete() throws Exception {
                 "'jdbc-url'='" + sinkOptions.getJdbcUrl() + "'," +
                 "'load-url'='" + String.join(";", sinkOptions.getLoadUrlList()) + "'," +
                 "'sink.version' = '" + (isSinkV2 ? "V2" : "V1") + "'," +
+                "'sink.use.new-sink-api' = '" + (newSinkApi ? "true" : "false") + "'," +
                 "'database-name' = '" + DB_NAME + "'," +
                 "'table-name' = '" + sinkOptions.getTableName() + "'," +
                 "'username' = '" + sinkOptions.getUsername() + "'," +
@@ -524,6 +535,7 @@ private void testConfigurationBase(Map<String, String> options, Function