Skip to content

Commit

Permalink
[Enhancement] Add option to control whether to enable exactly-once label
Browse files Browse the repository at this point in the history
gen

Signed-off-by: PengFei Li <[email protected]>
  • Loading branch information
banmoy committed Sep 3, 2023
1 parent 83919ff commit e3c4e4e
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.starrocks.connector.flink.table.data.StarRocksRowData;
import com.starrocks.connector.flink.tools.EnvUtils;
import com.starrocks.connector.flink.tools.JsonWrapper;
import com.starrocks.data.load.stream.LabelGeneratorFactory;
import com.starrocks.data.load.stream.StreamLoadSnapshot;
import com.starrocks.data.load.stream.v2.StreamLoadManagerV2;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
Expand Down Expand Up @@ -88,7 +89,7 @@ public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunct

// Only set when using exactly-once, a label prefix is configured, and sink.enable.exactly-once.label-gen is true
@Nullable
private transient ExactlyOnceLabelGeneratorFactory labelGeneratorFactory;
private transient ExactlyOnceLabelGeneratorFactory exactlyOnceLabelFactory;

@Deprecated
private transient ListState<Map<String, StarRocksSinkBufferEntity>> legacyState;
Expand Down Expand Up @@ -216,9 +217,25 @@ public void open(Configuration parameters) throws Exception {
}
this.streamLoadListener = new StarRocksStreamLoadListener(getRuntimeContext(), sinkOptions);
sinkManager.setStreamLoadListener(streamLoadListener);
if (labelGeneratorFactory != null) {
sinkManager.setLabelGeneratorFactory(labelGeneratorFactory);

LabelGeneratorFactory labelGeneratorFactory;
String labelPrefix = sinkOptions.getLabelPrefix();
if (labelPrefix == null ||
sinkOptions.getSemantic() == StarRocksSinkSemantic.AT_LEAST_ONCE ||
!sinkOptions.isEnableExactlyOnceLabelGen()) {
labelGeneratorFactory = new LabelGeneratorFactory.DefaultLabelGeneratorFactory(
labelPrefix == null ? "flink" : labelPrefix);
} else {
this.exactlyOnceLabelFactory = new ExactlyOnceLabelGeneratorFactory(
labelPrefix,
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getIndexOfThisSubtask(),
restoredCheckpointId);
exactlyOnceLabelFactory.restore(restoredGeneratorSnapshots);
labelGeneratorFactory = exactlyOnceLabelFactory;
}
sinkManager.setLabelGeneratorFactory(labelGeneratorFactory);

sinkManager.init();
if (rowTransformer != null) {
rowTransformer.setRuntimeContext(getRuntimeContext());
Expand All @@ -245,16 +262,6 @@ private void openForExactlyOnce() throws Exception {
aborter.execute();
}

if (sinkOptions.getLabelPrefix() != null) {
this.labelGeneratorFactory = new ExactlyOnceLabelGeneratorFactory(
sinkOptions.getLabelPrefix(),
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getIndexOfThisSubtask(),
restoredCheckpointId);
labelGeneratorFactory.restore(restoredGeneratorSnapshots);
sinkManager.setLabelGeneratorFactory(labelGeneratorFactory);
}

notifyCheckpointComplete(Long.MAX_VALUE);
}

Expand Down Expand Up @@ -299,8 +306,8 @@ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throw
snapshotMap.put(functionSnapshotContext.getCheckpointId(), Collections.singletonList(snapshot));

snapshotStates.clear();
List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots = labelGeneratorFactory == null ? null
: labelGeneratorFactory.snapshot(functionSnapshotContext.getCheckpointId());
List<ExactlyOnceLabelGeneratorSnapshot> labelSnapshots = exactlyOnceLabelFactory == null ? null
: exactlyOnceLabelFactory.snapshot(functionSnapshotContext.getCheckpointId());
snapshotStates.add(StarrocksSnapshotState.of(snapshotMap, labelSnapshots));
} else {
sinkManager.abort(snapshot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ public Set<ConfigOption<?>> optionalOptions() {
optionalOptions.add(StarRocksSinkOptions.SINK_CHUNK_LIMIT);
optionalOptions.add(StarRocksSinkOptions.SINK_SCAN_FREQUENCY);
optionalOptions.add(StarRocksSinkOptions.SINK_IGNORE_UPDATE_BEFORE);
optionalOptions.add(StarRocksSinkOptions.SINK_ENABLE_EXACTLY_ONCE_LABEL_GEN);
optionalOptions.add(StarRocksSinkOptions.SINK_ABORT_LINGERING_TXNS);
optionalOptions.add(StarRocksSinkOptions.SINK_ABORT_CHECK_NUM_TXNS);
return optionalOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,14 @@ public enum StreamLoadFormat {
"insert the update_after row in StarRocks, and this options should be set false for this case. Note that how " +
"to set this options depends on the user case.");

/**
 * Boolean sink option {@code sink.enable.exactly-once.label-gen}; defaults to {@code false}.
 *
 * <p>Per its description, the option only takes effect with exactly-once semantics and a
 * configured {@code sink.label-prefix}. When enabled, labels follow the pattern
 * {@code {labelPrefix}-{tableName}-{subtaskIndex}-{id}}, which can be used to track
 * lingering transactions.
 */
public static final ConfigOption<Boolean> SINK_ENABLE_EXACTLY_ONCE_LABEL_GEN = ConfigOptions.key("sink.enable.exactly-once.label-gen")
.booleanType().defaultValue(false).withDescription("Only available when using exactly-once and sink.label-prefix is set. " +
"When this option is true, the connector will generate label in the format '{labelPrefix}-{tableName}-{subtaskIndex}-{id}'. " +
"This format could be used to track lingering transactions.");

public static final ConfigOption<Boolean> SINK_ABORT_LINGERING_TXNS = ConfigOptions.key("sink.abort.lingering-txns")
.booleanType().defaultValue(true).withDescription("Whether to abort lingering transactions when the job restore. " +
"This option is only available when you are using exactly-once.");
"This option is only available when using exactly-once.");

public static final ConfigOption<Integer> SINK_ABORT_CHECK_NUM_TXNS = ConfigOptions.key("sink.abort.check-num-txns")
.intType().defaultValue(-1).withDescription("The number of lingering transactions to check. -1 indicates that " +
Expand Down Expand Up @@ -334,6 +339,10 @@ public boolean isSupportTransactionStreamLoad() {
return supportTransactionStreamLoad;
}

/**
 * Reports whether the exactly-once label generator has been switched on.
 *
 * @return the value of {@code sink.enable.exactly-once.label-gen} read from the table options
 */
public boolean isEnableExactlyOnceLabelGen() {
    final boolean labelGenEnabled = tableOptions.get(SINK_ENABLE_EXACTLY_ONCE_LABEL_GEN);
    return labelGenEnabled;
}

/**
 * Reports whether lingering transactions should be aborted.
 *
 * @return the value of {@code sink.abort.lingering-txns} read from the table options
 */
public boolean isAbortLingeringTxns() {
    final boolean abortLingering = tableOptions.get(SINK_ABORT_LINGERING_TXNS);
    return abortLingering;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,30 @@ public void testExactlyOnce() throws Exception {
);
}

/**
 * Drives {@code testConfigurationBase} with exactly-once semantics, a label prefix, and
 * {@code sink.enable.exactly-once.label-gen} set to true, with checkpointing every
 * 1000 ms into a fresh temporary folder. Skipped unless sink V2 is under test.
 */
@Test
public void testEnableExactlyOnceLabelGen() throws Exception {
    assumeTrue(isSinkV2);

    final String checkpointUri = temporaryFolder.newFolder().toURI().toString();

    final Map<String, String> sinkProps = new HashMap<>();
    sinkProps.put("sink.semantic", "exactly-once");
    sinkProps.put("sink.label-prefix", "test_label");
    sinkProps.put("sink.enable.exactly-once.label-gen", "true");

    testConfigurationBase(sinkProps, execEnv -> {
        final Configuration checkpointConf = new Configuration();
        checkpointConf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

        execEnv.enableCheckpointing(1000);
        execEnv.getCheckpointConfig().setCheckpointStorage(checkpointUri);
        execEnv.configure(checkpointConf);
        return null;
    });
}

@Test
public void testAbortLingeringTransactions() throws Exception {
assumeTrue(isSinkV2);
Map<String, String> options = new HashMap<>();
options.put("sink.at-least-once.use-transaction-stream-load", "false");
options.put("sink.semantic", "exactly-once");
options.put("sink.label-prefix", "test_label");
String checkpointDir = temporaryFolder.newFolder().toURI().toString();
Expand All @@ -439,6 +458,26 @@ public void testAbortLingeringTransactions() throws Exception {
);
}

/**
 * Drives {@code testConfigurationBase} with exactly-once semantics, a label prefix, and
 * {@code sink.abort.check-num-txns} set to 10, with checkpointing every 1000 ms into a
 * fresh temporary folder. Skipped unless sink V2 is under test.
 */
@Test
public void testAbortLingeringTransactionsWithCheckNum() throws Exception {
    assumeTrue(isSinkV2);

    final String checkpointUri = temporaryFolder.newFolder().toURI().toString();

    final Map<String, String> sinkProps = new HashMap<>();
    sinkProps.put("sink.semantic", "exactly-once");
    sinkProps.put("sink.label-prefix", "test_label");
    sinkProps.put("sink.abort.check-num-txns", "10");

    testConfigurationBase(sinkProps, execEnv -> {
        final Configuration checkpointConf = new Configuration();
        checkpointConf.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

        execEnv.enableCheckpointing(1000);
        execEnv.getCheckpointConfig().setCheckpointStorage(checkpointUri);
        execEnv.configure(checkpointConf);
        return null;
    });
}

@Test
public void testJsonFormat() throws Exception {
if (isSinkV2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class DefaultLabelGeneratorFactory implements LabelGeneratorFactory {
private final String labelPrefix;

public DefaultLabelGeneratorFactory(String labelPrefix) {
this.labelPrefix = labelPrefix == null ? "flink_" : labelPrefix;
this.labelPrefix = labelPrefix == null ? "" : labelPrefix;
}

@Override
Expand Down

0 comments on commit e3c4e4e

Please sign in to comment.