forked from StarRocks/starrocks-connector-for-apache-flink
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: PengFei Li <[email protected]>
- Loading branch information
Showing
8 changed files
with
688 additions
and
7 deletions.
There are no files selected for viewing
144 changes: 144 additions & 0 deletions
144
src/main/java/com/starrocks/connector/flink/table/sink/ExactlyOnceLabelGenerator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
/* | ||
* Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.starrocks.connector.flink.table.sink; | ||
|
||
import com.google.common.base.Objects; | ||
import com.starrocks.data.load.stream.LabelGenerator; | ||
|
||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
/** | ||
* Label generator for a table under exactly-once mode. The generator should guarantee the | ||
* uniqueness of the label across different flink jobs, different tables in one job, different | ||
* subtasks for a table in one job, and different transactions in on subtask. The label is | ||
* in the format {labelPrefix}-{table}-{subtaskIndex}-{id}, and it will be unique because | ||
* 1. labelPrefix is unique across different flink jobs, so there is no conflict among flink | ||
* jobs. Note that the uniqueness of labelPrefix should be guaranteed by users. | ||
* 2. the table name is unique in a database of StarRocks, and the label only need to be unique in | ||
* the scope of a database. so the label can be unique even if write to multiple tables in a job | ||
* 3. use subtaskIndex to make it unique across subtasks if the sink writes parallel | ||
* 4. use incremental id to make it unique across different transactions in a subtask | ||
* | ||
* <p> | ||
* The reason why we design this generator for exactly-once is to clean up lingering transactions. | ||
* Compared to at-least-once, exactly-once will not abort the PREPARED transactions when the job | ||
* failovers or exits because it's 2PC mechanism. Some of those PREPARED transactions may be in a | ||
* successful checkpoint, and will be committed when the job restores from the checkpoint, but some | ||
* of them are just useless, and should be aborted, otherwise they will be lingering in StarRocks | ||
* until timeout which maybe make StarRocks unstable, so we should try to abort these lingering | ||
* transactions when the job restores. The key is to find those lingering transactions, and we achieve | ||
* it by generating labels according to certain rules, storing the label generator information in | ||
* checkpoint, and constructing labels of possible lingering transactions when restoring from the | ||
* checkpoints. First, each label has an incremental id, and when checkpointing, the labels with ids | ||
* less than nextId must be successful, and ids for lingering transactions must be no less than nextId. | ||
* Secondly, make a snapshot {@link ExactlyOnceLabelGeneratorSnapshot} for the generator when checkpointing, | ||
* and store it in the state. Thirdly, when restoring from the checkpoint, we can get the nextId, build | ||
* labels with those ids no less than nextId, and , query label state in StarRocks, and abort them | ||
* if they are PREPARED. | ||
*/ | ||
public class ExactlyOnceLabelGenerator implements LabelGenerator { | ||
|
||
private final String labelPrefix; | ||
private final String db; | ||
private final String table; | ||
private final int numberOfSubtasks; | ||
private final int subTaskIndex; | ||
/** | ||
* The id only increment when checkpoint triggers, so it will not | ||
* be larger than the next checkpoint id. But it may be smaller | ||
* than the next checkpoint id because some checkpoints will not | ||
* trigger on this subtask, and the id will not increment. | ||
*/ | ||
private final AtomicLong nextId; | ||
|
||
public ExactlyOnceLabelGenerator(String labelPrefix, String db, String table, | ||
int numberOfSubtasks, int subTaskIndex, long initId) { | ||
this.labelPrefix = labelPrefix; | ||
this.db = db; | ||
this.table = table; | ||
this.numberOfSubtasks = numberOfSubtasks; | ||
this.subTaskIndex = subTaskIndex; | ||
this.nextId = new AtomicLong(initId); | ||
} | ||
|
||
@Override | ||
public String next() { | ||
return genLabel(labelPrefix, table, subTaskIndex, nextId.getAndIncrement()); | ||
} | ||
|
||
public ExactlyOnceLabelGeneratorSnapshot snapshot(long checkpointId) { | ||
return new ExactlyOnceLabelGeneratorSnapshot( | ||
checkpointId, db, table, labelPrefix, numberOfSubtasks, subTaskIndex, nextId.get()); | ||
} | ||
|
||
public static String genLabel(String labelPrefix, String table, int subTaskIndex, long id) { | ||
return String.format("%s-%s-%s-%s", labelPrefix, table, subTaskIndex, id); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "ExactlyOnceLabelGenerator{" + | ||
"labelPrefix='" + labelPrefix + '\'' + | ||
", db='" + db + '\'' + | ||
", table='" + table + '\'' + | ||
", numberOfSubtasks=" + numberOfSubtasks + | ||
", subTaskIndex=" + subTaskIndex + | ||
", nextId=" + nextId + | ||
'}'; | ||
} | ||
|
||
public static class LabelDbTableSubtask { | ||
|
||
private final String labelPrefix; | ||
private final String db; | ||
private final String table; | ||
private final int subTaskIndex; | ||
|
||
public LabelDbTableSubtask(String labelPrefix, String db, String table, int subTaskIndex) { | ||
this.labelPrefix = labelPrefix; | ||
this.db = db; | ||
this.table = table; | ||
this.subTaskIndex = subTaskIndex; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
|
||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
|
||
LabelDbTableSubtask that = (LabelDbTableSubtask) o; | ||
return subTaskIndex == that.subTaskIndex && | ||
Objects.equal(labelPrefix, that.labelPrefix) && | ||
Objects.equal(db, that.db) && | ||
Objects.equal(table, that.table); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hashCode(labelPrefix, db, table, subTaskIndex); | ||
} | ||
} | ||
} |
108 changes: 108 additions & 0 deletions
108
src/main/java/com/starrocks/connector/flink/table/sink/ExactlyOnceLabelGeneratorFactory.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
/* | ||
* Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.starrocks.connector.flink.table.sink; | ||
|
||
import com.starrocks.data.load.stream.LabelGenerator; | ||
import com.starrocks.data.load.stream.LabelGeneratorFactory; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.ArrayList; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
/** | ||
* Label generator factory for exactly-once. | ||
*/ | ||
public class ExactlyOnceLabelGeneratorFactory implements LabelGeneratorFactory { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceLabelGeneratorFactory.class); | ||
|
||
private final String labelPrefix; | ||
private final int numberOfSubtasks; | ||
private final int subtaskIndex; | ||
private final long restoreCheckpointId; | ||
// Label generators. The mapping is db -> table -> generator. | ||
private final Map<String, Map<String, ExactlyOnceLabelGenerator>> labelGenerators; | ||
|
||
public ExactlyOnceLabelGeneratorFactory(String labelPrefix, int numberOfSubtasks, int subtaskIndex, long restoreCheckpointId) { | ||
this.labelPrefix = labelPrefix; | ||
this.numberOfSubtasks = numberOfSubtasks; | ||
this.subtaskIndex = subtaskIndex; | ||
this.restoreCheckpointId = restoreCheckpointId; | ||
this.labelGenerators = new ConcurrentHashMap<>(); | ||
LOG.info("Create label generator factory. labelPrefix: {}, numberOfSubtasks: {}, subtaskIndex: {}, " + | ||
"restoreCheckpointId: {}", labelPrefix, numberOfSubtasks, subtaskIndex, restoreCheckpointId); | ||
} | ||
|
||
@Override | ||
public synchronized LabelGenerator create(String db, String table) { | ||
Map<String, ExactlyOnceLabelGenerator> tableMap = labelGenerators.computeIfAbsent(db, key -> new HashMap<>()); | ||
ExactlyOnceLabelGenerator generator = tableMap.get(table); | ||
if (generator == null) { | ||
// use restoreCheckpointId + 1 as the initial id rather than 0 to avoid | ||
// conflicts because the same labelPrefix is switched repeatedly | ||
generator = new ExactlyOnceLabelGenerator(labelPrefix, db, table, | ||
numberOfSubtasks, subtaskIndex, restoreCheckpointId + 1); | ||
tableMap.put(table, generator); | ||
LOG.info("Create label generator: {}", generator); | ||
} | ||
return generator; | ||
} | ||
|
||
public synchronized List<ExactlyOnceLabelGeneratorSnapshot> snapshot(long checkpointId) { | ||
List<ExactlyOnceLabelGeneratorSnapshot> metas = new ArrayList<>(); | ||
for (Map.Entry<String, Map<String, ExactlyOnceLabelGenerator>> entry : labelGenerators.entrySet()) { | ||
for (ExactlyOnceLabelGenerator generator : entry.getValue().values()) { | ||
metas.add(generator.snapshot(checkpointId)); | ||
} | ||
} | ||
return metas; | ||
} | ||
|
||
public synchronized void restore(List<ExactlyOnceLabelGeneratorSnapshot> snapshots) { | ||
Map<ExactlyOnceLabelGenerator.LabelDbTableSubtask, ExactlyOnceLabelGeneratorSnapshot> map = new HashMap<>(); | ||
for (ExactlyOnceLabelGeneratorSnapshot snapshot : snapshots) { | ||
if (snapshot.getSubtaskIndex() != subtaskIndex && !snapshot.getLabelPrefix().equals(labelPrefix)) { | ||
LOG.info("Skip snapshot: {}", snapshot); | ||
continue; | ||
} | ||
|
||
ExactlyOnceLabelGenerator.LabelDbTableSubtask meta = snapshot.createLabelDbTableSubtask(); | ||
ExactlyOnceLabelGeneratorSnapshot oldSnapshot = map.get(meta); | ||
// Sanity check that there should not have duplicated snapshot for a LabelDbTableSubtask | ||
if (oldSnapshot != null) { | ||
LOG.warn("Find duplicate snapshot, old snapshot: {}, new snapshot: {}", oldSnapshot, snapshot); | ||
} | ||
map.put(meta, snapshot); | ||
} | ||
|
||
for (ExactlyOnceLabelGeneratorSnapshot snapshot : map.values()) { | ||
ExactlyOnceLabelGenerator generator = new ExactlyOnceLabelGenerator( | ||
labelPrefix, snapshot.getDb(), snapshot.getTable(), numberOfSubtasks, subtaskIndex, snapshot.getNextId()); | ||
labelGenerators.computeIfAbsent(snapshot.getDb(), key -> new HashMap<>()) | ||
.put(snapshot.getTable(), generator); | ||
LOG.info("Restore snapshot: {}, generator: {}", snapshot, generator); | ||
} | ||
} | ||
} |
96 changes: 96 additions & 0 deletions
96
...main/java/com/starrocks/connector/flink/table/sink/ExactlyOnceLabelGeneratorSnapshot.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
/* | ||
* Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.starrocks.connector.flink.table.sink; | ||
|
||
/** Snapshot for the label generator. */ | ||
public class ExactlyOnceLabelGeneratorSnapshot { | ||
|
||
/** Which checkpoint creates this snapshot. */ | ||
private final long checkpointId; | ||
/** The database which the generator belongs to. */ | ||
private final String db; | ||
/** The table which the generator belongs to. */ | ||
private final String table; | ||
/** The label prefix. */ | ||
private final String labelPrefix; | ||
/** The number of subtasks (the parallelism of the sink) when this snapshot is created. */ | ||
private final int numberOfSubtasks; | ||
/** The index of the subtask that creates this snapshot. */ | ||
private final int subtaskIndex; | ||
/** Next id used to create label. */ | ||
private final long nextId; | ||
|
||
public ExactlyOnceLabelGeneratorSnapshot(long checkpointId, String db, String table, | ||
String labelPrefix, int numberOfSubtasks, int subtaskIndex, long nextId) { | ||
this.checkpointId = checkpointId; | ||
this.db = db; | ||
this.table = table; | ||
this.labelPrefix = labelPrefix; | ||
this.numberOfSubtasks = numberOfSubtasks; | ||
this.subtaskIndex = subtaskIndex; | ||
this.nextId = nextId; | ||
} | ||
|
||
public long getCheckpointId() { | ||
return checkpointId; | ||
} | ||
|
||
public String getDb() { | ||
return db; | ||
} | ||
|
||
public String getTable() { | ||
return table; | ||
} | ||
|
||
public String getLabelPrefix() { | ||
return labelPrefix; | ||
} | ||
|
||
public int getNumberOfSubtasks() { | ||
return numberOfSubtasks; | ||
} | ||
|
||
public int getSubtaskIndex() { | ||
return subtaskIndex; | ||
} | ||
|
||
public long getNextId() { | ||
return nextId; | ||
} | ||
|
||
public ExactlyOnceLabelGenerator.LabelDbTableSubtask createLabelDbTableSubtask() { | ||
return new ExactlyOnceLabelGenerator.LabelDbTableSubtask(labelPrefix, db, table, subtaskIndex); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "ExactlyOnceLabelGeneratorSnapshot{" + | ||
"checkpointId=" + checkpointId + | ||
", db='" + db + '\'' + | ||
", table='" + table + '\'' + | ||
", labelPrefix='" + labelPrefix + '\'' + | ||
", numberOfSubtasks=" + numberOfSubtasks + | ||
", subtaskIndex=" + subtaskIndex + | ||
", nextId=" + nextId + | ||
'}'; | ||
} | ||
} |
Oops, something went wrong.