diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java index 302f1f35..4321a4cc 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java @@ -30,6 +30,7 @@ import com.starrocks.connector.flink.table.data.StarRocksRowData; import com.starrocks.connector.flink.tools.EnvUtils; import com.starrocks.connector.flink.tools.JsonWrapper; +import com.starrocks.data.load.stream.LabelGeneratorFactory; import com.starrocks.data.load.stream.StreamLoadSnapshot; import com.starrocks.data.load.stream.v2.StreamLoadManagerV2; import net.sf.jsqlparser.parser.CCJSqlParserUtil; @@ -88,7 +89,7 @@ public class StarRocksDynamicSinkFunctionV2 extends StarRocksDynamicSinkFunct // Only valid when using exactly-once and label prefix is set @Nullable - private transient ExactlyOnceLabelGeneratorFactory labelGeneratorFactory; + private transient ExactlyOnceLabelGeneratorFactory exactlyOnceLabelFactory; @Deprecated private transient ListState> legacyState; @@ -215,9 +216,25 @@ public void open(Configuration parameters) throws Exception { } this.streamLoadListener = new StarRocksStreamLoadListener(getRuntimeContext(), sinkOptions); sinkManager.setStreamLoadListener(streamLoadListener); - if (labelGeneratorFactory != null) { - sinkManager.setLabelGeneratorFactory(labelGeneratorFactory); + + LabelGeneratorFactory labelGeneratorFactory; + String labelPrefix = sinkOptions.getLabelPrefix(); + if (labelPrefix == null || + sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE || + !sinkOptions.isEnableExactlyOnceLabelGen()) { + labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory( + labelPrefix == null ? "flink" : labelPrefix); + } else { + this.exactlyOnceLabelFactory = new ExactlyOnceLabelGeneratorFactory( + labelPrefix, + getRuntimeContext().getNumberOfParallelSubtasks(), + getRuntimeContext().getIndexOfThisSubtask(), + restoredCheckpointId); + exactlyOnceLabelFactory.restore(restoredGeneratorSnapshots); + labelGeneratorFactory = exactlyOnceLabelFactory; } + sinkManager.setLabelGeneratorFactory(labelGeneratorFactory); + sinkManager.init(); if (rowTransformer != null) { rowTransformer.setRuntimeContext(getRuntimeContext()); @@ -244,16 +261,6 @@ private void openForExactlyOnce() throws Exception { aborter.execute(); } - if (sinkOptions.getLabelPrefix() != null) { - this.labelGeneratorFactory = new ExactlyOnceLabelGeneratorFactory( - sinkOptions.getLabelPrefix(), - getRuntimeContext().getNumberOfParallelSubtasks(), - getRuntimeContext().getIndexOfThisSubtask(), - restoredCheckpointId); - labelGeneratorFactory.restore(restoredGeneratorSnapshots); - sinkManager.setLabelGeneratorFactory(labelGeneratorFactory); - } - notifyCheckpointComplete(Long.MAX_VALUE); } @@ -298,8 +305,8 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw snapshotMap.put(functionSnapshotContext.getCheckpointId(), Collections.singletonList(snapshot)); snapshotStates.clear(); - List labelSnapshots = labelGeneratorFactory == null ? null - : labelGeneratorFactory.snapshot(functionSnapshotContext.getCheckpointId()); + List labelSnapshots = exactlyOnceLabelFactory == null ? null + : exactlyOnceLabelFactory.snapshot(functionSnapshotContext.getCheckpointId()); snapshotStates.add(StarrocksSnapshotState.of(snapshotMap, labelSnapshots)); } else { sinkManager.abort(snapshot); diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java index 15ee604d..af4b61b2 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java @@ -76,7 +76,9 @@ public Set> optionalOptions() { optionalOptions.add(StarRocksSinkOptions.SINK_CHUNK_LIMIT); optionalOptions.add(StarRocksSinkOptions.SINK_SCAN_FREQUENCY); optionalOptions.add(StarRocksSinkOptions.SINK_IGNORE_UPDATE_BEFORE); + optionalOptions.add(StarRocksSinkOptions.SINK_ENABLE_EXACTLY_ONCE_LABEL_GEN); optionalOptions.add(StarRocksSinkOptions.SINK_ABORT_LINGERING_TXNS); + optionalOptions.add(StarRocksSinkOptions.SINK_ABORT_CHECK_NUM_TXNS); return optionalOptions; } } diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java index 11faf19a..c36b9b95 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java @@ -125,9 +125,14 @@ public enum StreamLoadFormat { "insert the update_after row in StarRocks, and this options should be set false for this case. Note that how " + "to set this options depends on the user case."); + public static final ConfigOption SINK_ENABLE_EXACTLY_ONCE_LABEL_GEN = ConfigOptions.key("sink.enable.exactly-once.label-gen") + .booleanType().defaultValue(false).withDescription("Only available when using exactly-once and sink.label-prefix is set. " + + "When this option is true, the connector will generate label in the format '{labelPrefix}-{tableName}-{subtaskIndex}-{id}'. " + + "This format could be used to track lingering transactions."); + public static final ConfigOption SINK_ABORT_LINGERING_TXNS = ConfigOptions.key("sink.abort.lingering-txns") .booleanType().defaultValue(true).withDescription("Whether to abort lingering transactions when the job restore. " + - "This option is only available when you are using exactly-once."); + "This option is only available when using exactly-once."); public static final ConfigOption SINK_ABORT_CHECK_NUM_TXNS = ConfigOptions.key("sink.abort.check-num-txns") .intType().defaultValue(-1).withDescription("The number of lingering transactions to check. -1 indicates that " + @@ -334,6 +339,10 @@ public boolean isSupportTransactionStreamLoad() { return supportTransactionStreamLoad; } + public boolean isEnableExactlyOnceLabelGen() { + return tableOptions.get(SINK_ENABLE_EXACTLY_ONCE_LABEL_GEN); + } + public boolean isAbortLingeringTxns() { return tableOptions.get(SINK_ABORT_LINGERING_TXNS); } diff --git a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java index 7b9c5be9..517b7c9a 100644 --- a/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java +++ b/src/test/java/com/starrocks/connector/flink/it/sink/StarRocksSinkITTest.java @@ -419,11 +419,30 @@ public void testExactlyOnce() throws Exception { ); } + @Test + public void testEnableExactlyOnceLabelGen() throws Exception { + assumeTrue(isSinkV2); + Map options = new HashMap<>(); + options.put("sink.semantic", "exactly-once"); + options.put("sink.label-prefix", "test_label"); + options.put("sink.enable.exactly-once.label-gen", "true"); + String checkpointDir = temporaryFolder.newFolder().toURI().toString(); + testConfigurationBase(options, + env -> { + env.enableCheckpointing(1000); + env.getCheckpointConfig().setCheckpointStorage(checkpointDir); + Configuration config = new Configuration(); + config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); + env.configure(config); + return null; + } + ); + } + @Test public void testAbortLingeringTransactions() throws Exception { assumeTrue(isSinkV2); Map options = new HashMap<>(); - options.put("sink.at-least-once.use-transaction-stream-load", "false"); options.put("sink.semantic", "exactly-once"); options.put("sink.label-prefix", "test_label"); String checkpointDir = temporaryFolder.newFolder().toURI().toString(); @@ -439,6 +458,26 @@ public void testAbortLingeringTransactions() throws Exception { ); } + @Test + public void testAbortLingeringTransactionsWithCheckNum() throws Exception { + assumeTrue(isSinkV2); + Map options = new HashMap<>(); + options.put("sink.semantic", "exactly-once"); + options.put("sink.label-prefix", "test_label"); + options.put("sink.abort.check-num-txns", "10"); + String checkpointDir = temporaryFolder.newFolder().toURI().toString(); + testConfigurationBase(options, + env -> { + env.enableCheckpointing(1000); + env.getCheckpointConfig().setCheckpointStorage(checkpointDir); + Configuration config = new Configuration(); + config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true); + env.configure(config); + return null; + } + ); + } + @Test public void testJsonFormat() throws Exception { if (isSinkV2) { diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/LabelGeneratorFactory.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/LabelGeneratorFactory.java index b1d29b11..df4daeb3 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/LabelGeneratorFactory.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/LabelGeneratorFactory.java @@ -31,7 +31,7 @@ class DefaultLabelGeneratorFactory implements LabelGeneratorFactory { private final String labelPrefix; public DefaultLabelGeneratorFactory(String labelPrefix) { - this.labelPrefix = labelPrefix == null ? "flink_" : labelPrefix; + this.labelPrefix = labelPrefix == null ? "" : labelPrefix; } @Override